http://git-wip-us.apache.org/repos/asf/ignite/blob/7e45010b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java index 962f137..44bb04f 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java @@ -72,7 +72,7 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap private boolean lateAffAssign; /** Affinity information for all started caches (initialized on coordinator). */ - private ConcurrentMap<Integer, CacheHolder> caches = new ConcurrentHashMap<>(); + private ConcurrentMap<Integer, CacheGroupHolder> grpHolders = new ConcurrentHashMap<>(); /** Last topology version when affinity was calculated (updated from exchange thread). */ private AffinityTopologyVersion affCalcVer; @@ -81,7 +81,7 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap private AffinityTopologyVersion lastAffVer; /** Registered caches (updated from exchange thread). */ - private final Map<Integer, DynamicCacheDescriptor> registeredCaches = new HashMap<>(); + private final Map<Integer, CacheGroupDescriptor> registeredGrps = new HashMap<>(); /** */ private WaitRebalanceInfo waitInfo; @@ -131,14 +131,14 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap void onDiscoveryEvent(int type, ClusterNode node, AffinityTopologyVersion topVer) { if (type == EVT_NODE_JOINED && node.isLocal()) { // Clean-up in case of client reconnect. 
- registeredCaches.clear(); + registeredGrps.clear(); affCalcVer = null; lastAffVer = null; - for (DynamicCacheDescriptor desc : cctx.cache().cacheDescriptors()) - registeredCaches.put(desc.cacheId(), desc); + for (CacheGroupDescriptor desc : cctx.cache().cacheGroupDescriptors().values()) + registeredGrps.put(desc.groupId(), desc); } if (!CU.clientNode(node) && (type == EVT_NODE_FAILED || type == EVT_NODE_JOINED || type == EVT_NODE_LEFT)) { @@ -159,7 +159,7 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap if (msg.exchangeId() != null) { if (log.isDebugEnabled()) { - log.debug("Need process affinity change message [lastAffVer=" + lastAffVer + + log.debug("Ignore affinity change message [lastAffVer=" + lastAffVer + ", msgExchId=" + msg.exchangeId() + ", msgVer=" + msg.topologyVersion() + ']'); } @@ -193,14 +193,14 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap /** * @param topVer Expected topology version. */ - private void onCacheStopped(AffinityTopologyVersion topVer) { + private void onCacheGroupStopped(AffinityTopologyVersion topVer) { CacheAffinityChangeMessage msg = null; synchronized (mux) { if (waitInfo == null || !waitInfo.topVer.equals(topVer)) return; - if (waitInfo.waitCaches.isEmpty()) { + if (waitInfo.waitGrps.isEmpty()) { msg = affinityChangeMessage(waitInfo); waitInfo = null; @@ -218,9 +218,9 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap /** * @param top Topology. - * @param checkCacheId Cache ID. + * @param checkGrpId Group ID. 
*/ - void checkRebalanceState(GridDhtPartitionTopology top, Integer checkCacheId) { + void checkRebalanceState(GridDhtPartitionTopology top, Integer checkGrpId) { if (!lateAffAssign) return; @@ -234,14 +234,14 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap assert affCalcVer.equals(waitInfo.topVer) : "Invalid affinity version [calcVer=" + affCalcVer + ", waitVer=" + waitInfo.topVer + ']'; - Map<Integer, UUID> partWait = waitInfo.waitCaches.get(checkCacheId); + Map<Integer, UUID> partWait = waitInfo.waitGrps.get(checkGrpId); boolean rebalanced = true; if (partWait != null) { - CacheHolder cache = caches.get(checkCacheId); + CacheGroupHolder grpHolder = grpHolders.get(checkGrpId); - if (cache != null) { + if (grpHolder != null) { for (Iterator<Map.Entry<Integer, UUID>> it = partWait.entrySet().iterator(); it.hasNext(); ) { Map.Entry<Integer, UUID> e = it.next(); @@ -261,9 +261,9 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap } if (rebalanced) { - waitInfo.waitCaches.remove(checkCacheId); + waitInfo.waitGrps.remove(checkGrpId); - if (waitInfo.waitCaches.isEmpty()) { + if (waitInfo.waitGrps.isEmpty()) { msg = affinityChangeMessage(waitInfo); waitInfo = null; @@ -292,7 +292,7 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap Map<Integer, Map<Integer, List<UUID>>> assignmentsChange = U.newHashMap(waitInfo.assignments.size()); for (Map.Entry<Integer, Map<Integer, List<ClusterNode>>> e : waitInfo.assignments.entrySet()) { - Integer cacheId = e.getKey(); + Integer grpId = e.getKey(); Map<Integer, List<ClusterNode>> assignment = e.getValue(); @@ -301,23 +301,23 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap for (Map.Entry<Integer, List<ClusterNode>> e0 : assignment.entrySet()) assignment0.put(e0.getKey(), toIds0(e0.getValue())); - assignmentsChange.put(cacheId, assignment0); + assignmentsChange.put(grpId, assignment0); } return new 
CacheAffinityChangeMessage(waitInfo.topVer, assignmentsChange, waitInfo.deploymentIds); } /** - * @param cctx Cache context. + * @param grp Cache group. */ - public void onCacheCreated(GridCacheContext cctx) { - final Integer cacheId = cctx.cacheId(); + void onCacheGroupCreated(CacheGroupContext grp) { + final Integer grpId = grp.groupId(); - if (!caches.containsKey(cctx.cacheId())) { - cctx.io().addHandler(cacheId, GridDhtAffinityAssignmentResponse.class, + if (!grpHolders.containsKey(grp.groupId())) { + cctx.io().addCacheGroupHandler(grpId, GridDhtAffinityAssignmentResponse.class, new IgniteBiInClosure<UUID, GridDhtAffinityAssignmentResponse>() { @Override public void apply(UUID nodeId, GridDhtAffinityAssignmentResponse res) { - processAffinityAssignmentResponse(nodeId, res); + processAffinityAssignmentResponse(grpId, nodeId, res); } }); } @@ -327,28 +327,14 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap * @param exchActions Cache change requests to execute on exchange. 
*/ private void updateCachesInfo(ExchangeActions exchActions) { - for (ExchangeActions.ActionData action : exchActions.cacheStopRequests()) { - DynamicCacheDescriptor desc = registeredCaches.remove(action.descriptor().cacheId()); + for (CacheGroupDescriptor stopDesc : exchActions.cacheGroupsToStop()) { + CacheGroupDescriptor rmvd = registeredGrps.remove(stopDesc.groupId()); - assert desc != null : action.request().cacheName(); + assert rmvd != null : stopDesc.cacheOrGroupName(); } - for (ExchangeActions.ActionData action : exchActions.cacheStartRequests()) { - DynamicCacheChangeRequest req = action.request(); - - Integer cacheId = action.descriptor().cacheId(); - - DynamicCacheDescriptor desc = new DynamicCacheDescriptor(cctx.kernalContext(), - req.startCacheConfiguration(), - req.cacheType(), - false, - action.descriptor().receivedFrom(), - action.descriptor().staticallyConfigured(), - action.descriptor().sql(), - req.deploymentId(), - req.schema()); - - DynamicCacheDescriptor old = registeredCaches.put(cacheId, desc); + for (CacheGroupDescriptor startDesc : exchActions.cacheGroupsToStart()) { + CacheGroupDescriptor old = registeredGrps.put(startDesc.groupId(), startDesc); assert old == null : old; } @@ -365,7 +351,7 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap */ public boolean onCacheChangeRequest(final GridDhtPartitionsExchangeFuture fut, boolean crd, - ExchangeActions exchActions) + final ExchangeActions exchActions) throws IgniteCheckedException { assert exchActions != null && !exchActions.empty() : exchActions; @@ -373,9 +359,9 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap updateCachesInfo(exchActions); // Affinity did not change for existing caches. 
- forAllCaches(crd && lateAffAssign, new IgniteInClosureX<GridAffinityAssignmentCache>() { + forAllCacheGroups(crd && lateAffAssign, new IgniteInClosureX<GridAffinityAssignmentCache>() { @Override public void applyx(GridAffinityAssignmentCache aff) throws IgniteCheckedException { - if (fut.stopping(aff.cacheId())) + if (exchActions.cacheGroupStopping(aff.groupId())) return; aff.clientEventTopologyChange(fut.discoveryEvent(), fut.topologyVersion()); @@ -397,8 +383,8 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap nearCfg = req.nearCacheConfiguration(); } else { - startCache = cctx.cacheContext(action.descriptor().cacheId()) == null && - CU.affinityNode(cctx.localNode(), req.startCacheConfiguration().getNodeFilter()); + startCache = cctx.cacheContext(cacheDesc.cacheId()) == null && + CU.affinityNode(cctx.localNode(), cacheDesc.groupDescriptor().config().getNodeFilter()); } try { @@ -406,102 +392,122 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap cctx.cache().prepareCacheStart(cacheDesc, nearCfg, fut.topologyVersion()); if (fut.cacheAddedOnExchange(cacheDesc.cacheId(), cacheDesc.receivedFrom())) { - if (fut.discoCache().cacheAffinityNodes(req.cacheName()).isEmpty()) + if (fut.discoCache().cacheGroupAffinityNodes(cacheDesc.groupId()).isEmpty()) U.quietAndWarn(log, "No server nodes found for cache client: " + req.cacheName()); } } + } + catch (IgniteCheckedException e) { + U.error(log, "Failed to initialize cache. Will try to rollback cache start routine. 
" + + "[cacheName=" + req.cacheName() + ']', e); - if (!crd || !lateAffAssign) { - GridCacheContext cacheCtx = cctx.cacheContext(cacheDesc.cacheId()); - - if (cacheCtx != null && !cacheCtx.isLocal()) { - boolean clientCacheStarted = - req.clientStartOnly() && req.initiatingNodeId().equals(cctx.localNodeId()); + cctx.cache().forceCloseCache(fut.topologyVersion(), action, e); + } + } - if (clientCacheStarted) - initAffinity(cacheDesc, cacheCtx.affinity().affinityCache(), fut, lateAffAssign); - else if (!req.clientStartOnly()) { - assert fut.topologyVersion().equals(cacheCtx.startTopologyVersion()); + Set<Integer> gprs = new HashSet<>(); - GridAffinityAssignmentCache aff = cacheCtx.affinity().affinityCache(); + for (ExchangeActions.ActionData action : exchActions.newAndClientCachesStartRequests()) { + Integer grpId = action.descriptor().groupId(); - assert aff.lastVersion().equals(AffinityTopologyVersion.NONE) : aff.lastVersion(); + if (gprs.add(grpId)) { + if (crd && lateAffAssign) + initStartedGroupOnCoordinator(fut, action.descriptor().groupDescriptor());else { + CacheGroupContext grp = cctx.cache().cacheGroup(grpId); - List<List<ClusterNode>> assignment = aff.calculate(fut.topologyVersion(), - fut.discoveryEvent(), fut.discoCache()); + if (grp != null && !grp.isLocal() && grp.localStartVersion().equals(fut.topologyVersion())) { + assert grp.affinity().lastVersion().equals(AffinityTopologyVersion.NONE) : grp.affinity().lastVersion(); - aff.initialize(fut.topologyVersion(), assignment); - } + initAffinity(registeredGrps.get(grp.groupId()), grp.affinity(), fut); } } - else - initStartedCacheOnCoordinator(fut, cacheDesc.cacheId()); - } - catch (IgniteCheckedException e) { - U.error(log, "Failed to initialize cache. Will try to rollback cache start routine. 
" + - "[cacheName=" + req.cacheName() + ']', e); - - cctx.cache().forceCloseCache(fut.topologyVersion(), action, e); } } - for (DynamicCacheChangeRequest req : exchActions.closeRequests(cctx.localNodeId())) { - Integer cacheId = CU.cacheId(req.cacheName()); + List<ExchangeActions.ActionData> closeReqs = exchActions.closeRequests(cctx.localNodeId()); - cctx.cache().blockGateway(req); + for (ExchangeActions.ActionData req : closeReqs) { + cctx.cache().blockGateway(req.request()); if (crd) { - GridCacheContext cacheCtx = cctx.cacheContext(cacheId); + CacheGroupContext grp = cctx.cache().cacheGroup(req.descriptor().groupId()); + + assert grp != null; + + if (grp.affinityNode()) + continue; - // Client cache was stopped, need create 'client' CacheHolder. - if (cacheCtx != null && !cacheCtx.affinityNode()) { - CacheHolder cache = caches.remove(cacheId); + boolean grpClosed = false; - assert !cache.client() : cache; + if (grp.sharedGroup()) { + boolean cacheRemaining = false; - cache = CacheHolder2.create(cctx, - cctx.cache().cacheDescriptor(cacheId), - fut, - cache.affinity()); + for (GridCacheContext ctx : cctx.cacheContexts()) { + if (ctx.group() == grp && !cacheClosed(ctx.cacheId(), closeReqs)) { + cacheRemaining = true; - caches.put(cacheId, cache); + break; + } + } + + if (!cacheRemaining) + grpClosed = true; } - } - } + else + grpClosed = true; - Set<Integer> stoppedCaches = null; + // All client cache groups were stopped, need create 'client' CacheGroupHolder. 
+ if (grpClosed) { + CacheGroupHolder grpHolder = grpHolders.remove(grp.groupId()); - for (ExchangeActions.ActionData action : exchActions.cacheStopRequests()) { - DynamicCacheDescriptor desc = action.descriptor(); + if (grpHolder != null) { + assert !grpHolder.client() : grpHolder; + grpHolder = CacheGroupHolder2.create(cctx, + registeredGrps.get(grp.groupId()), + fut, + grp.affinity()); + + grpHolders.put(grp.groupId(), grpHolder); + } + } + } + } + + for (ExchangeActions.ActionData action : exchActions.cacheStopRequests()) cctx.cache().blockGateway(action.request()); - if (crd && lateAffAssign && desc.cacheConfiguration().getCacheMode() != LOCAL) { - CacheHolder cache = caches.remove(desc.cacheId()); + Set<Integer> stoppedGrps = null; + + if (crd && lateAffAssign) { + for (CacheGroupDescriptor grpDesc : exchActions.cacheGroupsToStop()) { + if (grpDesc.config().getCacheMode() != LOCAL) { + CacheGroupHolder cacheGrp = grpHolders.remove(grpDesc.groupId()); - assert cache != null : action.request(); + assert cacheGrp != null : grpDesc; - if (stoppedCaches == null) - stoppedCaches = new HashSet<>(); + if (stoppedGrps == null) + stoppedGrps = new HashSet<>(); - stoppedCaches.add(cache.cacheId()); + stoppedGrps.add(cacheGrp.groupId()); - cctx.io().removeHandler(desc.cacheId(), GridDhtAffinityAssignmentResponse.class); + cctx.io().removeHandler(true, cacheGrp.groupId(), GridDhtAffinityAssignmentResponse.class); + } } } - if (stoppedCaches != null) { + if (stoppedGrps != null) { boolean notify = false; synchronized (mux) { if (waitInfo != null) { - for (Integer cacheId : stoppedCaches) { - boolean rmv = waitInfo.waitCaches.remove(cacheId) != null; + for (Integer grpId : stoppedGrps) { + boolean rmv = waitInfo.waitGrps.remove(grpId) != null; if (rmv) { notify = true; - waitInfo.assignments.remove(cacheId); + waitInfo.assignments.remove(grpId); } } } @@ -512,7 +518,7 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap 
cctx.kernalContext().closure().runLocalSafe(new Runnable() { @Override public void run() { - onCacheStopped(topVer); + onCacheGroupStopped(topVer); } }); } @@ -522,12 +528,26 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap } /** + * @param cacheId Cache ID. + * @param closeReqs Close requests. + * @return {@code True} if requests contain request for given cache ID. + */ + private boolean cacheClosed(int cacheId, List<ExchangeActions.ActionData> closeReqs) { + for (ExchangeActions.ActionData req : closeReqs) { + if (req.descriptor().cacheId() == cacheId) + return true; + } + + return false; + } + + /** * */ public void removeAllCacheInfo() { - caches.clear(); + grpHolders.clear(); - registeredCaches.clear(); + registeredGrps.clear(); } /** @@ -555,13 +575,13 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap final Map<Object, List<List<ClusterNode>>> affCache = new HashMap<>(); - forAllCaches(crd, new IgniteInClosureX<GridAffinityAssignmentCache>() { + forAllCacheGroups(crd, new IgniteInClosureX<GridAffinityAssignmentCache>() { @Override public void applyx(GridAffinityAssignmentCache aff) throws IgniteCheckedException { List<List<ClusterNode>> idealAssignment = aff.idealAssignment(); assert idealAssignment != null; - Map<Integer, List<UUID>> cacheAssignment = assignment.get(aff.cacheId()); + Map<Integer, List<UUID>> cacheAssignment = assignment.get(aff.groupId()); List<List<ClusterNode>> newAssignment; @@ -611,25 +631,25 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap final Map<Object, List<List<ClusterNode>>> affCache = new HashMap<>(); - forAllCaches(crd, new IgniteInClosureX<GridAffinityAssignmentCache>() { + forAllCacheGroups(crd, new IgniteInClosureX<GridAffinityAssignmentCache>() { @Override public void applyx(GridAffinityAssignmentCache aff) throws IgniteCheckedException { AffinityTopologyVersion affTopVer = aff.lastVersion(); assert 
affTopVer.topologyVersion() > 0 : affTopVer; - DynamicCacheDescriptor desc = registeredCaches.get(aff.cacheId()); + CacheGroupDescriptor desc = registeredGrps.get(aff.groupId()); - assert desc != null : aff.cacheName(); + assert desc != null : aff.cacheOrGroupName(); IgniteUuid deploymentId = desc.deploymentId(); - if (!deploymentId.equals(deploymentIds.get(aff.cacheId()))) { + if (!deploymentId.equals(deploymentIds.get(aff.groupId()))) { aff.clientEventTopologyChange(exchFut.discoveryEvent(), topVer); return; } - Map<Integer, List<UUID>> change = affChange.get(aff.cacheId()); + Map<Integer, List<UUID>> change = affChange.get(aff.groupId()); if (change != null) { assert !change.isEmpty() : msg; @@ -644,7 +664,7 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap List<ClusterNode> nodes = toNodes(topVer, e.getValue()); assert !nodes.equals(assignment.get(part)) : "Assignment did not change " + - "[cache=" + aff.cacheName() + + "[cacheGrp=" + aff.cacheOrGroupName() + ", part=" + part + ", cur=" + F.nodeIds(assignment.get(part)) + ", new=" + F.nodeIds(nodes) + @@ -680,7 +700,7 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap if (lateAffAssign) { if (!locJoin) { - forAllCaches(crd, new IgniteInClosureX<GridAffinityAssignmentCache>() { + forAllCacheGroups(crd, new IgniteInClosureX<GridAffinityAssignmentCache>() { @Override public void applyx(GridAffinityAssignmentCache aff) throws IgniteCheckedException { AffinityTopologyVersion topVer = fut.topologyVersion(); @@ -693,7 +713,7 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap } else { if (!locJoin) { - forAllCaches(false, new IgniteInClosureX<GridAffinityAssignmentCache>() { + forAllCacheGroups(false, new IgniteInClosureX<GridAffinityAssignmentCache>() { @Override public void applyx(GridAffinityAssignmentCache aff) throws IgniteCheckedException { AffinityTopologyVersion topVer = fut.topologyVersion(); @@ -726,11 +746,11 
@@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap } /** + * @param grpId Cache group ID. * @param nodeId Node ID. * @param res Response. */ - private void processAffinityAssignmentResponse(UUID nodeId, - GridDhtAffinityAssignmentResponse res) { + private void processAffinityAssignmentResponse(Integer grpId, UUID nodeId, GridDhtAffinityAssignmentResponse res) { if (log.isDebugEnabled()) log.debug("Processing affinity assignment response [node=" + nodeId + ", res=" + res + ']'); @@ -744,11 +764,11 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap * @param c Cache closure. * @throws IgniteCheckedException If failed */ - private void forAllRegisteredCaches(IgniteInClosureX<DynamicCacheDescriptor> c) throws IgniteCheckedException { + private void forAllRegisteredCacheGroups(IgniteInClosureX<CacheGroupDescriptor> c) throws IgniteCheckedException { assert lateAffAssign; - for (DynamicCacheDescriptor cacheDesc : registeredCaches.values()) { - if (cacheDesc.cacheConfiguration().getCacheMode() == LOCAL) + for (CacheGroupDescriptor cacheDesc : registeredGrps.values()) { + if (cacheDesc.config().getCacheMode() == LOCAL) continue; c.applyx(cacheDesc); @@ -759,56 +779,60 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap * @param crd Coordinator flag. * @param c Closure. 
*/ - private void forAllCaches(boolean crd, IgniteInClosureX<GridAffinityAssignmentCache> c) { + private void forAllCacheGroups(boolean crd, IgniteInClosureX<GridAffinityAssignmentCache> c) { if (crd) { - for (CacheHolder cache : caches.values()) - c.apply(cache.affinity()); + for (CacheGroupHolder grp : grpHolders.values()) + c.apply(grp.affinity()); } else { - for (GridCacheContext cacheCtx : cctx.cacheContexts()) { - if (cacheCtx.isLocal()) + for (CacheGroupContext grp : cctx.kernalContext().cache().cacheGroups()) { + if (grp.isLocal()) continue; - c.apply(cacheCtx.affinity().affinityCache()); + c.apply(grp.affinity()); } } } /** * @param fut Exchange future. - * @param cacheId Cache ID. + * @param grpDesc Cache group descriptor. * @throws IgniteCheckedException If failed. */ - private void initStartedCacheOnCoordinator(GridDhtPartitionsExchangeFuture fut, final Integer cacheId) + private void initStartedGroupOnCoordinator(GridDhtPartitionsExchangeFuture fut, final CacheGroupDescriptor grpDesc) throws IgniteCheckedException { - CacheHolder cache = caches.get(cacheId); + assert grpDesc != null && grpDesc.groupId() != 0 : grpDesc; - GridCacheContext cacheCtx = cctx.cacheContext(cacheId); + if (grpDesc.config().getCacheMode() == LOCAL) + return; - if (cache == null) { - DynamicCacheDescriptor desc = cctx.cache().cacheDescriptor(cacheId); + Integer grpId = grpDesc.groupId(); - assert desc != null : cacheId; + CacheGroupHolder grpHolder = grpHolders.get(grpId); - if (desc.cacheConfiguration().getCacheMode() == LOCAL) - return; + CacheGroupContext grp = cctx.kernalContext().cache().cacheGroup(grpId); - cache = cacheCtx != null ? new CacheHolder1(cacheCtx, null) : CacheHolder2.create(cctx, desc, fut, null); + if (grpHolder == null) { + grpHolder = grp != null ? 
+ new CacheGroupHolder1(grp, null) : + CacheGroupHolder2.create(cctx, grpDesc, fut, null); - CacheHolder old = caches.put(cacheId, cache); + CacheGroupHolder old = grpHolders.put(grpId, grpHolder); assert old == null : old; - List<List<ClusterNode>> newAff = cache.affinity().calculate(fut.topologyVersion(), fut.discoveryEvent(), fut.discoCache()); + List<List<ClusterNode>> newAff = grpHolder.affinity().calculate(fut.topologyVersion(), + fut.discoveryEvent(), + fut.discoCache()); - cache.affinity().initialize(fut.topologyVersion(), newAff); + grpHolder.affinity().initialize(fut.topologyVersion(), newAff); } - else if (cache.client() && cacheCtx != null) { - assert cache.affinity().idealAssignment() != null; + else if (grpHolder.client() && grp != null) { + assert grpHolder.affinity().idealAssignment() != null; - cache = new CacheHolder1(cacheCtx, cache.affinity()); + grpHolder = new CacheGroupHolder1(grp, grpHolder.affinity()); - caches.put(cacheId, cache); + grpHolders.put(grpId, grpHolder); } } @@ -824,14 +848,16 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap final GridDhtPartitionsExchangeFuture fut, Collection<DynamicCacheDescriptor> descs) throws IgniteCheckedException { for (DynamicCacheDescriptor desc : descs) { - if (!registeredCaches.containsKey(desc.cacheId())) - registeredCaches.put(desc.cacheId(), desc); + CacheGroupDescriptor grpDesc = desc.groupDescriptor(); + + if (!registeredGrps.containsKey(grpDesc.groupId())) + registeredGrps.put(grpDesc.groupId(), grpDesc); } if (crd && lateAffAssign) { - forAllRegisteredCaches(new IgniteInClosureX<DynamicCacheDescriptor>() { - @Override public void applyx(DynamicCacheDescriptor desc) throws IgniteCheckedException { - CacheHolder cache = cache(fut, desc); + forAllRegisteredCacheGroups(new IgniteInClosureX<CacheGroupDescriptor>() { + @Override public void applyx(CacheGroupDescriptor desc) throws IgniteCheckedException { + CacheGroupHolder cache = groupHolder(fut, desc); if 
(cache.affinity().lastVersion().equals(AffinityTopologyVersion.NONE)) { List<List<ClusterNode>> assignment = @@ -843,30 +869,28 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap }); } else { - forAllCaches(false, new IgniteInClosureX<GridAffinityAssignmentCache>() { + forAllCacheGroups(false, new IgniteInClosureX<GridAffinityAssignmentCache>() { @Override public void applyx(GridAffinityAssignmentCache aff) throws IgniteCheckedException { if (aff.lastVersion().equals(AffinityTopologyVersion.NONE)) - initAffinity(registeredCaches.get(aff.cacheId()), aff, fut, false); + initAffinity(registeredGrps.get(aff.groupId()), aff, fut); } }); } } /** - * @param desc Cache descriptor. + * @param desc Cache group descriptor. * @param aff Affinity. * @param fut Exchange future. - * @param fetch Force fetch flag. * @throws IgniteCheckedException If failed. */ - private void initAffinity(DynamicCacheDescriptor desc, + private void initAffinity(CacheGroupDescriptor desc, GridAffinityAssignmentCache aff, - GridDhtPartitionsExchangeFuture fut, - boolean fetch) + GridDhtPartitionsExchangeFuture fut) throws IgniteCheckedException { - assert desc != null; + assert desc != null : aff.cacheOrGroupName(); - if (!fetch && canCalculateAffinity(desc, aff, fut)) { + if (canCalculateAffinity(desc, aff, fut)) { List<List<ClusterNode>> assignment = aff.calculate(fut.topologyVersion(), fut.discoveryEvent(), fut.discoCache()); aff.initialize(fut.topologyVersion(), assignment); @@ -884,26 +908,26 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap } /** - * @param desc Cache descriptor. + * @param desc Cache group descriptor. * @param aff Affinity. * @param fut Exchange future. * @return {@code True} if local node can calculate affinity on it's own for this partition map exchange. 
*/ - private boolean canCalculateAffinity(DynamicCacheDescriptor desc, + private boolean canCalculateAffinity(CacheGroupDescriptor desc, GridAffinityAssignmentCache aff, GridDhtPartitionsExchangeFuture fut) { - assert desc != null : aff.cacheName(); + assert desc != null : aff.cacheOrGroupName(); // Do not request affinity from remote nodes if affinity function is not centralized. - if (!aff.centralizedAffinityFunction()) + if (!lateAffAssign && !aff.centralizedAffinityFunction()) return true; // If local node did not initiate exchange or local node is the only cache node in grid. - Collection<ClusterNode> affNodes = cctx.discovery().cacheAffinityNodes(aff.cacheId(), fut.topologyVersion()); + Collection<ClusterNode> affNodes = fut.discoCache().cacheGroupAffinityNodes(aff.groupId()); - return fut.cacheAddedOnExchange(aff.cacheId(), desc.receivedFrom()) || + return fut.cacheGroupAddedOnExchange(aff.groupId(), desc.receivedFrom()) || !fut.exchangeId().nodeId().equals(cctx.localNodeId()) || - (affNodes.size() == 1 && affNodes.contains(cctx.localNode())); + (affNodes.isEmpty() || (affNodes.size() == 1 && affNodes.contains(cctx.localNode()))); } /** @@ -923,13 +947,15 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap if (lateAffAssign) { if (locJoin) { if (crd) { - forAllRegisteredCaches(new IgniteInClosureX<DynamicCacheDescriptor>() { - @Override public void applyx(DynamicCacheDescriptor cacheDesc) throws IgniteCheckedException { + forAllRegisteredCacheGroups(new IgniteInClosureX<CacheGroupDescriptor>() { + @Override public void applyx(CacheGroupDescriptor desc) throws IgniteCheckedException { AffinityTopologyVersion topVer = fut.topologyVersion(); - CacheHolder cache = cache(fut, cacheDesc); + CacheGroupHolder cache = groupHolder(fut, desc); - List<List<ClusterNode>> newAff = cache.affinity().calculate(topVer, fut.discoveryEvent(), fut.discoCache()); + List<List<ClusterNode>> newAff = cache.affinity().calculate(topVer, + 
fut.discoveryEvent(), + fut.discoCache()); cache.affinity().initialize(topVer, newAff); } @@ -954,21 +980,21 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap if (crd && lateAffAssign) { if (log.isDebugEnabled()) { log.debug("Computed new affinity after node join [topVer=" + fut.topologyVersion() + - ", waitCaches=" + (info != null ? cacheNames(info.waitCaches.keySet()) : null) + ']'); + ", waitGrps=" + (info != null ? groupNames(info.waitGrps.keySet()) : null) + ']'); } } } } /** - * @param cacheIds Cache IDs. + * @param grpIds Cache group IDs. * @return Cache names. */ - private String cacheNames(Collection<Integer> cacheIds) { + private String groupNames(Collection<Integer> grpIds) { StringBuilder names = new StringBuilder(); - for (Integer cacheId : cacheIds) { - String name = registeredCaches.get(cacheId).cacheConfiguration().getName(); + for (Integer grpId : grpIds) { + String name = registeredGrps.get(grpId).cacheOrGroupName(); if (names.length() != 0) names.append(", "); @@ -988,21 +1014,24 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap List<GridDhtAssignmentFetchFuture> fetchFuts = new ArrayList<>(); - for (GridCacheContext cacheCtx : cctx.cacheContexts()) { - if (cacheCtx.isLocal()) + for (CacheGroupContext grp : cctx.cache().cacheGroups()) { + if (grp.isLocal()) continue; - DynamicCacheDescriptor cacheDesc = registeredCaches.get(cacheCtx.cacheId()); - - if (cctx.localNodeId().equals(cacheDesc.receivedFrom())) { - List<List<ClusterNode>> assignment = - cacheCtx.affinity().affinityCache().calculate(fut.topologyVersion(), fut.discoveryEvent(), fut.discoCache()); + if (fut.cacheGroupAddedOnExchange(grp.groupId(), grp.receivedFrom())) { + List<List<ClusterNode>> assignment = grp.affinity().calculate(fut.topologyVersion(), + fut.discoveryEvent(), + fut.discoCache()); - cacheCtx.affinity().affinityCache().initialize(fut.topologyVersion(), assignment); + 
grp.affinity().initialize(fut.topologyVersion(), assignment); } else { + CacheGroupDescriptor grpDesc = registeredGrps.get(grp.groupId()); + + assert grpDesc != null : grp.cacheOrGroupName(); + GridDhtAssignmentFetchFuture fetchFut = new GridDhtAssignmentFetchFuture(cctx, - cacheDesc, + grpDesc, topVer, fut.discoCache()); @@ -1015,9 +1044,9 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap for (int i = 0; i < fetchFuts.size(); i++) { GridDhtAssignmentFetchFuture fetchFut = fetchFuts.get(i); - Integer cacheId = fetchFut.cacheId(); + Integer grpId = fetchFut.groupId(); - fetchAffinity(fut, cctx.cacheContext(cacheId).affinity().affinityCache(), fetchFut); + fetchAffinity(fut, cctx.cache().cacheGroup(grpId).affinity(), fetchFut); } } @@ -1076,11 +1105,11 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap boolean centralizedAff; if (lateAffAssign) { - for (GridCacheContext cacheCtx : cctx.cacheContexts()) { - if (cacheCtx.isLocal()) + for (CacheGroupContext grp : cctx.cache().cacheGroups()) { + if (grp.isLocal()) continue; - cacheCtx.affinity().affinityCache().calculate(fut.topologyVersion(), fut.discoveryEvent(), fut.discoCache()); + grp.affinity().calculate(fut.topologyVersion(), fut.discoveryEvent(), fut.discoCache()); } centralizedAff = true; @@ -1107,11 +1136,11 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap private void initAffinityNoLateAssignment(GridDhtPartitionsExchangeFuture fut) throws IgniteCheckedException { assert !lateAffAssign; - for (GridCacheContext cacheCtx : cctx.cacheContexts()) { - if (cacheCtx.isLocal()) + for (CacheGroupContext grp : cctx.cache().cacheGroups()) { + if (grp.isLocal()) continue; - initAffinity(registeredCaches.get(cacheCtx.cacheId()), cacheCtx.affinity().affinityCache(), fut, false); + initAffinity(registeredGrps.get(grp.groupId()), grp.affinity(), fut); } } @@ -1122,35 +1151,38 @@ public class CacheAffinitySharedManager<K, V> 
extends GridCacheSharedManagerAdap */ private IgniteInternalFuture<?> initCoordinatorCaches(final GridDhtPartitionsExchangeFuture fut) throws IgniteCheckedException { + assert lateAffAssign; + final List<IgniteInternalFuture<AffinityTopologyVersion>> futs = new ArrayList<>(); - forAllRegisteredCaches(new IgniteInClosureX<DynamicCacheDescriptor>() { - @Override public void applyx(DynamicCacheDescriptor desc) throws IgniteCheckedException { - CacheHolder cache = caches.get(desc.cacheId()); + forAllRegisteredCacheGroups(new IgniteInClosureX<CacheGroupDescriptor>() { + @Override public void applyx(CacheGroupDescriptor desc) throws IgniteCheckedException { + CacheGroupHolder grpHolder = grpHolders.get(desc.groupId()); - if (cache != null) { - if (cache.client()) - cache.affinity().calculate(fut.topologyVersion(), fut.discoveryEvent(), fut.discoCache()); + if (grpHolder != null) { + if (grpHolder.client()) // Affinity for non-client holders calculated in {@link #onServerLeft}. + grpHolder.affinity().calculate(fut.topologyVersion(), fut.discoveryEvent(), fut.discoCache()); return; } - final Integer cacheId = desc.cacheId(); + // Need initialize holders and affinity if this node became coordinator during this exchange. 
+ final Integer grpId = desc.groupId(); - GridCacheContext cacheCtx = cctx.cacheContext(cacheId); + CacheGroupContext grp = cctx.cache().cacheGroup(grpId); - if (cacheCtx == null) { - cctx.io().addHandler(desc.cacheId(), GridDhtAffinityAssignmentResponse.class, + if (grp == null) { + cctx.io().addCacheGroupHandler(desc.groupId(), GridDhtAffinityAssignmentResponse.class, new IgniteBiInClosure<UUID, GridDhtAffinityAssignmentResponse>() { @Override public void apply(UUID nodeId, GridDhtAffinityAssignmentResponse res) { - processAffinityAssignmentResponse(nodeId, res); + processAffinityAssignmentResponse(grpId, nodeId, res); } } ); - cache = CacheHolder2.create(cctx, desc, fut, null); + grpHolder = CacheGroupHolder2.create(cctx, desc, fut, null); - final GridAffinityAssignmentCache aff = cache.affinity(); + final GridAffinityAssignmentCache aff = grpHolder.affinity(); List<GridDhtPartitionsExchangeFuture> exchFuts = cctx.exchange().exchangeFutures(); @@ -1160,9 +1192,10 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap ", total=" + exchFuts.size() + ']'; final GridDhtPartitionsExchangeFuture prev = exchFuts.get(idx + 1); + if (log.isDebugEnabled()) { log.debug("Need initialize affinity on coordinator [" + - "cache=" + desc.cacheConfiguration().getName() + + "cacheGrp=" + desc.cacheOrGroupName() + "prevAff=" + prev.topologyVersion() + ']'); } @@ -1191,9 +1224,9 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap futs.add(affFut); } else - cache = new CacheHolder1(cacheCtx, null); + grpHolder = new CacheGroupHolder1(grp, null); - CacheHolder old = caches.put(cache.cacheId(), cache); + CacheGroupHolder old = grpHolders.put(grpHolder.groupId(), grpHolder); assert old == null : old; } @@ -1219,38 +1252,36 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap * @return Cache holder. * @throws IgniteCheckedException If failed. 
*/ - private CacheHolder cache(GridDhtPartitionsExchangeFuture fut, DynamicCacheDescriptor desc) + private CacheGroupHolder groupHolder(GridDhtPartitionsExchangeFuture fut, final CacheGroupDescriptor desc) throws IgniteCheckedException { assert lateAffAssign; - final Integer cacheId = desc.cacheId(); - - CacheHolder cache = caches.get(cacheId); + CacheGroupHolder cacheGrp = grpHolders.get(desc.groupId()); - if (cache != null) - return cache; + if (cacheGrp != null) + return cacheGrp; - GridCacheContext cacheCtx = cctx.cacheContext(desc.cacheId()); + final CacheGroupContext grp = cctx.cache().cacheGroup(desc.groupId()); - if (cacheCtx == null) { - cctx.io().addHandler(cacheId, GridDhtAffinityAssignmentResponse.class, + if (grp == null) { + cctx.io().addCacheGroupHandler(desc.groupId(), GridDhtAffinityAssignmentResponse.class, new IgniteBiInClosure<UUID, GridDhtAffinityAssignmentResponse>() { @Override public void apply(UUID nodeId, GridDhtAffinityAssignmentResponse res) { - processAffinityAssignmentResponse(nodeId, res); + processAffinityAssignmentResponse(desc.groupId(), nodeId, res); } } ); - cache = CacheHolder2.create(cctx, desc, fut, null); + cacheGrp = CacheGroupHolder2.create(cctx, desc, fut, null); } else - cache = new CacheHolder1(cacheCtx, null); + cacheGrp = new CacheGroupHolder1(grp, null); - CacheHolder old = caches.put(cache.cacheId(), cache); + CacheGroupHolder old = grpHolders.put(desc.groupId(), cacheGrp); assert old == null : old; - return cache; + return cacheGrp; } /** @@ -1261,18 +1292,20 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap */ @Nullable private WaitRebalanceInfo initAffinityOnNodeJoin(final GridDhtPartitionsExchangeFuture fut, boolean crd) throws IgniteCheckedException { + assert lateAffAssign; + AffinityTopologyVersion topVer = fut.topologyVersion(); final Map<Object, List<List<ClusterNode>>> affCache = new HashMap<>(); if (!crd) { - for (GridCacheContext cacheCtx : cctx.cacheContexts()) { - if 
(cacheCtx.isLocal()) + for (CacheGroupContext grp : cctx.cache().cacheGroups()) { + if (grp.isLocal()) continue; - boolean latePrimary = cacheCtx.rebalanceEnabled(); + boolean latePrimary = grp.rebalanceEnabled(); - initAffinityOnNodeJoin(fut, cacheCtx.affinity().affinityCache(), null, latePrimary, affCache); + initAffinityOnNodeJoin(fut, grp.affinity(), null, latePrimary, affCache); } return null; @@ -1280,9 +1313,9 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap else { final WaitRebalanceInfo waitRebalanceInfo = new WaitRebalanceInfo(topVer); - forAllRegisteredCaches(new IgniteInClosureX<DynamicCacheDescriptor>() { - @Override public void applyx(DynamicCacheDescriptor cacheDesc) throws IgniteCheckedException { - CacheHolder cache = cache(fut, cacheDesc); + forAllRegisteredCacheGroups(new IgniteInClosureX<CacheGroupDescriptor>() { + @Override public void applyx(CacheGroupDescriptor desc) throws IgniteCheckedException { + CacheGroupHolder cache = groupHolder(fut, desc); boolean latePrimary = cache.rebalanceEnabled; @@ -1307,14 +1340,15 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap WaitRebalanceInfo rebalanceInfo, boolean latePrimary, Map<Object, List<List<ClusterNode>>> affCache) - throws IgniteCheckedException { + throws IgniteCheckedException + { assert lateAffAssign; AffinityTopologyVersion topVer = fut.topologyVersion(); AffinityTopologyVersion affTopVer = aff.lastVersion(); - assert affTopVer.topologyVersion() > 0 : "Affinity is not initialized [cache=" + aff.cacheName() + + assert affTopVer.topologyVersion() > 0 : "Affinity is not initialized [grp=" + aff.cacheOrGroupName() + ", topVer=" + affTopVer + ", node=" + cctx.localNodeId() + ']'; List<List<ClusterNode>> curAff = aff.assignments(affTopVer); @@ -1405,7 +1439,7 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap } if (rebalance != null) - rebalance.add(aff.cacheId(), part, newNodes.get(0).id(), 
newNodes); + rebalance.add(aff.groupId(), part, newNodes.get(0).id(), newNodes); return nodes0; } @@ -1448,6 +1482,8 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap */ private Map<Integer, Map<Integer, List<UUID>>> initAffinityOnNodeLeft0(final GridDhtPartitionsExchangeFuture fut) throws IgniteCheckedException { + assert lateAffAssign; + final AffinityTopologyVersion topVer = fut.topologyVersion(); final WaitRebalanceInfo waitRebalanceInfo = new WaitRebalanceInfo(topVer); @@ -1456,24 +1492,24 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap final Map<Integer, Map<Integer, List<UUID>>> assignment = new HashMap<>(); - forAllRegisteredCaches(new IgniteInClosureX<DynamicCacheDescriptor>() { - @Override public void applyx(DynamicCacheDescriptor cacheDesc) throws IgniteCheckedException { - CacheHolder cache = cache(fut, cacheDesc); + forAllRegisteredCacheGroups(new IgniteInClosureX<CacheGroupDescriptor>() { + @Override public void applyx(CacheGroupDescriptor desc) throws IgniteCheckedException { + CacheGroupHolder grpHolder = groupHolder(fut, desc); - if (!cache.rebalanceEnabled) + if (!grpHolder.rebalanceEnabled) return; - AffinityTopologyVersion affTopVer = cache.affinity().lastVersion(); + AffinityTopologyVersion affTopVer = grpHolder.affinity().lastVersion(); assert affTopVer.topologyVersion() > 0 && !affTopVer.equals(topVer) : "Invalid affinity version " + - "[last=" + affTopVer + ", futVer=" + topVer + ", cache=" + cache.name() + ']'; + "[last=" + affTopVer + ", futVer=" + topVer + ", grp=" + desc.cacheOrGroupName() + ']'; - List<List<ClusterNode>> curAssignment = cache.affinity().assignments(affTopVer); - List<List<ClusterNode>> newAssignment = cache.affinity().idealAssignment(); + List<List<ClusterNode>> curAssignment = grpHolder.affinity().assignments(affTopVer); + List<List<ClusterNode>> newAssignment = grpHolder.affinity().idealAssignment(); assert newAssignment != null; - 
GridDhtPartitionTopology top = cache.topology(fut); + GridDhtPartitionTopology top = grpHolder.topology(fut); Map<Integer, List<UUID>> cacheAssignment = null; @@ -1487,7 +1523,7 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap List<ClusterNode> newNodes0 = null; assert newPrimary == null || aliveNodes.contains(newPrimary) : "Invalid new primary [" + - "cache=" + cache.name() + + "grp=" + desc.cacheOrGroupName() + ", node=" + newPrimary + ", topVer=" + topVer + ']'; @@ -1496,7 +1532,7 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap GridDhtPartitionState state = top.partitionState(newPrimary.id(), p); if (state != GridDhtPartitionState.OWNING) { - newNodes0 = latePrimaryAssignment(cache.affinity(), + newNodes0 = latePrimaryAssignment(grpHolder.affinity(), p, curPrimary, newNodes, @@ -1511,7 +1547,7 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap ClusterNode curNode = curNodes.get(i); if (top.partitionState(curNode.id(), p) == GridDhtPartitionState.OWNING) { - newNodes0 = latePrimaryAssignment(cache.affinity(), + newNodes0 = latePrimaryAssignment(grpHolder.affinity(), p, curNode, newNodes, @@ -1526,7 +1562,7 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap for (ClusterNode owner : owners) { if (aliveNodes.contains(owner)) { - newNodes0 = latePrimaryAssignment(cache.affinity(), + newNodes0 = latePrimaryAssignment(grpHolder.affinity(), p, owner, newNodes, @@ -1549,7 +1585,7 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap } if (cacheAssignment != null) - assignment.put(cache.cacheId(), cacheAssignment); + assignment.put(grpHolder.groupId(), cacheAssignment); } }); @@ -1562,7 +1598,7 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap if (log.isDebugEnabled()) { log.debug("Computed new affinity after node left [topVer=" + topVer + - ", waitCaches=" + (info != null ? 
cacheNames(info.waitCaches.keySet()) : null) + ']'); + ", waitGrps=" + (info != null ? groupNames(info.waitGrps.keySet()) : null) + ']'); } } @@ -1621,7 +1657,7 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap /** * */ - abstract static class CacheHolder { + abstract static class CacheGroupHolder { /** */ private final GridAffinityAssignmentCache aff; @@ -1633,7 +1669,7 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap * @param aff Affinity cache. * @param initAff Existing affinity cache. */ - CacheHolder(boolean rebalanceEnabled, + CacheGroupHolder(boolean rebalanceEnabled, GridAffinityAssignmentCache aff, @Nullable GridAffinityAssignmentCache initAff) { this.aff = aff; @@ -1650,10 +1686,10 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap abstract boolean client(); /** - * @return Cache ID. + * @return Group ID. */ - int cacheId() { - return aff.cacheId(); + int groupId() { + return aff.groupId(); } /** @@ -1664,13 +1700,6 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap } /** - * @return Cache name. - */ - String name() { - return aff.cacheName(); - } - - /** * @param fut Exchange future. * @return Cache topology. */ @@ -1687,20 +1716,20 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap /** * Created cache is started on coordinator. */ - private class CacheHolder1 extends CacheHolder { + private class CacheGroupHolder1 extends CacheGroupHolder { /** */ - private final GridCacheContext cctx; + private final CacheGroupContext grp; /** - * @param cctx Cache context. + * @param grp Cache group. * @param initAff Current affinity. 
*/ - CacheHolder1(GridCacheContext cctx, @Nullable GridAffinityAssignmentCache initAff) { - super(cctx.rebalanceEnabled(), cctx.affinity().affinityCache(), initAff); + CacheGroupHolder1(CacheGroupContext grp, @Nullable GridAffinityAssignmentCache initAff) { + super(grp.rebalanceEnabled(), grp.affinity(), initAff); - assert !cctx.isLocal() : cctx.name(); + assert !grp.isLocal() : grp; - this.cctx = cctx; + this.grp = grp; } /** {@inheritDoc} */ @@ -1709,56 +1738,41 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap } /** {@inheritDoc} */ - @Override public int partitions() { - return cctx.affinity().partitions(); - } - - /** {@inheritDoc} */ - @Override public String name() { - return cctx.name(); - } - - /** {@inheritDoc} */ - @Override public int cacheId() { - return cctx.cacheId(); - } - - /** {@inheritDoc} */ @Override public GridDhtPartitionTopology topology(GridDhtPartitionsExchangeFuture fut) { - return cctx.topology(); + return grp.topology(); } } /** * Created if cache is not started on coordinator. */ - private static class CacheHolder2 extends CacheHolder { + private static class CacheGroupHolder2 extends CacheGroupHolder { /** */ private final GridCacheSharedContext cctx; /** * @param cctx Context. - * @param cacheDesc Cache descriptor. + * @param grpDesc Cache group descriptor. * @param fut Exchange future. * @param initAff Current affinity. * @return Cache holder. * @throws IgniteCheckedException If failed. 
*/ - static CacheHolder2 create( + static CacheGroupHolder2 create( GridCacheSharedContext cctx, - DynamicCacheDescriptor cacheDesc, + CacheGroupDescriptor grpDesc, GridDhtPartitionsExchangeFuture fut, @Nullable GridAffinityAssignmentCache initAff) throws IgniteCheckedException { - assert cacheDesc != null; + assert grpDesc != null; assert !cctx.kernalContext().clientNode(); - CacheConfiguration<?, ?> ccfg = cacheDesc.cacheConfiguration(); + CacheConfiguration<?, ?> ccfg = grpDesc.config(); - assert ccfg != null : cacheDesc; + assert ccfg != null : grpDesc; assert ccfg.getCacheMode() != LOCAL : ccfg.getName(); - assert !cctx.discovery().cacheAffinityNodes(ccfg.getName(), - fut.topologyVersion()).contains(cctx.localNode()) : cacheDesc.cacheName(); + assert !cctx.discovery().cacheGroupAffinityNodes(grpDesc.groupId(), + fut.topologyVersion()).contains(cctx.localNode()) : grpDesc.cacheOrGroupName(); AffinityFunction affFunc = cctx.cache().clone(ccfg.getAffinity()); @@ -1768,13 +1782,14 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap U.startLifecycleAware(F.asList(affFunc)); GridAffinityAssignmentCache aff = new GridAffinityAssignmentCache(cctx.kernalContext(), - ccfg.getName(), + grpDesc.cacheOrGroupName(), + grpDesc.groupId(), affFunc, ccfg.getNodeFilter(), ccfg.getBackups(), ccfg.getCacheMode() == LOCAL); - return new CacheHolder2(ccfg.getRebalanceMode() != NONE, cctx, aff, initAff); + return new CacheGroupHolder2(ccfg.getRebalanceMode() != NONE, cctx, aff, initAff); } /** @@ -1783,7 +1798,7 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap * @param aff Affinity. * @param initAff Current affinity. 
*/ - CacheHolder2( + CacheGroupHolder2( boolean rebalanceEnabled, GridCacheSharedContext cctx, GridAffinityAssignmentCache aff, @@ -1800,7 +1815,7 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap /** {@inheritDoc} */ @Override public GridDhtPartitionTopology topology(GridDhtPartitionsExchangeFuture fut) { - return cctx.exchange().clientTopology(cacheId(), fut); + return cctx.exchange().clientTopology(groupId(), fut); } } @@ -1812,7 +1827,7 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap private final AffinityTopologyVersion topVer; /** */ - private Map<Integer, Map<Integer, UUID>> waitCaches; + private Map<Integer, Map<Integer, UUID>> waitGrps; /** */ private Map<Integer, Map<Integer, List<ClusterNode>>> assignments; @@ -1831,9 +1846,9 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap * @return {@code True} if there are partitions waiting for rebalancing. */ boolean empty() { - if (waitCaches != null) { - assert !waitCaches.isEmpty(); - assert waitCaches.size() == assignments.size(); + if (waitGrps != null) { + assert !waitGrps.isEmpty(); + assert waitGrps.size() == assignments.size(); return false; } @@ -1842,34 +1857,34 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap } /** - * @param cacheId Cache ID. + * @param grpId Group ID. * @param part Partition. * @param waitNode Node rebalancing data. * @param assignment New assignment. 
*/ - void add(Integer cacheId, Integer part, UUID waitNode, List<ClusterNode> assignment) { + void add(Integer grpId, Integer part, UUID waitNode, List<ClusterNode> assignment) { assert !F.isEmpty(assignment) : assignment; - if (waitCaches == null) { - waitCaches = new HashMap<>(); + if (waitGrps == null) { + waitGrps = new HashMap<>(); assignments = new HashMap<>(); deploymentIds = new HashMap<>(); } - Map<Integer, UUID> cacheWaitParts = waitCaches.get(cacheId); + Map<Integer, UUID> cacheWaitParts = waitGrps.get(grpId); if (cacheWaitParts == null) { - waitCaches.put(cacheId, cacheWaitParts = new HashMap<>()); + waitGrps.put(grpId, cacheWaitParts = new HashMap<>()); - deploymentIds.put(cacheId, registeredCaches.get(cacheId).deploymentId()); + deploymentIds.put(grpId, registeredGrps.get(grpId).deploymentId()); } cacheWaitParts.put(part, waitNode); - Map<Integer, List<ClusterNode>> cacheAssignment = assignments.get(cacheId); + Map<Integer, List<ClusterNode>> cacheAssignment = assignments.get(grpId); if (cacheAssignment == null) - assignments.put(cacheId, cacheAssignment = new HashMap<>()); + assignments.put(grpId, cacheAssignment = new HashMap<>()); cacheAssignment.put(part, assignment); } @@ -1877,7 +1892,7 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap /** {@inheritDoc} */ @Override public String toString() { return "WaitRebalanceInfo [topVer=" + topVer + - ", caches=" + (waitCaches != null ? waitCaches.keySet() : null) + ']'; + ", grps=" + (waitGrps != null ? waitGrps.keySet() : null) + ']'; } } }
http://git-wip-us.apache.org/repos/asf/ignite/blob/7e45010b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheClientReconnectDiscoveryData.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheClientReconnectDiscoveryData.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheClientReconnectDiscoveryData.java index a30331f..6a6f40d 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheClientReconnectDiscoveryData.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheClientReconnectDiscoveryData.java @@ -31,18 +31,31 @@ public class CacheClientReconnectDiscoveryData implements Serializable { private static final long serialVersionUID = 0L; /** */ + private final Map<Integer, CacheGroupInfo> clientCacheGrps; + + /** */ private final Map<String, CacheInfo> clientCaches; /** * @param clientCaches Information about caches started on re-joining client node. + * @param clientCacheGrps Information about cache groups started on re-joining client node. */ - CacheClientReconnectDiscoveryData(Map<String, CacheInfo> clientCaches) { + CacheClientReconnectDiscoveryData(Map<Integer, CacheGroupInfo> clientCacheGrps, + Map<String, CacheInfo> clientCaches) { + this.clientCacheGrps = clientCacheGrps; this.clientCaches = clientCaches; } /** * @return Information about caches started on re-joining client node. */ + Map<Integer, CacheGroupInfo> clientCacheGroups() { + return clientCacheGrps; + } + + /** + * @return Information about caches started on re-joining client node.
+ */ Map<String, CacheInfo> clientCaches() { return clientCaches; } @@ -50,6 +63,53 @@ public class CacheClientReconnectDiscoveryData implements Serializable { /** * */ + static class CacheGroupInfo implements Serializable { + /** */ + private static final long serialVersionUID = 0L; + + /** */ + private final CacheConfiguration ccfg; + + /** */ + private final IgniteUuid deploymentId; + + /** Flags added for future usage. */ + private final long flags; + + /** + * @param ccfg Cache group configuration. + * @param deploymentId Cache group deployment ID. + * @param flags Flags (for future usage). + */ + CacheGroupInfo(CacheConfiguration ccfg, + IgniteUuid deploymentId, + long flags) { + assert ccfg != null; + assert deploymentId != null; + + this.ccfg = ccfg; + this.deploymentId = deploymentId; + this.flags = flags; + } + + /** + * @return Cache group configuration. + */ + CacheConfiguration config() { + return ccfg; + } + + /** + * @return Cache group deployment ID. + */ + IgniteUuid deploymentId() { + return deploymentId; + } + } + + /** + * + */ static class CacheInfo implements Serializable { /** */ private static final long serialVersionUID = 0L; @@ -67,7 +127,7 @@ public class CacheClientReconnectDiscoveryData implements Serializable { private final boolean nearCache; /** Flags added for future usage. */ - private final byte flags; + private final long flags; /** * @param ccfg Cache configuration. 
@@ -80,7 +140,7 @@ public class CacheClientReconnectDiscoveryData implements Serializable { CacheType cacheType, IgniteUuid deploymentId, boolean nearCache, - byte flags) { + long flags) { assert ccfg != null; assert cacheType != null; assert deploymentId != null; http://git-wip-us.apache.org/repos/asf/ignite/blob/7e45010b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheData.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheData.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheData.java index 3e2c259..b728d96 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheData.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheData.java @@ -35,7 +35,10 @@ public class CacheData implements Serializable { private final CacheConfiguration cacheCfg; /** */ - private final Integer cacheId; + private final int cacheId; + + /** */ + private final int grpId; /** */ private final CacheType cacheType; @@ -59,11 +62,12 @@ public class CacheData implements Serializable { private final boolean template; /** Flags added for future usage. */ - private final byte flags; + private final long flags; /** * @param cacheCfg Cache configuration. * @param cacheId Cache ID. + * @param grpId Cache group ID. * @param cacheType Cache ID. * @param deploymentId Cache deployment ID. * @param schema Query schema. 
@@ -75,6 +79,7 @@ public class CacheData implements Serializable { */ CacheData(CacheConfiguration cacheCfg, int cacheId, + int grpId, CacheType cacheType, IgniteUuid deploymentId, QuerySchema schema, @@ -82,14 +87,16 @@ public class CacheData implements Serializable { boolean staticCfg, boolean sql, boolean template, - byte flags) { + long flags) { assert cacheCfg != null; assert rcvdFrom != null : cacheCfg.getName(); assert deploymentId != null : cacheCfg.getName(); assert template || cacheId != 0 : cacheCfg.getName(); + assert template || grpId != 0 : cacheCfg.getName(); this.cacheCfg = cacheCfg; this.cacheId = cacheId; + this.grpId = grpId; this.cacheType = cacheType; this.deploymentId = deploymentId; this.schema = schema; @@ -101,9 +108,16 @@ public class CacheData implements Serializable { } /** + * @return Cache group ID. + */ + public int groupId() { + return grpId; + } + + /** * @return Cache ID. */ - public Integer cacheId() { + public int cacheId() { return cacheId; }
