Repository: ignite Updated Branches: refs/heads/ignite-5075 26a1bb6a5 -> e6ebae167
ignite-5075 Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/e6ebae16 Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/e6ebae16 Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/e6ebae16 Branch: refs/heads/ignite-5075 Commit: e6ebae167b2e0f50b746830d84cc466b4a957488 Parents: 26a1bb6 Author: sboikov <[email protected]> Authored: Thu May 4 17:42:52 2017 +0300 Committer: sboikov <[email protected]> Committed: Thu May 4 18:41:14 2017 +0300 ---------------------------------------------------------------------- .../cache/CacheAffinitySharedManager.java | 82 +++++------- .../cache/CacheGroupInfrastructure.java | 131 +++++++++++++++++-- .../processors/cache/ClusterCachesInfo.java | 4 + .../processors/cache/GridCacheContext.java | 4 + .../processors/cache/GridCacheProcessor.java | 25 ++-- .../processors/cache/GridCacheUtils.java | 4 +- .../cache/affinity/GridCacheAffinityImpl.java | 2 +- .../distributed/dht/GridDhtCacheAdapter.java | 30 ++--- .../dht/GridDhtPartitionTopology.java | 2 +- .../dht/GridDhtPartitionTopologyImpl.java | 107 ++++++++------- .../dht/preloader/GridDhtPreloader.java | 50 ------- .../continuous/CacheContinuousQueryHandler.java | 2 +- .../cache/transactions/IgniteTxHandler.java | 4 +- 13 files changed, 250 insertions(+), 197 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/e6ebae16/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java index adaa1e1..bd80bf0 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java @@ -288,7 +288,7 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap Map<Integer, Map<Integer, List<UUID>>> assignmentsChange = U.newHashMap(waitInfo.assignments.size()); for (Map.Entry<Integer, Map<Integer, List<ClusterNode>>> e : waitInfo.assignments.entrySet()) { - Integer cacheId = e.getKey(); + Integer grpId = e.getKey(); Map<Integer, List<ClusterNode>> assignment = e.getValue(); @@ -297,27 +297,26 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap for (Map.Entry<Integer, List<ClusterNode>> e0 : assignment.entrySet()) assignment0.put(e0.getKey(), toIds0(e0.getValue())); - assignmentsChange.put(cacheId, assignment0); + assignmentsChange.put(grpId, assignment0); } return new CacheAffinityChangeMessage(waitInfo.topVer, assignmentsChange, waitInfo.deploymentIds); } /** - * @param cctx Cache context. + * @param grp Cache group. */ - public void onCacheCreated(GridCacheContext cctx) { - final Integer cacheId = cctx.cacheId(); - - // TODO IGNITE-5075: move to group initialization? -// if (!caches.containsKey(cctx.cacheId())) { -// cctx.io().addHandler(cacheId, GridDhtAffinityAssignmentResponse.class, -// new IgniteBiInClosure<UUID, GridDhtAffinityAssignmentResponse>() { -// @Override public void apply(UUID nodeId, GridDhtAffinityAssignmentResponse res) { -// processAffinityAssignmentResponse(cacheId, nodeId, res); -// } -// }); -// } + void onCacheGroupCreated(CacheGroupInfrastructure grp) { + final Integer grpId = grp.groupId(); + + if (!grpHolders.containsKey(grp.groupId())) { + cctx.io().addHandler(grpId, GridDhtAffinityAssignmentResponse.class, + new IgniteBiInClosure<UUID, GridDhtAffinityAssignmentResponse>() { + @Override public void apply(UUID nodeId, GridDhtAffinityAssignmentResponse res) { + processAffinityAssignmentResponse(grpId, nodeId, res); + } + }); + } } /** @@ -380,7 +379,7 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap nearCfg = req.nearCacheConfiguration(); } else { - startCache = cctx.cacheContext(action.descriptor().cacheId()) == null && + startCache = cctx.cacheContext(cacheDesc.cacheId()) == null && CU.affinityNode(cctx.localNode(), req.startCacheConfiguration().getNodeFilter()); } @@ -694,16 +693,16 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap } /** - * @param cacheId Cache ID. + * @param grpId Cache group ID. * @param nodeId Node ID. * @param res Response. */ - private void processAffinityAssignmentResponse(Integer cacheId, UUID nodeId, GridDhtAffinityAssignmentResponse res) { + private void processAffinityAssignmentResponse(Integer grpId, UUID nodeId, GridDhtAffinityAssignmentResponse res) { if (log.isDebugEnabled()) log.debug("Processing affinity assignment response [node=" + nodeId + ", res=" + res + ']'); for (GridDhtAssignmentFetchFuture fut : pendingAssignmentFetchFuts.values()) { - if (fut.key().get1().equals(cacheId)) { + if (fut.key().get1().equals(grpId)) { fut.onResponse(nodeId, res); break; @@ -992,9 +991,9 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap for (int i = 0; i < fetchFuts.size(); i++) { GridDhtAssignmentFetchFuture fetchFut = fetchFuts.get(i); - Integer cacheId = fetchFut.key().get1(); + Integer grpId = fetchFut.key().get1(); - fetchAffinity(fut, cctx.cacheContext(cacheId).affinity().affinityCache(), fetchFut); + fetchAffinity(fut, cctx.cache().cacheGroup(grpId).affinity(), fetchFut); } } @@ -1053,11 +1052,11 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap boolean centralizedAff; if (lateAffAssign) { - for (GridCacheContext cacheCtx : cctx.cacheContexts()) { - if (cacheCtx.isLocal()) + for (CacheGroupInfrastructure grp : cctx.cache().cacheGroups()) { + if (grp.isLocal()) continue; - cacheCtx.affinity().affinityCache().calculate(fut.topologyVersion(), fut.discoveryEvent(), fut.discoCache()); + grp.affinity().calculate(fut.topologyVersion(), fut.discoveryEvent(), fut.discoCache()); } centralizedAff = true; @@ -1242,13 +1241,13 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap final Map<Object, List<List<ClusterNode>>> affCache = new HashMap<>(); if (!crd) { - for (GridCacheContext cacheCtx : cctx.cacheContexts()) { - if (cacheCtx.isLocal()) + for (CacheGroupInfrastructure grp : cctx.cache().cacheGroups()) { + if (grp.isLocal()) continue; - boolean latePrimary = cacheCtx.rebalanceEnabled(); + boolean latePrimary = grp.rebalanceEnabled(); - initAffinityOnNodeJoin(fut, cacheCtx.affinity().affinityCache(), null, latePrimary, affCache); + initAffinityOnNodeJoin(fut, grp.affinity(), null, latePrimary, affCache); } return null; @@ -1626,17 +1625,13 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap */ abstract boolean client(); + /** + * @return Group ID. + */ int groupId() { return aff.groupId(); } -// /** -// * @return Cache ID. -// */ -// int cacheId() { -// return aff.cacheId(); -// } - /** * @return Partitions number. */ @@ -1644,13 +1639,6 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap return aff.partitions(); } -// /** -// * @return Cache name. -// */ -// String name() { -// return aff.cacheName(); -// } - /** * @param fut Exchange future. * @return Cache topology. @@ -1689,16 +1677,6 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap return false; } -// /** {@inheritDoc} */ -// @Override public String name() { -// return cctx.name(); -// } -// -// /** {@inheritDoc} */ -// @Override public int cacheId() { -// return cctx.cacheId(); -// } - /** {@inheritDoc} */ @Override public GridDhtPartitionTopology topology(GridDhtPartitionsExchangeFuture fut) { return grp.topology(); http://git-wip-us.apache.org/repos/asf/ignite/blob/e6ebae16/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheGroupInfrastructure.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheGroupInfrastructure.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheGroupInfrastructure.java index 72a62ce..57e560f 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheGroupInfrastructure.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheGroupInfrastructure.java @@ -17,17 +17,29 @@ package org.apache.ignite.internal.processors.cache; +import java.util.UUID; import org.apache.ignite.IgniteCheckedException; +import org.apache.ignite.IgniteLogger; import org.apache.ignite.configuration.CacheConfiguration; import org.apache.ignite.internal.IgniteClientDisconnectedCheckedException; +import org.apache.ignite.internal.IgniteInternalFuture; +import org.apache.ignite.internal.processors.affinity.AffinityAssignment; import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; import org.apache.ignite.internal.processors.affinity.GridAffinityAssignmentCache; +import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtAffinityAssignmentRequest; +import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtAffinityAssignmentResponse; +import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtCacheEntry; import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionTopology; +import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionTopologyImpl; +import org.apache.ignite.internal.util.typedef.CI1; +import org.apache.ignite.internal.util.typedef.internal.U; +import org.apache.ignite.lang.IgniteBiInClosure; import org.apache.ignite.lang.IgniteFuture; import org.jetbrains.annotations.Nullable; import static org.apache.ignite.cache.CacheMode.LOCAL; import static org.apache.ignite.cache.CacheRebalanceMode.NONE; +import static org.apache.ignite.internal.managers.communication.GridIoPolicy.AFFINITY_POOL; /** * @@ -37,7 +49,7 @@ public class CacheGroupInfrastructure { private GridAffinityAssignmentCache aff; /** */ - private final int id; + private final int grpId; /** */ private final CacheConfiguration ccfg; @@ -46,24 +58,37 @@ public class CacheGroupInfrastructure { private final GridCacheSharedContext ctx; /** */ - private GridDhtPartitionTopology top; + private final IgniteLogger log; + /** */ + private GridDhtPartitionTopologyImpl top; + + /** */ private AffinityTopologyVersion grpStartVer; + /** */ private AffinityTopologyVersion locStartVer; /** - * @param id Group ID. + * @param grpId Group ID. * @param ctx Context. * @param ccfg Cache configuration. */ - CacheGroupInfrastructure(int id, GridCacheSharedContext ctx, CacheConfiguration ccfg) { - assert id != 0 : "Invalid group ID [cache=" + ccfg.getName() + ", grpName=" + ccfg.getGroupName() + ']'; + CacheGroupInfrastructure(GridCacheSharedContext ctx, + int grpId, + CacheConfiguration ccfg, + AffinityTopologyVersion grpStartVer, + AffinityTopologyVersion locStartVer) { + assert grpId != 0 : "Invalid group ID [cache=" + ccfg.getName() + ", grpName=" + ccfg.getGroupName() + ']'; assert ccfg != null; - this.id = id; + this.grpId = grpId; this.ctx = ctx; this.ccfg = ccfg; + this.grpStartVer = grpStartVer; + this.locStartVer = locStartVer; + + log = ctx.kernalContext().log(getClass()); } public AffinityTopologyVersion groupStartVersion() { @@ -89,12 +114,12 @@ public class CacheGroupInfrastructure { return aff; } - @Nullable public String groupName() { + @Nullable public String name() { return ccfg.getGroupName(); } public int groupId() { - return id; + return grpId; } public boolean sharedGroup() { @@ -103,12 +128,92 @@ public class CacheGroupInfrastructure { public void start() throws IgniteCheckedException { aff = new GridAffinityAssignmentCache(ctx.kernalContext(), - groupName(), - id, + name(), + grpId, ccfg.getAffinity(), ccfg.getNodeFilter(), ccfg.getBackups(), ccfg.getCacheMode() == LOCAL); + + if (ccfg.getCacheMode() != LOCAL) { + GridCacheMapEntryFactory entryFactory = new GridCacheMapEntryFactory() { + @Override public GridCacheMapEntry create( + GridCacheContext ctx, + AffinityTopologyVersion topVer, + KeyCacheObject key, + int hash, + CacheObject val + ) { + return new GridDhtCacheEntry(ctx, topVer, key, hash, val); + } + }; + + top = new GridDhtPartitionTopologyImpl(ctx, entryFactory); + + if (!ctx.kernalContext().clientNode()) { + ctx.io().addHandler(groupId(), GridDhtAffinityAssignmentRequest.class, + new IgniteBiInClosure<UUID, GridDhtAffinityAssignmentRequest>() { + @Override public void apply(UUID nodeId, GridDhtAffinityAssignmentRequest msg) { + processAffinityAssignmentRequest(nodeId, msg); + } + }); + } + } + + ctx.affinity().onCacheGroupCreated(this); + } + + /** + * @param nodeId Node ID. + * @param req Request. + */ + private void processAffinityAssignmentRequest(final UUID nodeId, + final GridDhtAffinityAssignmentRequest req) { + if (log.isDebugEnabled()) + log.debug("Processing affinity assignment request [node=" + nodeId + ", req=" + req + ']'); + + IgniteInternalFuture<AffinityTopologyVersion> fut = aff.readyFuture(req.topologyVersion()); + + if (fut != null) { + fut.listen(new CI1<IgniteInternalFuture<AffinityTopologyVersion>>() { + @Override public void apply(IgniteInternalFuture<AffinityTopologyVersion> fut) { + processAffinityAssignmentRequest0(nodeId, req); + } + }); + } + else + processAffinityAssignmentRequest0(nodeId, req); + } + + /** + * @param nodeId Node ID. + * @param req Request. + */ + private void processAffinityAssignmentRequest0(UUID nodeId, final GridDhtAffinityAssignmentRequest req) { + AffinityTopologyVersion topVer = req.topologyVersion(); + + if (log.isDebugEnabled()) + log.debug("Affinity is ready for topology version, will send response [topVer=" + topVer + + ", node=" + nodeId + ']'); + + AffinityAssignment assignment = aff.cachedAffinity(topVer); + + GridDhtAffinityAssignmentResponse res = new GridDhtAffinityAssignmentResponse(grpId, + topVer, + assignment.assignment()); + + if (aff.centralizedAffinityFunction()) { + assert assignment.idealAssignment() != null; + + res.idealAffinityAssignment(assignment.idealAssignment()); + } + + try { + ctx.io().send(nodeId, res, AFFINITY_POOL); + } + catch (IgniteCheckedException e) { + U.error(log, "Failed to send affinity assignment response to remote node [node=" + nodeId + ']', e); + } } /** @@ -136,9 +241,15 @@ public class CacheGroupInfrastructure { public void onReconnected() { // TODO IGNITE-5075. aff.onReconnected(); + + if (top != null) + top.onReconnected(); } public GridDhtPartitionTopology topology() { + if (top == null) + throw new IllegalStateException("Topology is not initialized: " + groupName()); + return top; } } http://git-wip-us.apache.org/repos/asf/ignite/blob/e6ebae16/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ClusterCachesInfo.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ClusterCachesInfo.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ClusterCachesInfo.java index 060c933..f70ea8d 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ClusterCachesInfo.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ClusterCachesInfo.java @@ -927,6 +927,10 @@ class ClusterCachesInfo { topVer, caches); + CacheGroupDescriptor old = registeredCacheGrps.put(grpName, grpDesc); + + assert old == null : old; + ctx.discovery().addCacheGroup(grpDesc, startedCacheCfg.getNodeFilter(), startedCacheCfg.getCacheMode()); if (exchActions != null) http://git-wip-us.apache.org/repos/asf/ignite/blob/e6ebae16/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java index bfd28cf..7a4ad33 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java @@ -393,6 +393,10 @@ public class GridCacheContext<K, V> implements Externalizable { itHolder = new CacheWeakQueryIteratorsHolder(log); } + public int groupId() { + return grp.groupId(); + } + /** * @return Cache group infrastructure. */ http://git-wip-us.apache.org/repos/asf/ignite/blob/e6ebae16/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java index 1745d16..3769274 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java @@ -1725,10 +1725,10 @@ public class GridCacheProcessor extends GridProcessorAdapter { AffinityTopologyVersion exchTopVer) throws IgniteCheckedException { prepareCacheStart( + cacheDesc.groupDescriptor(), cacheDesc.cacheConfiguration(), nearCfg, cacheDesc.cacheType(), - cacheDesc.groupDescriptor().groupId(), cacheDesc.deploymentId(), cacheDesc.startTopologyVersion(), exchTopVer, @@ -1748,10 +1748,10 @@ public class GridCacheProcessor extends GridProcessorAdapter { DynamicCacheDescriptor desc = t.get1(); prepareCacheStart( + desc.groupDescriptor(), desc.cacheConfiguration(), t.get2(), desc.cacheType(), - desc.groupDescriptor().groupId(), desc.deploymentId(), desc.startTopologyVersion(), exchTopVer, @@ -1779,10 +1779,10 @@ public class GridCacheProcessor extends GridProcessorAdapter { if (CU.affinityNode(ctx.discovery().localNode(), filter)) { prepareCacheStart( + desc.groupDescriptor(), desc.cacheConfiguration(), null, desc.cacheType(), - desc.groupDescriptor().groupId(), desc.deploymentId(), desc.startTopologyVersion(), exchTopVer, @@ -1806,10 +1806,10 @@ public class GridCacheProcessor extends GridProcessorAdapter { * @throws IgniteCheckedException If failed. */ private void prepareCacheStart( + CacheGroupDescriptor grpDesc, CacheConfiguration startCfg, @Nullable NearCacheConfiguration reqNearCfg, CacheType cacheType, - int grpId, IgniteUuid deploymentId, AffinityTopologyVersion cacheStartTopVer, AffinityTopologyVersion exchTopVer, @@ -1831,10 +1831,10 @@ public class GridCacheProcessor extends GridProcessorAdapter { } if (grp == null) - grp = startCacheGroup(startCfg, grpId); + grp = startCacheGroup(grpDesc, exchTopVer); } else - grp = startCacheGroup(startCfg, grpId); + grp = startCacheGroup(grpDesc, exchTopVer); CacheConfiguration ccfg = new CacheConfiguration(startCfg); @@ -1878,14 +1878,19 @@ public class GridCacheProcessor extends GridProcessorAdapter { onKernalStart(cache); } - private CacheGroupInfrastructure startCacheGroup(CacheConfiguration cfg0, int grpId) throws IgniteCheckedException { - CacheConfiguration ccfg = new CacheConfiguration(cfg0); + private CacheGroupInfrastructure startCacheGroup(CacheGroupDescriptor desc, AffinityTopologyVersion exchTopVer) + throws IgniteCheckedException { + CacheConfiguration ccfg = new CacheConfiguration(desc.config()); - CacheGroupInfrastructure grp = new CacheGroupInfrastructure(grpId, sharedCtx, ccfg); + CacheGroupInfrastructure grp = new CacheGroupInfrastructure(sharedCtx, + desc.groupId(), + ccfg, + desc.startTopologyVersion(), + exchTopVer); grp.start(); - CacheGroupInfrastructure old = cacheGrps.put(grpId, grp); + CacheGroupInfrastructure old = cacheGrps.put(desc.groupId(), grp); assert old == null; http://git-wip-us.apache.org/repos/asf/ignite/blob/e6ebae16/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUtils.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUtils.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUtils.java index 2260a99..f695768 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUtils.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUtils.java @@ -516,7 +516,7 @@ public class GridCacheUtils { * @return All nodes on which cache with the same name is started. */ public static Collection<ClusterNode> affinityNodes(final GridCacheContext ctx) { - return ctx.discovery().cacheAffinityNodes(ctx.cacheId(), AffinityTopologyVersion.NONE); + return ctx.discovery().cacheGroupAffinityNodes(ctx.groupId(), AffinityTopologyVersion.NONE); } /** @@ -527,7 +527,7 @@ public class GridCacheUtils { * @return Affinity nodes. */ public static Collection<ClusterNode> affinityNodes(GridCacheContext ctx, AffinityTopologyVersion topOrder) { - return ctx.discovery().cacheAffinityNodes(ctx.cacheId(), topOrder); + return ctx.discovery().cacheGroupAffinityNodes(ctx.groupId(), topOrder); } /** http://git-wip-us.apache.org/repos/asf/ignite/blob/e6ebae16/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/affinity/GridCacheAffinityImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/affinity/GridCacheAffinityImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/affinity/GridCacheAffinityImpl.java index 41b3281..f6032fa 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/affinity/GridCacheAffinityImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/affinity/GridCacheAffinityImpl.java @@ -196,7 +196,7 @@ public class GridCacheAffinityImpl<K, V> implements Affinity<K> { int nodesCnt; if (!cctx.isLocal()) - nodesCnt = cctx.discovery().cacheAffinityNodes(cctx.cacheId(), topVer).size(); + nodesCnt = cctx.discovery().cacheGroupAffinityNodes(cctx.groupId(), topVer).size(); else nodesCnt = 1; http://git-wip-us.apache.org/repos/asf/ignite/blob/e6ebae16/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheAdapter.java index 7e6ae81..2ee6f83 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheAdapter.java @@ -96,9 +96,6 @@ public abstract class GridDhtCacheAdapter<K, V> extends GridDistributedCacheAdap /** */ private static final long serialVersionUID = 0L; - /** Topology. */ - private GridDhtPartitionTopologyImpl top; - /** Preloader. */ protected GridCachePreloader preldr; @@ -174,13 +171,6 @@ public abstract class GridDhtCacheAdapter<K, V> extends GridDistributedCacheAdap } /** {@inheritDoc} */ - @Override protected void init() { - super.init(); - - top = new GridDhtPartitionTopologyImpl(ctx, entryFactory()); - } - - /** {@inheritDoc} */ @Override public void start() throws IgniteCheckedException { super.start(); @@ -200,7 +190,6 @@ public abstract class GridDhtCacheAdapter<K, V> extends GridDistributedCacheAdap // Clean up to help GC. preldr = null; - top = null; } /** {@inheritDoc} */ @@ -209,7 +198,8 @@ public abstract class GridDhtCacheAdapter<K, V> extends GridDistributedCacheAdap ctx.affinity().onReconnected(); - top.onReconnected(); + // TODO IGNITE-5075. + //top.onReconnected(); if (preldr != null) preldr.onReconnected(); @@ -235,7 +225,7 @@ public abstract class GridDhtCacheAdapter<K, V> extends GridDistributedCacheAdap @Override public void printMemoryStats() { super.printMemoryStats(); - top.printMemoryStats(1024); + ctx.group().topology().printMemoryStats(1024); } /** @@ -264,7 +254,7 @@ public abstract class GridDhtCacheAdapter<K, V> extends GridDistributedCacheAdap * @return Partition topology. */ public GridDhtPartitionTopology topology() { - return top; + return ctx.group().topology(); } /** {@inheritDoc} */ @@ -302,6 +292,8 @@ public abstract class GridDhtCacheAdapter<K, V> extends GridDistributedCacheAdap if (tup != null) throw new IgniteCheckedException("Nested multi-update locks are not supported"); + GridDhtPartitionTopology top = ctx.group().topology(); + top.readLock(); GridDhtTopologyFuture topFut; @@ -344,7 +336,7 @@ public abstract class GridDhtCacheAdapter<K, V> extends GridDistributedCacheAdap if (tup == null) throw new IgniteCheckedException("Multi-update was not started or released twice."); - top.readLock(); + ctx.group().topology().readLock(); try { IgniteUuid lockId = tup.get1(); @@ -357,7 +349,7 @@ public abstract class GridDhtCacheAdapter<K, V> extends GridDistributedCacheAdap multiFut.onDone(lockId); } finally { - top.readUnlock(); + ctx.group().topology().readUnlock(); } } @@ -518,7 +510,7 @@ public abstract class GridDhtCacheAdapter<K, V> extends GridDistributedCacheAdap return; try { - GridDhtLocalPartition part = top.localPartition(ctx.affinity().partition(key), + GridDhtLocalPartition part = ctx.group().topology().localPartition(ctx.affinity().partition(key), AffinityTopologyVersion.NONE, true); // Reserve to make sure that partition does not get unloaded. @@ -1201,8 +1193,8 @@ public abstract class GridDhtCacheAdapter<K, V> extends GridDistributedCacheAdap if (expVer.equals(curVer)) return false; - Collection<ClusterNode> cacheNodes0 = ctx.discovery().cacheAffinityNodes(ctx.cacheId(), expVer); - Collection<ClusterNode> cacheNodes1 = ctx.discovery().cacheAffinityNodes(ctx.cacheId(), curVer); + Collection<ClusterNode> cacheNodes0 = ctx.discovery().cacheGroupAffinityNodes(ctx.groupId(), expVer); + Collection<ClusterNode> cacheNodes1 = ctx.discovery().cacheGroupAffinityNodes(ctx.groupId(), curVer); if (!cacheNodes0.equals(cacheNodes1) || ctx.affinity().affinityTopologyVersion().compareTo(curVer) < 0) return true; http://git-wip-us.apache.org/repos/asf/ignite/blob/e6ebae16/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopology.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopology.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopology.java index f9fd852..cf12986 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopology.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopology.java @@ -90,7 +90,7 @@ public interface GridDhtPartitionTopology { /** * @return Cache ID. */ - public int cacheId(); + public int groupId(); /** * Pre-initializes this topology. http://git-wip-us.apache.org/repos/asf/ignite/blob/e6ebae16/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java index 6d45d6e..6634e98 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java @@ -42,9 +42,11 @@ import org.apache.ignite.internal.IgniteInterruptedCheckedException; import org.apache.ignite.internal.managers.discovery.DiscoCache; import org.apache.ignite.internal.processors.affinity.AffinityAssignment; import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; +import org.apache.ignite.internal.processors.cache.CacheGroupInfrastructure; import org.apache.ignite.internal.processors.cache.ClusterState; import org.apache.ignite.internal.processors.cache.GridCacheContext; import org.apache.ignite.internal.processors.cache.GridCacheMapEntryFactory; +import org.apache.ignite.internal.processors.cache.GridCacheSharedContext; import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionExchangeId; import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionFullMap; import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionMap; @@ -72,7 +74,7 @@ import static org.apache.ignite.internal.processors.cache.distributed.dht.GridDh * Partition topology. */ @GridToStringExclude -class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology { +public class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology { /** If true, then check consistency. */ private static final boolean CONSISTENCY_CHECK = false; @@ -82,8 +84,11 @@ class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology { /** */ private static final Long ZERO = 0L; - /** Context. */ - private final GridCacheContext<?, ?> cctx; + /** */ + private final GridCacheSharedContext ctx; + + /** */ + private final CacheGroupInfrastructure grp; /** Logger. */ private final IgniteLogger log; @@ -131,23 +136,26 @@ class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology { private volatile boolean treatAllPartAsLoc; /** - * @param cctx Context. + * @param ctx Context. * @param entryFactory Entry factory. */ - GridDhtPartitionTopologyImpl(GridCacheContext<?, ?> cctx, GridCacheMapEntryFactory entryFactory) { - assert cctx != null; + public GridDhtPartitionTopologyImpl(GridCacheSharedContext ctx, CacheGroupInfrastructure grp, GridCacheMapEntryFactory entryFactory) { + assert ctx != null; + assert grp != null; + assert entryFactory != null; - this.cctx = cctx; + this.ctx = ctx; + this.grp = grp; this.entryFactory = entryFactory; - log = cctx.logger(getClass()); + log = ctx.logger(getClass()); - locParts = new AtomicReferenceArray<>(cctx.config().getAffinity().partitions()); + locParts = new AtomicReferenceArray<>(grp.config().getAffinity().partitions()); } /** {@inheritDoc} */ - @Override public int cacheId() { - return cctx.cacheId(); + @Override public int groupId() { + return grp.groupId(); } /** @@ -171,7 +179,7 @@ class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology { topVer = AffinityTopologyVersion.NONE; - discoCache = cctx.discovery().discoCache(); + discoCache = ctx.discovery().discoCache(); } finally { lock.writeLock().unlock(); @@ -235,13 +243,13 @@ class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology { if (dumpCnt++ < GridDhtPartitionsExchangeFuture.DUMP_PENDING_OBJECTS_THRESHOLD) { U.warn(log, "Failed to wait for partition eviction [" + "topVer=" + topVer + - ", cache=" + cctx.name() + + ", group=" + grp.name() + ", part=" + part.id() + ", partState=" + part.state() + ", size=" + part.size() + ", reservations=" + part.reservations() + ", grpReservations=" + part.groupReserved() + - ", node=" + cctx.localNodeId() + "]"); + ", node=" + ctx.localNodeId() + "]"); if (IgniteSystemProperties.getBoolean(IGNITE_THREAD_DUMP_ON_EXCHANGE_TIMEOUT, false)) U.dumpThreads(log); @@ -329,7 +337,7 @@ class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology { AffinityTopologyVersion topVer = this.topVer; assert topVer.topologyVersion() > 0 : "Invalid topology version [topVer=" + topVer + - ", cacheName=" + cctx.name() + ']'; + ", group=" + grp.name() + ']'; return topVer; } @@ -371,7 +379,7 @@ class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology { * @param updateSeq Update sequence. */ private void initPartitions0(GridDhtPartitionsExchangeFuture exchFut, long updateSeq) { - ClusterNode loc = cctx.localNode(); + ClusterNode loc = ctx.localNode(); ClusterNode oldest = discoCache.oldestAliveServerNodeWithCache(); @@ -379,21 +387,21 @@ class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology { assert topVer.equals(exchFut.topologyVersion()) : "Invalid topology [topVer=" + topVer + - ", cache=" + cctx.name() + + ", grp=" + grp.name() + ", futVer=" + exchFut.topologyVersion() + ", fut=" + exchFut + ']'; - assert cctx.affinity().affinityTopologyVersion().equals(exchFut.topologyVersion()) : - "Invalid affinity [topVer=" + cctx.affinity().affinityTopologyVersion() + - ", cache=" + cctx.name() + + assert grp.affinity().lastVersion().equals(exchFut.topologyVersion()) : + "Invalid affinity [topVer=" + grp.affinity().lastVersion() + + ", grp=" + grp.name() + ", futVer=" + exchFut.topologyVersion() + ", fut=" + exchFut + ']'; - List<List<ClusterNode>> aff = cctx.affinity().assignments(exchFut.topologyVersion()); + List<List<ClusterNode>> aff = grp.affinity().assignments(exchFut.topologyVersion()); - int num = cctx.affinity().partitions(); + int num = grp.affinity().partitions(); - if (cctx.rebalanceEnabled()) { - boolean added = exchId.topologyVersion().equals(cctx.cacheStartTopologyVersion()); + if (grp.rebalanceEnabled()) { + boolean added = exchId.topologyVersion().equals(grp.groupStartVersion()); boolean first = (loc.equals(oldest) && loc.id().equals(exchId.nodeId()) && exchId.isJoined()) || added; @@ -406,7 +414,7 @@ class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology { boolean owned = locPart.own(); - assert owned : "Failed to own partition for oldest node [cacheName" + cctx.name() + + assert owned : "Failed to own partition for oldest node [grp" + grp.name() + ", part=" + locPart + ']'; if (log.isDebugEnabled()) @@ -465,7 +473,7 @@ class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology { * @param updateSeq Update sequence. */ private void createPartitions(List<List<ClusterNode>> aff, long updateSeq) { - int num = cctx.affinity().partitions(); + int num = grp.affinity().partitions(); for (int p = 0; p < num; p++) { if (node2part != null && node2part.valid()) { @@ -493,20 +501,20 @@ class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology { ClusterState newState = exchFut.newClusterState(); treatAllPartAsLoc = (newState != null && newState == ClusterState.ACTIVE) - || (cctx.kernalContext().state().active() + || (ctx.kernalContext().state().active() && discoEvt.type() == EventType.EVT_NODE_JOINED && discoEvt.eventNode().isLocal() - && !cctx.kernalContext().clientNode() + && !ctx.kernalContext().clientNode() ); // Wait for rent outside of checkpoint lock. waitForRent(); - ClusterNode loc = cctx.localNode(); + ClusterNode loc = ctx.localNode(); - cctx.shared().database().checkpointReadLock(); + ctx.database().checkpointReadLock(); - synchronized (cctx.shared().exchange().interruptLock()) { + synchronized (ctx.exchange().interruptLock()) { if (Thread.currentThread().isInterrupted()) throw new IgniteInterruptedCheckedException("Thread is interrupted: " + Thread.currentThread()); @@ -514,7 +522,7 @@ class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology { U.writeLock(lock); } catch (IgniteInterruptedCheckedException e) { - cctx.shared().database().checkpointReadUnlock(); + ctx.database().checkpointReadUnlock(); throw e; } @@ -568,7 +576,7 @@ class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology { if (affReady) initPartitions0(exchFut, updateSeq); else { - List<List<ClusterNode>> aff = cctx.affinity().idealAssignment(); + List<List<ClusterNode>> aff = grp.affinity().idealAssignment(); createPartitions(aff, updateSeq); } @@ -582,7 +590,7 @@ class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology { finally { lock.writeLock().unlock(); - cctx.shared().database().checkpointReadUnlock(); + ctx.database().checkpointReadUnlock(); } } @@ -596,13 +604,13 @@ class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology { boolean changed = waitForRent(); - int num = cctx.affinity().partitions(); + int num = grp.affinity().partitions(); AffinityTopologyVersion topVer = exchFut.topologyVersion(); - assert cctx.affinity().affinityTopologyVersion().equals(topVer) : "Affinity is not initialized " + + assert grp.affinity().lastVersion().equals(topVer) : "Affinity is not initialized " + "[topVer=" + topVer + - ", affVer=" + cctx.affinity().affinityTopologyVersion() + + ", affVer=" + grp.affinity().lastVersion() + ", fut=" + exchFut + ']'; lock.writeLock().lock(); @@ -623,7 +631,7 @@ class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology { for (int p = 0; p < num; p++) { GridDhtLocalPartition locPart = localPartition(p, topVer, false, false); - if (cctx.affinity().partitionLocalNode(p, topVer)) { + if (grp.affinity().partitionLocalNode(p, topVer)) { // This partition will be created during next topology event, // which obviously has not happened at this point. if (locPart == null) { @@ -636,27 +644,28 @@ class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology { GridDhtPartitionState state = locPart.state(); if (state == MOVING) { - if (cctx.rebalanceEnabled()) { + if (grp.rebalanceEnabled()) { Collection<ClusterNode> owners = owners(p); // If there are no other owners, then become an owner. if (F.isEmpty(owners)) { boolean owned = locPart.own(); - assert owned : "Failed to own partition [cacheName" + cctx.name() + ", locPart=" + + assert owned : "Failed to own partition [grp=" + grp.name() + ", locPart=" + locPart + ']'; updateSeq = updateLocal(p, locPart.state(), updateSeq); changed = true; - if (cctx.events().isRecordable(EVT_CACHE_REBALANCE_PART_DATA_LOST)) { - DiscoveryEvent discoEvt = exchFut.discoveryEvent(); - - cctx.events().addPreloadEvent(p, - EVT_CACHE_REBALANCE_PART_DATA_LOST, discoEvt.eventNode(), - discoEvt.type(), discoEvt.timestamp()); - } +// TODO IGNITE-5075. +// if (ctx.events().isRecordable(EVT_CACHE_REBALANCE_PART_DATA_LOST)) { +// DiscoveryEvent discoEvt = exchFut.discoveryEvent(); +// +// cctx.events().addPreloadEvent(p, +// EVT_CACHE_REBALANCE_PART_DATA_LOST, discoEvt.eventNode(), +// discoEvt.type(), discoEvt.timestamp()); +// } if (log.isDebugEnabled()) log.debug("Owned partition: " + locPart); @@ -673,7 +682,7 @@ class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology { if (locPart != null) { GridDhtPartitionState state = locPart.state(); - if (state == MOVING && cctx.kernalContext().state().active()) { + if (state == MOVING && ctx.kernalContext().state().active()) { locPart.rent(false); updateSeq = updateLocal(p, locPart.state(), updateSeq); @@ -687,7 +696,7 @@ class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology { } } - updateRebalanceVersion(cctx.affinity().assignments(topVer)); + updateRebalanceVersion(grp.affinity().assignments(topVer)); consistencyCheck(); } http://git-wip-us.apache.org/repos/asf/ignite/blob/e6ebae16/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java index 43e6af7..e197864 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java @@ -177,17 +177,6 @@ public class GridDhtPreloader extends GridCachePreloaderAdapter { } }); - if (!cctx.kernalContext().clientNode()) { - cctx.io().addHandler(cctx.cacheId(), GridDhtAffinityAssignmentRequest.class, - new MessageHandler<GridDhtAffinityAssignmentRequest>() { - @Override protected void onMessage(ClusterNode node, GridDhtAffinityAssignmentRequest msg) { - processAffinityAssignmentRequest(node, msg); - } - }); - } - - cctx.shared().affinity().onCacheCreated(cctx); - supplier = new GridDhtPartitionSupplier(cctx); demander = new GridDhtPartitionDemander(cctx); @@ -590,45 +579,6 @@ public class GridDhtPreloader extends GridCachePreloaderAdapter { } /** - * @param node Node. - * @param req Request. - */ - private void processAffinityAssignmentRequest(final ClusterNode node, - final GridDhtAffinityAssignmentRequest req) { - final AffinityTopologyVersion topVer = req.topologyVersion(); - - if (log.isDebugEnabled()) - log.debug("Processing affinity assignment request [node=" + node + ", req=" + req + ']'); - - cctx.affinity().affinityReadyFuture(req.topologyVersion()).listen(new CI1<IgniteInternalFuture<AffinityTopologyVersion>>() { - @Override public void apply(IgniteInternalFuture<AffinityTopologyVersion> fut) { - if (log.isDebugEnabled()) - log.debug("Affinity is ready for topology version, will send response [topVer=" + topVer + - ", node=" + node + ']'); - - AffinityAssignment assignment = cctx.affinity().assignment(topVer); - - GridDhtAffinityAssignmentResponse res = new GridDhtAffinityAssignmentResponse(cctx.cacheId(), - topVer, - assignment.assignment()); - - if (cctx.affinity().affinityCache().centralizedAffinityFunction()) { - assert assignment.idealAssignment() != null; - - res.idealAffinityAssignment(assignment.idealAssignment()); - } - - try { - cctx.io().send(node, res, AFFINITY_POOL); - } - catch (IgniteCheckedException e) { - U.error(log, "Failed to send affinity assignment response to remote node [node=" + node + ']', e); - } - } - }); - } - - /** * Resends partitions on partition evict within configured timeout. * * @param part Evicted partition. http://git-wip-us.apache.org/repos/asf/ignite/blob/e6ebae16/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java index efb02c6..6b6585b 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java @@ -1327,7 +1327,7 @@ public class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler t.get1()); for (AffinityTopologyVersion topVer : t.get2()) { - for (ClusterNode node : ctx.discovery().cacheAffinityNodes(cctx.name(), topVer)) { + for (ClusterNode node : ctx.discovery().cacheGroupAffinityNodes(cctx.groupId(), topVer)) { if (!node.isLocal()) { try { cctx.io().send(node, msg, GridIoPolicy.SYSTEM_POOL); http://git-wip-us.apache.org/repos/asf/ignite/blob/e6ebae16/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java index a591517..c6dc114 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java @@ -506,8 +506,8 @@ public class IgniteTxHandler { for (IgniteTxEntry e : F.concat(false, req.reads(), req.writes())) { GridCacheContext ctx = e.context(); - Collection<ClusterNode> cacheNodes0 = ctx.discovery().cacheAffinityNodes(ctx.cacheId(), expVer); - Collection<ClusterNode> cacheNodes1 = ctx.discovery().cacheAffinityNodes(ctx.cacheId(), curVer); + Collection<ClusterNode> cacheNodes0 = ctx.discovery().cacheGroupAffinityNodes(ctx.groupId(), expVer); + Collection<ClusterNode> cacheNodes1 = ctx.discovery().cacheGroupAffinityNodes(ctx.groupId(), curVer); if (!cacheNodes0.equals(cacheNodes1) || ctx.affinity().affinityTopologyVersion().compareTo(curVer) < 0) return true;
