Repository: ignite Updated Branches: refs/heads/ignite-5075 93724584a -> f1128f859
ignite-5075 Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/f1128f85 Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/f1128f85 Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/f1128f85 Branch: refs/heads/ignite-5075 Commit: f1128f8594e14011234eaaa50c9b5415f5e63334 Parents: 9372458 Author: sboikov <[email protected]> Authored: Wed May 10 16:06:53 2017 +0300 Committer: sboikov <[email protected]> Committed: Wed May 10 16:13:31 2017 +0300 ---------------------------------------------------------------------- .../GridCachePartitionExchangeManager.java | 33 ++++----- .../GridDhtPartitionsExchangeFuture.java | 76 +++++++++++--------- .../GridDhtPartitionsSingleMessage.java | 12 ++-- 3 files changed, 63 insertions(+), 58 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/f1128f85/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java index c5401e0..e426426 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java @@ -1256,19 +1256,16 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana boolean updated = false; for (Map.Entry<Integer, GridDhtPartitionFullMap> entry : msg.partitions().entrySet()) { - Integer cacheId = entry.getKey(); + Integer grpId = entry.getKey(); - GridCacheContext<K, V> cacheCtx = cctx.cacheContext(cacheId); - - if (cacheCtx != null && !cacheCtx.started()) - continue; // Can safely ignore background exchange. + CacheGroupInfrastructure grp = cctx.cache().cacheGroup(grpId); GridDhtPartitionTopology top = null; - if (cacheCtx == null) - top = clientTops.get(cacheId); - else if (!cacheCtx.isLocal()) - top = cacheCtx.topology(); + if (grp == null) + top = clientTops.get(grpId); + else if (!grp.isLocal()) + top = grp.topology(); if (top != null) updated |= top.update(null, entry.getValue(), null) != null; @@ -1302,25 +1299,25 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana boolean updated = false; for (Map.Entry<Integer, GridDhtPartitionMap> entry : msg.partitions().entrySet()) { - Integer cacheId = entry.getKey(); + Integer grpId = entry.getKey(); - GridCacheContext<K, V> cacheCtx = cctx.cacheContext(cacheId); + CacheGroupInfrastructure grp = cctx.cache().cacheGroup(grpId); - if (cacheCtx != null && - cacheCtx.cacheStartTopologyVersion().compareTo(entry.getValue().topologyVersion()) > 0) + if (grp != null && + grp.groupStartVersion().compareTo(entry.getValue().topologyVersion()) > 0) continue; GridDhtPartitionTopology top = null; - if (cacheCtx == null) - top = clientTops.get(cacheId); - else if (!cacheCtx.isLocal()) - top = cacheCtx.topology(); + if (grp == null) + top = clientTops.get(grpId); + else if (!grp.isLocal()) + top = grp.topology(); if (top != null) { updated |= top.update(null, entry.getValue(), null) != null; - cctx.affinity().checkRebalanceState(top, cacheId); + cctx.affinity().checkRebalanceState(top, grpId); } } http://git-wip-us.apache.org/repos/asf/ignite/blob/f1128f85/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java index 1e656b0..c7457c3 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java @@ -606,11 +606,11 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT try { if (crd != null) { - for (GridCacheContext cacheCtx : cctx.cacheContexts()) { - if (cacheCtx.isLocal()) + for (CacheGroupInfrastructure grp : cctx.cache().cacheGroups()) { + if (grp.isLocal()) continue; - cacheCtx.topology().beforeExchange(this, !centralizedAff); + grp.topology().beforeExchange(this, !centralizedAff); } } } @@ -624,24 +624,24 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT * @throws IgniteCheckedException If failed. */ private void updateTopologies(boolean crd) throws IgniteCheckedException { - for (GridCacheContext cacheCtx : cctx.cacheContexts()) { - if (cacheCtx.isLocal()) + for (CacheGroupInfrastructure grp : cctx.cache().cacheGroups()) { + if (grp.isLocal()) continue; - GridClientPartitionTopology clientTop = cctx.exchange().clearClientTopology(cacheCtx.cacheId()); + GridClientPartitionTopology clientTop = cctx.exchange().clearClientTopology(grp.groupId()); long updSeq = clientTop == null ? -1 : clientTop.lastUpdateSequence(); - GridDhtPartitionTopology top = cacheCtx.topology(); + GridDhtPartitionTopology top = grp.topology(); if (crd) { - boolean updateTop = exchId.topologyVersion().equals(cacheCtx.startTopologyVersion()); + boolean updateTop = exchId.topologyVersion().equals(grp.localStartVersion()); if (updateTop && clientTop != null) top.update(exchId, clientTop.partitionMap(true), clientTop.updateCounters(false)); } - top.updateTopologyVersion(exchId, this, updSeq, stopping(cacheCtx.cacheId())); + top.updateTopologyVersion(exchId, this, updSeq, cacheGroupStopping(grp.groupId())); } for (GridClientPartitionTopology top : cctx.exchange().clientTopologies()) @@ -786,11 +786,11 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT assert !cctx.kernalContext().clientNode(); - for (GridCacheContext cacheCtx : cctx.cacheContexts()) { - if (cacheCtx.isLocal()) + for (CacheGroupInfrastructure grp : cctx.cache().cacheGroups()) { + if (grp.isLocal()) continue; - cacheCtx.preloader().onTopologyChanged(this); + grp.preloader().onTopologyChanged(this); } waitPartitionRelease(); @@ -808,8 +808,13 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT // Partition release future is done so we can flush the write-behind store. cacheCtx.store().forceFlush(); } + } - cacheCtx.topology().beforeExchange(this, !centralizedAff); + for (CacheGroupInfrastructure grp : cctx.cache().cacheGroups()) { + if (grp.isLocal() || cacheGroupStopping(grp.groupId())) + continue; + + grp.topology().beforeExchange(this, !centralizedAff); } cctx.database().beforeExchange(this); @@ -922,11 +927,11 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT * */ private void onLeft() { - for (GridCacheContext cacheCtx : cctx.cacheContexts()) { - if (cacheCtx.isLocal()) + for (CacheGroupInfrastructure grp : cctx.cache().cacheGroups()) { + if (grp.isLocal()) continue; - cacheCtx.preloader().unwindUndeploys(); + grp.preloader().unwindUndeploys(); } cctx.mvcc().removeExplicitNodeLocks(exchId.nodeId(), exchId.topologyVersion()); @@ -1113,18 +1118,20 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT boolean realExchange = !dummy && !forcePreload; if (err == null && realExchange) { - for (GridCacheContext cacheCtx : cctx.cacheContexts()) { - if (cacheCtx.isLocal()) + for (CacheGroupInfrastructure grp : cctx.cache().cacheGroups()) { + if (grp.isLocal()) continue; try { if (centralizedAff) - cacheCtx.topology().initPartitions(this); + grp.topology().initPartitions(this); } catch (IgniteInterruptedCheckedException e) { U.error(log, "Failed to initialize partitions.", e); } + } + for (GridCacheContext cacheCtx : cctx.cacheContexts()) { GridCacheContext drCacheCtx = cacheCtx.isNear() ? cacheCtx.near().dht().context() : cacheCtx; if (drCacheCtx.isDrEnabled()) { @@ -1177,8 +1184,8 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT initFut.onDone(err == null); if (exchId.isLeft()) { - for (GridCacheContext cacheCtx : cctx.cacheContexts()) - cacheCtx.config().getAffinity().removeNode(exchId.nodeId()); + for (CacheGroupInfrastructure grp : cctx.cache().cacheGroups()) + grp.config().getAffinity().removeNode(exchId.nodeId()); } exchActions = null; @@ -1525,9 +1532,9 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT if (Thread.currentThread().isInterrupted()) return; - for (GridCacheContext cacheCtx : cctx.cacheContexts()) { - if (!cacheCtx.isLocal()) - cacheCtx.topology().detectLostPartitions(discoEvt); + for (CacheGroupInfrastructure grp : cctx.cache().cacheGroups()) { + if (!grp.isLocal()) + grp.topology().detectLostPartitions(discoEvt); } } } @@ -1540,6 +1547,7 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT if (Thread.currentThread().isInterrupted()) return; + // TODO: IGNITE-5075. for (GridCacheContext cacheCtx : cctx.cacheContexts()) { if (!cacheCtx.isLocal() && cacheNames.contains(cacheCtx.name())) cacheCtx.topology().resetLostPartitions(); @@ -1555,9 +1563,9 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT assert crd.isLocal(); if (!crd.equals(discoCache.serverNodes().get(0))) { - for (GridCacheContext cacheCtx : cctx.cacheContexts()) { - if (!cacheCtx.isLocal()) - cacheCtx.topology().beforeExchange(this, !centralizedAff); + for (CacheGroupInfrastructure grp : cctx.cache().cacheGroups()) { + if (!grp.isLocal()) + grp.topology().beforeExchange(this, !centralizedAff); } } @@ -1631,11 +1639,11 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT */ private void assignPartitionsStates() { if (cctx.database().persistenceEnabled()) { - for (GridCacheContext cacheCtx : cctx.cacheContexts()) { - if (cacheCtx.isLocal()) + for (CacheGroupInfrastructure grp : cctx.cache().cacheGroups()) { + if (grp.isLocal()) continue; - assignPartitionStates(cacheCtx.topology()); + assignPartitionStates(grp.topology()); } } } @@ -1942,13 +1950,13 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT List<ClusterNode> empty = Collections.emptyList(); - for (GridCacheContext cacheCtx : cctx.cacheContexts()) { - List<List<ClusterNode>> affAssignment = new ArrayList<>(cacheCtx.affinity().partitions()); + for (CacheGroupInfrastructure grp : cctx.cache().cacheGroups()) { + List<List<ClusterNode>> affAssignment = new ArrayList<>(grp.affinity().partitions()); - for (int i = 0; i < cacheCtx.affinity().partitions(); i++) + for (int i = 0; i < grp.affinity().partitions(); i++) affAssignment.add(empty); - cacheCtx.affinity().affinityCache().initialize(topologyVersion(), affAssignment); + grp.affinity().initialize(topologyVersion(), affAssignment); } onDone(topologyVersion()); http://git-wip-us.apache.org/repos/asf/ignite/blob/f1128f85/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsSingleMessage.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsSingleMessage.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsSingleMessage.java index 30d35a2..8d95a58 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsSingleMessage.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsSingleMessage.java @@ -139,23 +139,23 @@ public class GridDhtPartitionsSingleMessage extends GridDhtPartitionsAbstractMes } /** - * @param cacheId Cache ID. + * @param grpId Cache group ID. * @param cntrMap Partition update counters. */ - public void partitionUpdateCounters(int cacheId, Map<Integer, T2<Long, Long>> cntrMap) { + public void partitionUpdateCounters(int grpId, Map<Integer, T2<Long, Long>> cntrMap) { if (partCntrs == null) partCntrs = new HashMap<>(); - partCntrs.put(cacheId, cntrMap); + partCntrs.put(grpId, cntrMap); } /** - * @param cacheId Cache ID. + * @param grpId Cache group ID. * @return Partition update counters. */ - @Override public Map<Integer, T2<Long, Long>> partitionUpdateCounters(int cacheId) { + @Override public Map<Integer, T2<Long, Long>> partitionUpdateCounters(int grpId) { if (partCntrs != null) { - Map<Integer, T2<Long, Long>> res = partCntrs.get(cacheId); + Map<Integer, T2<Long, Long>> res = partCntrs.get(grpId); return res != null ? res : Collections.<Integer, T2<Long, Long>>emptyMap(); }
