ignite-5075
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/26a1bb6a Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/26a1bb6a Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/26a1bb6a Branch: refs/heads/ignite-5075 Commit: 26a1bb6a52c9b8fecef7b78ccbab29f002f8aff7 Parents: 31b027f Author: sboikov <[email protected]> Authored: Thu May 4 16:36:53 2017 +0300 Committer: sboikov <[email protected]> Committed: Thu May 4 16:36:53 2017 +0300 ---------------------------------------------------------------------- .../cache/CacheAffinitySharedManager.java | 364 +++++++++---------- .../processors/cache/CacheGroupData.java | 11 + .../processors/cache/CacheGroupDescriptor.java | 10 + .../cache/CacheGroupInfrastructure.java | 13 + .../processors/cache/ClusterCachesInfo.java | 13 +- 5 files changed, 227 insertions(+), 184 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/26a1bb6a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java index 80aeef4..adaa1e1 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java @@ -73,7 +73,7 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap private boolean lateAffAssign; /** Affinity information for all started caches (initialized on coordinator). */ - private ConcurrentMap<Integer, CacheGroupHolder> cacheGrps = new ConcurrentHashMap<>(); + private ConcurrentMap<Integer, CacheGroupHolder> grpHolders = new ConcurrentHashMap<>(); /** Last topology version when affinity was calculated (updated from exchange thread). */ private AffinityTopologyVersion affCalcVer; @@ -82,7 +82,7 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap private AffinityTopologyVersion lastAffVer; /** Registered caches (updated from exchange thread). */ - private final Map<Integer, CacheGroupDescriptor> registeredCacheGrps = new HashMap<>(); + private final Map<Integer, CacheGroupDescriptor> registeredGrps = new HashMap<>(); /** */ private WaitRebalanceInfo waitInfo; @@ -127,14 +127,14 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap void onDiscoveryEvent(int type, ClusterNode node, AffinityTopologyVersion topVer) { if (type == EVT_NODE_JOINED && node.isLocal()) { // Clean-up in case of client reconnect. - registeredCacheGrps.clear(); + registeredGrps.clear(); affCalcVer = null; lastAffVer = null; for (CacheGroupDescriptor desc : cctx.cache().cacheGroupDescriptors()) - registeredCacheGrps.put(desc.groupId(), desc); + registeredGrps.put(desc.groupId(), desc); } if (!CU.clientNode(node) && (type == EVT_NODE_FAILED || type == EVT_NODE_JOINED || type == EVT_NODE_LEFT)) { @@ -196,7 +196,7 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap if (waitInfo == null || !waitInfo.topVer.equals(topVer)) return; - if (waitInfo.waitCaches.isEmpty()) { + if (waitInfo.waitGrps.isEmpty()) { msg = affinityChangeMessage(waitInfo); waitInfo = null; @@ -214,9 +214,9 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap /** * @param top Topology. - * @param checkCacheId Cache ID. + * @param checkGrpId Group ID. */ - void checkRebalanceState(GridDhtPartitionTopology top, Integer checkCacheId) { + void checkRebalanceState(GridDhtPartitionTopology top, Integer checkGrpId) { if (!lateAffAssign) return; @@ -230,12 +230,12 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap assert affCalcVer.equals(waitInfo.topVer) : "Invalid affinity version [calcVer=" + affCalcVer + ", waitVer=" + waitInfo.topVer + ']'; - Map<Integer, UUID> partWait = waitInfo.waitCaches.get(checkCacheId); + Map<Integer, UUID> partWait = waitInfo.waitGrps.get(checkGrpId); boolean rebalanced = true; if (partWait != null) { - CacheGroupHolder cache = cacheGrps.get(checkCacheId); + CacheGroupHolder cache = grpHolders.get(checkGrpId); if (cache != null) { for (Iterator<Map.Entry<Integer, UUID>> it = partWait.entrySet().iterator(); it.hasNext(); ) { @@ -257,9 +257,9 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap } if (rebalanced) { - waitInfo.waitCaches.remove(checkCacheId); + waitInfo.waitGrps.remove(checkGrpId); - if (waitInfo.waitCaches.isEmpty()) { + if (waitInfo.waitGrps.isEmpty()) { msg = affinityChangeMessage(waitInfo); waitInfo = null; @@ -325,13 +325,13 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap */ private void updateCachesInfo(ExchangeActions exchActions) { for (CacheGroupDescriptor stopDesc : exchActions.cacheGroupsToStop()) { - CacheGroupDescriptor rmvd = registeredCacheGrps.remove(stopDesc.groupId()); + CacheGroupDescriptor rmvd = registeredGrps.remove(stopDesc.groupId()); assert rmvd != null : stopDesc.groupName(); } for (CacheGroupDescriptor startDesc : exchActions.cacheGroupsToStart()) { - CacheGroupDescriptor old = registeredCacheGrps.put(startDesc.groupId(), startDesc); + CacheGroupDescriptor old = registeredGrps.put(startDesc.groupId(), startDesc); assert old == null : old; } @@ -391,91 +391,85 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap if (fut.discoCache().cacheGroupAffinityNodes(cacheDesc.groupDescriptor().groupId()).isEmpty()) U.quietAndWarn(log, "No server nodes found for cache client: " + req.cacheName()); } + } - if (!crd || !lateAffAssign) { - GridCacheContext cacheCtx = cctx.cacheContext(cacheDesc.cacheId()); - - if (cacheCtx != null && !cacheCtx.isLocal()) { - boolean clientCacheStarted = - req.clientStartOnly() && req.initiatingNodeId().equals(cctx.localNodeId()); - - if (clientCacheStarted) - initAffinity(cacheDesc, cacheCtx.affinity().affinityCache(), fut, lateAffAssign); - else if (!req.clientStartOnly()) { - assert fut.topologyVersion().equals(cacheCtx.cacheStartTopologyVersion()); - - GridAffinityAssignmentCache aff = cacheCtx.affinity().affinityCache(); + for (CacheGroupInfrastructure grp : cctx.cache().cacheGroups()) { + if (grp.affinity().lastVersion().equals(AffinityTopologyVersion.NONE)) { + if (grp.groupStartVersion().equals(fut.topologyVersion())) { + GridAffinityAssignmentCache aff = grp.affinity(); - assert aff.lastVersion().equals(AffinityTopologyVersion.NONE) : aff.lastVersion(); + List<List<ClusterNode>> assignment = aff.calculate(fut.topologyVersion(), + fut.discoveryEvent(), fut.discoCache()); - List<List<ClusterNode>> assignment = aff.calculate(fut.topologyVersion(), - fut.discoveryEvent(), fut.discoCache()); + aff.initialize(fut.topologyVersion(), assignment); + } + else { + assert grp.localStartVersion().equals(fut.topologyVersion()); - aff.initialize(fut.topologyVersion(), assignment); - } + initAffinity(registeredGrps.get(grp.groupId()), grp.affinity(), fut, lateAffAssign); } } - else - initStartedCacheOnCoordinator(fut, cacheDesc.groupDescriptor()); } - for (DynamicCacheChangeRequest req : exchActions.closeRequests(cctx.localNodeId())) { - Integer cacheId = CU.cacheId(req.cacheName()); + if (crd) { + for (CacheGroupDescriptor grpDesc : exchActions.cacheGroupsToStart()) + initStartedGroupOnCoordinator(fut, grpDesc); + } + for (DynamicCacheChangeRequest req : exchActions.closeRequests(cctx.localNodeId())) { cctx.cache().blockGateway(req); - - if (crd) { - GridCacheContext cacheCtx = cctx.cacheContext(cacheId); - - // Client cache was stopped, need create 'client' CacheHolder. - if (cacheCtx != null && !cacheCtx.affinityNode()) { - CacheHolder cache = caches.remove(cacheId); - - assert !cache.client() : cache; - - cache = CacheHolder2.create(cctx, - cctx.cache().cacheDescriptor(cacheId), - fut, - cache.affinity()); - - caches.put(cacheId, cache); - } - } +// TODO IGNITE-5075. +// if (crd) { +// GridCacheContext cacheCtx = cctx.cacheContext(cacheId); +// +// // Client cache was stopped, need create 'client' CacheHolder. +// if (cacheCtx != null && !cacheCtx.affinityNode()) { +// CacheHolder cache = caches.remove(cacheId); +// +// assert !cache.client() : cache; +// +// cache = CacheHolder2.create(cctx, +// cctx.cache().cacheDescriptor(cacheId), +// fut, +// cache.affinity()); +// +// caches.put(cacheId, cache); +// } +// } } - Set<Integer> stoppedCaches = null; - - for (ExchangeActions.ActionData action : exchActions.stopRequests()) { - DynamicCacheDescriptor desc = action.descriptor(); - + for (ExchangeActions.ActionData action : exchActions.stopRequests()) cctx.cache().blockGateway(action.request()); - if (crd && desc.cacheConfiguration().getCacheMode() != LOCAL) { - CacheHolder cache = caches.remove(desc.cacheId()); + Set<Integer> stoppedGrps = null; + + for (CacheGroupDescriptor grpDesc : exchActions.cacheGroupsToStop()) { + if (crd && grpDesc.config().getCacheMode() != LOCAL) { + CacheGroupHolder cacheGrp = grpHolders.remove(grpDesc.groupId()); - assert cache != null : action.request(); + assert cacheGrp != null : grpDesc; - if (stoppedCaches == null) - stoppedCaches = new HashSet<>(); + if (stoppedGrps == null) + stoppedGrps = new HashSet<>(); - stoppedCaches.add(cache.cacheId()); + stoppedGrps.add(cacheGrp.groupId()); - cctx.io().removeHandler(desc.cacheId(), GridDhtAffinityAssignmentResponse.class); + cctx.io().removeHandler(cacheGrp.groupId(), GridDhtAffinityAssignmentResponse.class); } } - if (stoppedCaches != null) { + if (stoppedGrps != null) { boolean notify = false; synchronized (mux) { if (waitInfo != null) { - for (Integer cacheId : stoppedCaches) { - boolean rmv = waitInfo.waitCaches.remove(cacheId) != null; + for (Integer grpId : stoppedGrps) { + boolean rmv = waitInfo.waitGrps.remove(grpId) != null; if (rmv) { notify = true; - waitInfo.assignments.remove(cacheId); + waitInfo.assignments.remove(grpId); } } } @@ -499,9 +493,9 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap * */ public void removeAllCacheInfo(){ - cacheGrps.clear(); + grpHolders.clear(); - registeredCaches.clear(); + registeredGrps.clear(); } /** @@ -535,7 +529,7 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap assert idealAssignment != null; - Map<Integer, List<UUID>> cacheAssignment = assignment.get(aff.cacheId()); + Map<Integer, List<UUID>> cacheAssignment = assignment.get(aff.groupId()); List<List<ClusterNode>> newAssignment; @@ -585,25 +579,25 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap final Map<Object, List<List<ClusterNode>>> affCache = new HashMap<>(); - forAllCaches(crd, new IgniteInClosureX<GridAffinityAssignmentCache>() { + forAllCacheGroups(crd, new IgniteInClosureX<GridAffinityAssignmentCache>() { @Override public void applyx(GridAffinityAssignmentCache aff) throws IgniteCheckedException { AffinityTopologyVersion affTopVer = aff.lastVersion(); assert affTopVer.topologyVersion() > 0 : affTopVer; - DynamicCacheDescriptor desc = registeredCaches.get(aff.cacheId()); + CacheGroupDescriptor desc = registeredGrps.get(aff.groupId()); - assert desc != null : aff.cacheName(); + assert desc != null : aff.groupName(); IgniteUuid deploymentId = desc.deploymentId(); - if (!deploymentId.equals(deploymentIds.get(aff.cacheId()))) { + if (!deploymentId.equals(deploymentIds.get(aff.groupId()))) { aff.clientEventTopologyChange(exchFut.discoveryEvent(), topVer); return; } - Map<Integer, List<UUID>> change = affChange.get(aff.cacheId()); + Map<Integer, List<UUID>> change = affChange.get(aff.groupId()); if (change != null) { assert !change.isEmpty() : msg; @@ -618,7 +612,7 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap List<ClusterNode> nodes = toNodes(topVer, e.getValue()); assert !nodes.equals(assignment.get(part)) : "Assignment did not change " + - "[cache=" + aff.cacheName() + + "[cacheGrp=" + aff.groupName() + ", part=" + part + ", cur=" + F.nodeIds(assignment.get(part)) + ", new=" + F.nodeIds(nodes) + @@ -721,11 +715,11 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap * @param c Cache closure. * @throws IgniteCheckedException If failed */ - private void forAllRegisteredCaches(IgniteInClosureX<DynamicCacheDescriptor> c) throws IgniteCheckedException { + private void forAllRegisteredCacheGroups(IgniteInClosureX<CacheGroupDescriptor> c) throws IgniteCheckedException { assert lateAffAssign; - for (DynamicCacheDescriptor cacheDesc : registeredCaches.values()) { - if (cacheDesc.cacheConfiguration().getCacheMode() == LOCAL) + for (CacheGroupDescriptor cacheDesc : registeredGrps.values()) { + if (cacheDesc.config().getCacheMode() == LOCAL) continue; c.applyx(cacheDesc); @@ -738,7 +732,7 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap */ private void forAllCacheGroups(boolean crd, IgniteInClosureX<GridAffinityAssignmentCache> c) { if (crd) { - for (CacheGroupHolder cache : cacheGrps.values()) + for (CacheGroupHolder cache : grpHolders.values()) c.apply(cache.affinity()); } else { @@ -756,16 +750,16 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap * @param grpDesc Cache group descriptor. * @throws IgniteCheckedException If failed. */ - private void initStartedCacheOnCoordinator(GridDhtPartitionsExchangeFuture fut, final CacheGroupDescriptor grpDesc) + private void initStartedGroupOnCoordinator(GridDhtPartitionsExchangeFuture fut, final CacheGroupDescriptor grpDesc) throws IgniteCheckedException { assert grpDesc != null && grpDesc.groupId() != 0 : grpDesc; if (grpDesc.config().getCacheMode() == LOCAL) return; - int grpId = grpDesc.groupId(); + Integer grpId = grpDesc.groupId(); - CacheGroupHolder grpHolder = cacheGrps.get(grpId); + CacheGroupHolder grpHolder = grpHolders.get(grpId); CacheGroupInfrastructure grp = cctx.kernalContext().cache().cacheGroup(grpId); @@ -774,7 +768,7 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap new CacheGroupHolder1(grp, null) : CacheGroupHolder2.create(cctx, grpDesc, fut, null); - CacheGroupHolder old = cacheGrps.put(grpId, grpHolder); + CacheGroupHolder old = grpHolders.put(grpId, grpHolder); assert old == null : old; @@ -789,7 +783,7 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap grpHolder = new CacheGroupHolder1(grp, grpHolder.affinity()); - cacheGrps.put(grpId, grpHolder); + grpHolders.put(grpId, grpHolder); } } @@ -805,14 +799,16 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap final GridDhtPartitionsExchangeFuture fut, Collection<DynamicCacheDescriptor> descs) throws IgniteCheckedException { for (DynamicCacheDescriptor desc : descs) { - if (!registeredCaches.containsKey(desc.cacheId())) - registeredCaches.put(desc.cacheId(), desc); + CacheGroupDescriptor grpDesc = desc.groupDescriptor(); + + if (!registeredGrps.containsKey(grpDesc.groupId())) + registeredGrps.put(grpDesc.groupId(), grpDesc); } if (crd && lateAffAssign) { - forAllRegisteredCaches(new IgniteInClosureX<DynamicCacheDescriptor>() { - @Override public void applyx(DynamicCacheDescriptor desc) throws IgniteCheckedException { - CacheGroupHolder cache = cache(fut, desc); + forAllRegisteredCacheGroups(new IgniteInClosureX<CacheGroupDescriptor>() { + @Override public void applyx(CacheGroupDescriptor desc) throws IgniteCheckedException { + CacheGroupHolder cache = groupHolder(fut, desc); if (cache.affinity().lastVersion().equals(AffinityTopologyVersion.NONE)) { List<List<ClusterNode>> assignment = @@ -827,7 +823,7 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap forAllCacheGroups(false, new IgniteInClosureX<GridAffinityAssignmentCache>() { @Override public void applyx(GridAffinityAssignmentCache aff) throws IgniteCheckedException { if (aff.lastVersion().equals(AffinityTopologyVersion.NONE)) - initAffinity(registeredCaches.get(aff.cacheId()), aff, fut, false); + initAffinity(registeredGrps.get(aff.groupId()), aff, fut, false); } }); } @@ -872,15 +868,15 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap return true; // If local node did not initiate exchange or local node is the only cache node in grid. - Collection<ClusterNode> affNodes = cctx.discovery().cacheAffinityNodes(aff.cacheId(), fut.topologyVersion()); + Collection<ClusterNode> affNodes = + cctx.discovery().cacheGroupAffinityNodes(aff.groupId(), fut.topologyVersion()); - DynamicCacheDescriptor cacheDesc = registeredCaches.get(aff.cacheId()); + CacheGroupDescriptor grpDesc = registeredGrps.get(aff.groupId()); - assert cacheDesc != null : aff.cacheName(); + assert grpDesc != null : aff.groupName(); - return fut.cacheStarted(aff.cacheId()) || + return grpDesc.startTopologyVersion().equals(fut.topologyVersion()) || !fut.exchangeId().nodeId().equals(cctx.localNodeId()) || - cctx.localNodeId().equals(cacheDesc.receivedFrom()) || (affNodes.size() == 1 && affNodes.contains(cctx.localNode())); } @@ -901,13 +897,15 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap if (lateAffAssign) { if (locJoin) { if (crd) { - forAllRegisteredCaches(new IgniteInClosureX<DynamicCacheDescriptor>() { - @Override public void applyx(DynamicCacheDescriptor cacheDesc) throws IgniteCheckedException { + forAllRegisteredCacheGroups(new IgniteInClosureX<CacheGroupDescriptor>() { + @Override public void applyx(CacheGroupDescriptor desc) throws IgniteCheckedException { AffinityTopologyVersion topVer = fut.topologyVersion(); - CacheGroupHolder cache = cache(fut, cacheDesc); + CacheGroupHolder cache = groupHolder(fut, desc); - List<List<ClusterNode>> newAff = cache.affinity().calculate(topVer, fut.discoveryEvent(), fut.discoCache()); + List<List<ClusterNode>> newAff = cache.affinity().calculate(topVer, + fut.discoveryEvent(), + fut.discoCache()); cache.affinity().initialize(topVer, newAff); } @@ -932,21 +930,21 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap if (crd && lateAffAssign) { if (log.isDebugEnabled()) { log.debug("Computed new affinity after node join [topVer=" + fut.topologyVersion() + - ", waitCaches=" + (info != null ? cacheNames(info.waitCaches.keySet()) : null) + ']'); + ", waitGrps=" + (info != null ? groupNames(info.waitGrps.keySet()) : null) + ']'); } } } } /** - * @param cacheIds Cache IDs. + * @param grpIds Cache group IDs. * @return Cache names. */ - private String cacheNames(Collection<Integer> cacheIds) { + private String groupNames(Collection<Integer> grpIds) { StringBuilder names = new StringBuilder(); - for (Integer cacheId : cacheIds) { - String name = registeredCaches.get(cacheId).cacheConfiguration().getName(); + for (Integer grpId : grpIds) { + String name = registeredGrps.get(grpId).groupName(); if (names.length() != 0) names.append(", "); @@ -966,21 +964,22 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap List<GridDhtAssignmentFetchFuture> fetchFuts = new ArrayList<>(); - for (GridCacheContext cacheCtx : cctx.cacheContexts()) { - if (cacheCtx.isLocal()) + for (CacheGroupInfrastructure grp : cctx.cache().cacheGroups()) { + if (grp.isLocal()) continue; - DynamicCacheDescriptor cacheDesc = registeredCaches.get(cacheCtx.cacheId()); - - if (cctx.localNodeId().equals(cacheDesc.receivedFrom())) { - List<List<ClusterNode>> assignment = - cacheCtx.affinity().affinityCache().calculate(fut.topologyVersion(), fut.discoveryEvent(), fut.discoCache()); + if (grp.groupStartVersion().equals(fut.topologyVersion())) { + List<List<ClusterNode>> assignment = grp.affinity().calculate(fut.topologyVersion(), + fut.discoveryEvent(), + fut.discoCache()); - cacheCtx.affinity().affinityCache().initialize(fut.topologyVersion(), assignment); + grp.affinity().initialize(fut.topologyVersion(), assignment); } else { + CacheGroupDescriptor grpDesc = registeredGrps.get(grp.groupId()); + GridDhtAssignmentFetchFuture fetchFut = new GridDhtAssignmentFetchFuture(cctx, - cacheDesc, + grpDesc, topVer, fut.discoCache()); @@ -1085,11 +1084,11 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap private void initCachesAffinity(GridDhtPartitionsExchangeFuture fut) throws IgniteCheckedException { assert !lateAffAssign; - for (GridCacheContext cacheCtx : cctx.cacheContexts()) { - if (cacheCtx.isLocal()) + for (CacheGroupInfrastructure grp : cctx.cache().cacheGroups()) { + if (grp.isLocal()) continue; - initAffinity(registeredCaches.get(cacheCtx.cacheId()), cacheCtx.affinity().affinityCache(), fut, false); + initAffinity(registeredGrps.get(grp.groupId()), grp.affinity(), fut, false); } } @@ -1102,33 +1101,33 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap throws IgniteCheckedException { final List<IgniteInternalFuture<AffinityTopologyVersion>> futs = new ArrayList<>(); - forAllRegisteredCaches(new IgniteInClosureX<DynamicCacheDescriptor>() { - @Override public void applyx(DynamicCacheDescriptor desc) throws IgniteCheckedException { - CacheHolder cache = caches.get(desc.cacheId()); + forAllRegisteredCacheGroups(new IgniteInClosureX<CacheGroupDescriptor>() { + @Override public void applyx(CacheGroupDescriptor desc) throws IgniteCheckedException { + CacheGroupHolder grpHolder = grpHolders.get(desc.groupId()); - if (cache != null) { - if (cache.client()) - cache.affinity().calculate(fut.topologyVersion(), fut.discoveryEvent(), fut.discoCache()); + if (grpHolder != null) { + if (grpHolder.client()) + grpHolder.affinity().calculate(fut.topologyVersion(), fut.discoveryEvent(), fut.discoCache()); return; } - final Integer cacheId = desc.cacheId(); + final Integer grpId = desc.groupId(); - GridCacheContext cacheCtx = cctx.cacheContext(cacheId); + CacheGroupInfrastructure grp = cctx.cache().cacheGroup(grpId); - if (cacheCtx == null) { - cctx.io().addHandler(desc.cacheId(), GridDhtAffinityAssignmentResponse.class, + if (grp == null) { + cctx.io().addHandler(desc.groupId(), GridDhtAffinityAssignmentResponse.class, new IgniteBiInClosure<UUID, GridDhtAffinityAssignmentResponse>() { @Override public void apply(UUID nodeId, GridDhtAffinityAssignmentResponse res) { - processAffinityAssignmentResponse(cacheId, nodeId, res); + processAffinityAssignmentResponse(grpId, nodeId, res); } } ); - cache = CacheHolder2.create(cctx, desc, fut, null); + grpHolder = CacheGroupHolder2.create(cctx, desc, fut, null); - final GridAffinityAssignmentCache aff = cache.affinity(); + final GridAffinityAssignmentCache aff = grpHolder.affinity(); List<GridDhtPartitionsExchangeFuture> exchFuts = cctx.exchange().exchangeFutures(); @@ -1138,9 +1137,10 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap ", total=" + exchFuts.size() + ']'; final GridDhtPartitionsExchangeFuture prev = exchFuts.get(idx + 1); + if (log.isDebugEnabled()) { log.debug("Need initialize affinity on coordinator [" + - "cache=" + desc.cacheConfiguration().getName() + + "cacheGrp=" + desc.groupName() + "prevAff=" + prev.topologyVersion() + ']'); } @@ -1169,9 +1169,9 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap futs.add(affFut); } else - cache = new CacheHolder1(cacheCtx, null); + grpHolder = new CacheGroupHolder1(grp, null); - CacheHolder old = caches.put(cache.cacheId(), cache); + CacheGroupHolder old = grpHolders.put(grpHolder.groupId(), grpHolder); assert old == null : old; } @@ -1197,38 +1197,36 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap * @return Cache holder. * @throws IgniteCheckedException If failed. */ - private CacheHolder cache(GridDhtPartitionsExchangeFuture fut, DynamicCacheDescriptor desc) + private CacheGroupHolder groupHolder(GridDhtPartitionsExchangeFuture fut, final CacheGroupDescriptor desc) throws IgniteCheckedException { assert lateAffAssign; - final Integer cacheId = desc.cacheId(); - - CacheHolder cache = caches.get(cacheId); + CacheGroupHolder cacheGrp = grpHolders.get(desc.groupId()); - if (cache != null) - return cache; + if (cacheGrp != null) + return cacheGrp; - GridCacheContext cacheCtx = cctx.cacheContext(desc.cacheId()); + final CacheGroupInfrastructure grp = cctx.cache().cacheGroup(desc.groupId()); - if (cacheCtx == null) { - cctx.io().addHandler(cacheId, GridDhtAffinityAssignmentResponse.class, + if (grp == null) { + cctx.io().addHandler(desc.groupId(), GridDhtAffinityAssignmentResponse.class, new IgniteBiInClosure<UUID, GridDhtAffinityAssignmentResponse>() { @Override public void apply(UUID nodeId, GridDhtAffinityAssignmentResponse res) { - processAffinityAssignmentResponse(cacheId, nodeId, res); + processAffinityAssignmentResponse(desc.groupId(), nodeId, res); } } ); - cache = CacheHolder2.create(cctx, desc, fut, null); + cacheGrp = CacheGroupHolder2.create(cctx, desc, fut, null); } else - cache = new CacheHolder1(cacheCtx, null); + cacheGrp = new CacheGroupHolder1(grp, null); - CacheHolder old = caches.put(cache.cacheId(), cache); + CacheGroupHolder old = grpHolders.put(desc.groupId(), cacheGrp); assert old == null : old; - return cache; + return cacheGrp; } /** @@ -1258,9 +1256,9 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap else { final WaitRebalanceInfo waitRebalanceInfo = new WaitRebalanceInfo(topVer); - forAllRegisteredCaches(new IgniteInClosureX<DynamicCacheDescriptor>() { - @Override public void applyx(DynamicCacheDescriptor cacheDesc) throws IgniteCheckedException { - CacheHolder cache = cache(fut, cacheDesc); + forAllRegisteredCacheGroups(new IgniteInClosureX<CacheGroupDescriptor>() { + @Override public void applyx(CacheGroupDescriptor desc) throws IgniteCheckedException { + CacheGroupHolder cache = groupHolder(fut, desc); boolean latePrimary = cache.rebalanceEnabled; @@ -1384,7 +1382,7 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap } if (rebalance != null) - rebalance.add(aff.cacheId(), part, newNodes.get(0).id(), newNodes); + rebalance.add(aff.groupId(), part, newNodes.get(0).id(), newNodes); return nodes0; } @@ -1435,24 +1433,24 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap final Map<Integer, Map<Integer, List<UUID>>> assignment = new HashMap<>(); - forAllRegisteredCaches(new IgniteInClosureX<DynamicCacheDescriptor>() { - @Override public void applyx(DynamicCacheDescriptor cacheDesc) throws IgniteCheckedException { - CacheHolder cache = cache(fut, cacheDesc); + forAllRegisteredCacheGroups(new IgniteInClosureX<CacheGroupDescriptor>() { + @Override public void applyx(CacheGroupDescriptor desc) throws IgniteCheckedException { + CacheGroupHolder grpHolder = groupHolder(fut, desc); - if (!cache.rebalanceEnabled) + if (!grpHolder.rebalanceEnabled) return; - AffinityTopologyVersion affTopVer = cache.affinity().lastVersion(); + AffinityTopologyVersion affTopVer = grpHolder.affinity().lastVersion(); assert affTopVer.topologyVersion() > 0 && !affTopVer.equals(topVer) : "Invalid affinity version " + - "[last=" + affTopVer + ", futVer=" + topVer + ", cache=" + cache.name() + ']'; + "[last=" + affTopVer + ", futVer=" + topVer + ", grp=" + desc.groupName() + ']'; - List<List<ClusterNode>> curAssignment = cache.affinity().assignments(affTopVer); - List<List<ClusterNode>> newAssignment = cache.affinity().idealAssignment(); + List<List<ClusterNode>> curAssignment = grpHolder.affinity().assignments(affTopVer); + List<List<ClusterNode>> newAssignment = grpHolder.affinity().idealAssignment(); assert newAssignment != null; - GridDhtPartitionTopology top = cache.topology(fut); + GridDhtPartitionTopology top = grpHolder.topology(fut); Map<Integer, List<UUID>> cacheAssignment = null; @@ -1466,7 +1464,7 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap List<ClusterNode> newNodes0 = null; assert newPrimary == null || aliveNodes.contains(newPrimary) : "Invalid new primary [" + - "cache=" + cache.name() + + "grp=" + desc.groupName() + ", node=" + newPrimary + ", topVer=" + topVer + ']'; @@ -1475,7 +1473,7 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap GridDhtPartitionState state = top.partitionState(newPrimary.id(), p); if (state != GridDhtPartitionState.OWNING) { - newNodes0 = latePrimaryAssignment(cache.affinity(), + newNodes0 = latePrimaryAssignment(grpHolder.affinity(), p, curPrimary, newNodes, @@ -1490,7 +1488,7 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap ClusterNode curNode = curNodes.get(i); if (top.partitionState(curNode.id(), p) == GridDhtPartitionState.OWNING) { - newNodes0 = latePrimaryAssignment(cache.affinity(), + newNodes0 = latePrimaryAssignment(grpHolder.affinity(), p, curNode, newNodes, @@ -1505,7 +1503,7 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap for (ClusterNode owner : owners) { if (aliveNodes.contains(owner)) { - newNodes0 = latePrimaryAssignment(cache.affinity(), + newNodes0 = latePrimaryAssignment(grpHolder.affinity(), p, owner, newNodes, @@ -1528,7 +1526,7 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap } if (cacheAssignment != null) - assignment.put(cache.cacheId(), cacheAssignment); + assignment.put(grpHolder.groupId(), cacheAssignment); } }); @@ -1541,7 +1539,7 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap if (log.isDebugEnabled()) { log.debug("Computed new affinity after node left [topVer=" + topVer + - ", waitCaches=" + (info != null ? cacheNames(info.waitCaches.keySet()) : null) + ']'); + ", waitCaches=" + (info != null ? groupNames(info.waitGrps.keySet()) : null) + ']'); } } @@ -1778,7 +1776,7 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap /** {@inheritDoc} */ @Override public GridDhtPartitionTopology topology(GridDhtPartitionsExchangeFuture fut) { - return cctx.exchange().clientTopology(cacheId(), fut); + return cctx.exchange().clientTopology(groupId(), fut); } } @@ -1790,7 +1788,7 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap private final AffinityTopologyVersion topVer; /** */ - private Map<Integer, Map<Integer, UUID>> waitCaches; + private Map<Integer, Map<Integer, UUID>> waitGrps; /** */ private Map<Integer, Map<Integer, List<ClusterNode>>> assignments; @@ -1809,9 +1807,9 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap * @return {@code True} if there are partitions waiting for rebalancing. */ boolean empty() { - if (waitCaches != null) { - assert !waitCaches.isEmpty(); - assert waitCaches.size() == assignments.size(); + if (waitGrps != null) { + assert !waitGrps.isEmpty(); + assert waitGrps.size() == assignments.size(); return false; } @@ -1820,34 +1818,34 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap } /** - * @param cacheId Cache ID. + * @param grpId Group ID. * @param part Partition. * @param waitNode Node rebalancing data. * @param assignment New assignment. */ - void add(Integer cacheId, Integer part, UUID waitNode, List<ClusterNode> assignment) { + void add(Integer grpId, Integer part, UUID waitNode, List<ClusterNode> assignment) { assert !F.isEmpty(assignment) : assignment; - if (waitCaches == null) { - waitCaches = new HashMap<>(); + if (waitGrps == null) { + waitGrps = new HashMap<>(); assignments = new HashMap<>(); deploymentIds = new HashMap<>(); } - Map<Integer, UUID> cacheWaitParts = waitCaches.get(cacheId); + Map<Integer, UUID> cacheWaitParts = waitGrps.get(grpId); if (cacheWaitParts == null) { - waitCaches.put(cacheId, cacheWaitParts = new HashMap<>()); + waitGrps.put(grpId, cacheWaitParts = new HashMap<>()); - deploymentIds.put(cacheId, registeredCaches.get(cacheId).deploymentId()); + deploymentIds.put(grpId, registeredGrps.get(grpId).deploymentId()); } cacheWaitParts.put(part, waitNode); - Map<Integer, List<ClusterNode>> cacheAssignment = assignments.get(cacheId); + Map<Integer, List<ClusterNode>> cacheAssignment = assignments.get(grpId); if (cacheAssignment == null) - assignments.put(cacheId, cacheAssignment = new HashMap<>()); + assignments.put(grpId, cacheAssignment = new HashMap<>()); cacheAssignment.put(part, assignment); } @@ -1855,7 +1853,7 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap /** {@inheritDoc} */ @Override public String toString() { return "WaitRebalanceInfo [topVer=" + topVer + - ", caches=" + (waitCaches != null ? waitCaches.keySet() : null) + ']'; + ", grps=" + (waitGrps != null ? waitGrps.keySet() : null) + ']'; } } } http://git-wip-us.apache.org/repos/asf/ignite/blob/26a1bb6a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheGroupData.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheGroupData.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheGroupData.java index 0507839..0123262 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheGroupData.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheGroupData.java @@ -24,6 +24,7 @@ import org.apache.ignite.configuration.CacheConfiguration; import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; import org.apache.ignite.internal.util.tostring.GridToStringInclude; import org.apache.ignite.internal.util.typedef.internal.S; +import org.apache.ignite.lang.IgniteUuid; /** * @@ -39,6 +40,9 @@ public class CacheGroupData implements Serializable { private final int grpId; /** */ + private final IgniteUuid deploymentId; + + /** */ private final CacheConfiguration cacheCfg; /** */ @@ -56,16 +60,19 @@ public class CacheGroupData implements Serializable { public CacheGroupData(CacheConfiguration cacheCfg, String grpName, int grpId, + IgniteUuid deploymentId, AffinityTopologyVersion startTopVer, Map<String, Integer> caches) { assert cacheCfg != null; assert grpName != null; assert grpId != 0; + assert deploymentId != null; assert startTopVer != null; this.cacheCfg = cacheCfg; this.grpName = grpName; this.grpId = grpId; + this.deploymentId = deploymentId; this.startTopVer = startTopVer; this.caches = caches; } @@ -78,6 +85,10 @@ public class CacheGroupData implements Serializable { return grpId; } + public IgniteUuid deploymentId() { + return deploymentId; + } + public CacheConfiguration config() { return cacheCfg; } http://git-wip-us.apache.org/repos/asf/ignite/blob/26a1bb6a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheGroupDescriptor.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheGroupDescriptor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheGroupDescriptor.java index c0ad67a..da55871 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheGroupDescriptor.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheGroupDescriptor.java @@ -24,6 +24,7 @@ import org.apache.ignite.configuration.CacheConfiguration; import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; import org.apache.ignite.internal.util.tostring.GridToStringInclude; import org.apache.ignite.internal.util.typedef.internal.S; +import org.apache.ignite.lang.IgniteUuid; import org.jetbrains.annotations.Nullable; /** @@ -37,6 +38,9 @@ public class CacheGroupDescriptor { private final int grpId; /** */ + private final IgniteUuid deploymentId; + + /** */ private final CacheConfiguration cacheCfg; /** */ @@ -48,6 +52,7 @@ public class CacheGroupDescriptor { CacheGroupDescriptor(String grpName, int grpId, + IgniteUuid deploymentId, CacheConfiguration cacheCfg, AffinityTopologyVersion startTopVer, Map<String, Integer> caches) { @@ -58,11 +63,16 @@ public class CacheGroupDescriptor { this.grpName = grpName; this.grpId = grpId; + this.deploymentId = deploymentId; this.cacheCfg = cacheCfg; this.startTopVer = startTopVer; this.caches = caches; } + public IgniteUuid deploymentId() { + return deploymentId; + } + void onCacheAdded(String cacheName, int cacheId) { assert cacheName != null; assert cacheId != 0; http://git-wip-us.apache.org/repos/asf/ignite/blob/26a1bb6a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheGroupInfrastructure.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheGroupInfrastructure.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheGroupInfrastructure.java index 0769884..72a62ce 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheGroupInfrastructure.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheGroupInfrastructure.java @@ -20,6 +20,7 @@ package org.apache.ignite.internal.processors.cache; import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.configuration.CacheConfiguration; import org.apache.ignite.internal.IgniteClientDisconnectedCheckedException; +import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; import org.apache.ignite.internal.processors.affinity.GridAffinityAssignmentCache; import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionTopology; import org.apache.ignite.lang.IgniteFuture; @@ -47,6 +48,10 @@ public class CacheGroupInfrastructure { /** */ private GridDhtPartitionTopology top; + private AffinityTopologyVersion grpStartVer; + + private AffinityTopologyVersion locStartVer; + /** * @param id Group ID. * @param ctx Context. @@ -61,6 +66,14 @@ public class CacheGroupInfrastructure { this.ccfg = ccfg; } + public AffinityTopologyVersion groupStartVersion() { + return grpStartVer; + } + + public AffinityTopologyVersion localStartVersion() { + return locStartVer; + } + /** * @return {@code True} if cache is local. */ http://git-wip-us.apache.org/repos/asf/ignite/blob/26a1bb6a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ClusterCachesInfo.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ClusterCachesInfo.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ClusterCachesInfo.java index bd1e27f..060c933 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ClusterCachesInfo.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ClusterCachesInfo.java @@ -34,6 +34,7 @@ import org.apache.ignite.internal.util.typedef.F; import org.apache.ignite.internal.util.typedef.T2; import org.apache.ignite.internal.util.typedef.internal.CU; import org.apache.ignite.lang.IgniteInClosure; +import org.apache.ignite.lang.IgniteUuid; import org.apache.ignite.spi.discovery.DiscoveryDataBag; import java.io.Serializable; @@ -291,6 +292,7 @@ class ClusterCachesInfo { CacheGroupDescriptor grpDesc = registerCacheGroup(exchangeActions, ccfg, cacheId, + req.deploymentId(), topVer.nextMinorVersion()); DynamicCacheDescriptor startDesc = new DynamicCacheDescriptor(ctx, @@ -585,6 +587,7 @@ class ClusterCachesInfo { DynamicCacheDescriptor desc0 = new DynamicCacheDescriptor(ctx, locCfg.config(), desc.cacheType(), + desc.groupDescriptor(), desc.template(), desc.deploymentId(), desc.schema()); @@ -650,6 +653,7 @@ class ClusterCachesInfo { CacheGroupData grpData = new CacheGroupData(grpDesc.config(), grpDesc.groupName(), grpDesc.groupId(), + grpDesc.deploymentId(), grpDesc.startTopologyVersion(), grpDesc.caches()); @@ -698,6 +702,7 @@ class ClusterCachesInfo { for (CacheGroupData grpData : cachesData.cacheGroups().values()) { CacheGroupDescriptor grpDesc = new CacheGroupDescriptor(grpData.groupName(), grpData.groupId(), + grpData.deploymentId(), grpData.config(), grpData.startTopologyVersion(), grpData.caches()); @@ -859,7 +864,11 @@ class ClusterCachesInfo { if (!registeredCaches.containsKey(cfg.getName())) { int cacheId = CU.cacheId(cfg.getName()); - CacheGroupDescriptor grpDesc = registerCacheGroup(null, cfg, cacheId, topVer); + CacheGroupDescriptor grpDesc = registerCacheGroup(null, + cfg, + cacheId, + joinData.cacheDeploymentId(), + topVer); DynamicCacheDescriptor desc = new DynamicCacheDescriptor(ctx, cfg, @@ -891,6 +900,7 @@ class ClusterCachesInfo { ExchangeActions exchActions, CacheConfiguration startedCacheCfg, Integer cacheId, + IgniteUuid deploymentId, AffinityTopologyVersion topVer) { if (startedCacheCfg.getGroupName() != null) { CacheGroupDescriptor desc = registeredCacheGrps.get(startedCacheCfg.getGroupName()); @@ -912,6 +922,7 @@ class ClusterCachesInfo { CacheGroupDescriptor grpDesc = new CacheGroupDescriptor( grpName, grpId, + deploymentId, startedCacheCfg, topVer, caches);
