Repository: ignite Updated Branches: refs/heads/ignite-5075 1d0809367 -> 98a706f09
ignite-5075 Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/98a706f0 Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/98a706f0 Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/98a706f0 Branch: refs/heads/ignite-5075 Commit: 98a706f09597c63e4e5879c7ab2b88ab6b3b53cf Parents: 1d08093 Author: sboikov <[email protected]> Authored: Wed May 17 16:19:48 2017 +0300 Committer: sboikov <[email protected]> Committed: Wed May 17 16:19:48 2017 +0300 ---------------------------------------------------------------------- .../cache/CacheGroupInfrastructure.java | 9 ++- .../processors/cache/ClusterCachesInfo.java | 6 ++ .../GridCachePartitionExchangeManager.java | 66 ++++++++------------ .../processors/cache/GridCachePreloader.java | 1 - .../cache/GridCachePreloaderAdapter.java | 1 - .../dht/preloader/GridDhtPreloader.java | 1 - .../GridCacheOrderedPreloadingSelfTest.java | 14 +++-- 7 files changed, 50 insertions(+), 48 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/98a706f0/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheGroupInfrastructure.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheGroupInfrastructure.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheGroupInfrastructure.java index 52d60b1..d4d54c7 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheGroupInfrastructure.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheGroupInfrastructure.java @@ -494,7 +494,7 @@ public class CacheGroupInfrastructure { /** * @return Group name. */ - @Nullable public String nameForLog() { + public String nameForLog() { if (ccfg.getGroupName() == null) return "[cache, name=" + ccfg.getName() + ']'; @@ -502,6 +502,13 @@ public class CacheGroupInfrastructure { } /** + * @return Group name if it is specified, otherwise cache name. + */ + public String cacheOrGroupName() { + return ccfg.getGroupName() != null ? ccfg.getGroupName() : ccfg.getName(); + } + + /** * @return Group ID. */ public int groupId() { http://git-wip-us.apache.org/repos/asf/ignite/blob/98a706f0/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ClusterCachesInfo.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ClusterCachesInfo.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ClusterCachesInfo.java index 086145d..e824e09 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ClusterCachesInfo.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ClusterCachesInfo.java @@ -1058,6 +1058,12 @@ class ClusterCachesInfo { CU.validateCacheGroupsAttributesMismatch(log, cfg, startCfg, "memoryPolicyName", "Memory policy", cfg.getCacheMode(), startCfg.getCacheMode(), true); + + CU.validateCacheGroupsAttributesMismatch(log, cfg, startCfg, "rebalanceDelay", "Rebalance delay", + cfg.getRebalanceDelay(), startCfg.getRebalanceDelay(), false); + + CU.validateCacheGroupsAttributesMismatch(log, cfg, startCfg, "rebalanceOrder", "Rebalance order", + cfg.getRebalanceOrder(), startCfg.getRebalanceOrder(), false); } /** http://git-wip-us.apache.org/repos/asf/ignite/blob/98a706f0/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java index fcc2bdc..bff2567 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java @@ -812,18 +812,12 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana // If this is the oldest node. if (oldest.id().equals(cctx.localNodeId())) { // Check rebalance state & send CacheAffinityChangeMessage if need. - for (GridCacheContext cacheCtx : cctx.cacheContexts()) { - if (!cacheCtx.isLocal()) { - if (cacheCtx == null) - continue; - - GridDhtPartitionTopology top = null; - - if (!cacheCtx.isLocal()) - top = cacheCtx.topology(); + for (CacheGroupInfrastructure grp : cctx.cache().cacheGroups()) { + if (!grp.isLocal()) { + GridDhtPartitionTopology top = grp.topology(); if (top != null) - cctx.affinity().checkRebalanceState(top, cacheCtx.cacheId()); + cctx.affinity().checkRebalanceState(top, grp.groupId()); } } @@ -851,9 +845,8 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana /** * @param nodes Nodes. - * @return {@code True} if message was sent, {@code false} if node left grid. */ - private boolean sendAllPartitions(Collection<ClusterNode> nodes) { + private void sendAllPartitions(Collection<ClusterNode> nodes) { GridDhtPartitionsFullMessage m = createPartitionsFullMessage(nodes, null, null, true); if (log.isDebugEnabled()) @@ -874,8 +867,6 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana U.warn(log, "Failed to send partitions full message [node=" + node + ", err=" + e + ']'); } } - - return true; } /** @@ -1725,8 +1716,11 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana try { boolean preloadFinished = true; - for (GridCacheContext cacheCtx : cctx.cacheContexts()) { - preloadFinished &= cacheCtx.preloader() != null && cacheCtx.preloader().syncFuture().isDone(); + for (CacheGroupInfrastructure grp : cctx.cache().cacheGroups()) { + if (grp.isLocal()) + continue; + + preloadFinished &= grp.preloader() != null && grp.preloader().syncFuture().isDone(); if (!preloadFinished) break; @@ -1833,11 +1827,11 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana boolean changed = false; - for (GridCacheContext cacheCtx : cctx.cacheContexts()) { - if (cacheCtx.isLocal()) + for (CacheGroupInfrastructure grp : cctx.cache().cacheGroups()) { + if (grp.isLocal()) continue; - changed |= cacheCtx.topology().afterExchange(exchFut); + changed |= grp.topology().afterExchange(exchFut); } if (!cctx.kernalContext().clientNode() && changed && !hasPendingExchange()) @@ -1857,16 +1851,16 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana if (!exchFut.skipPreload() && cctx.kernalContext().state().active()) { assignsMap = new HashMap<>(); - for (GridCacheContext cacheCtx : cctx.cacheContexts()) { - long delay = cacheCtx.config().getRebalanceDelay(); + for (CacheGroupInfrastructure grp : cctx.cache().cacheGroups()) { + long delay = grp.config().getRebalanceDelay(); GridDhtPreloaderAssignments assigns = null; // Don't delay for dummy reassigns to avoid infinite recursion. if (delay == 0 || forcePreload) - assigns = cacheCtx.preloader().assign(exchFut); + assigns = grp.preloader().assign(exchFut); - assignsMap.put(cacheCtx.cacheId(), assigns); + assignsMap.put(grp.groupId(), assigns); } } } @@ -1881,16 +1875,16 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana NavigableMap<Integer, List<Integer>> orderMap = new TreeMap<>(); for (Map.Entry<Integer, GridDhtPreloaderAssignments> e : assignsMap.entrySet()) { - int cacheId = e.getKey(); + int grpId = e.getKey(); - GridCacheContext<K, V> cacheCtx = cctx.cacheContext(cacheId); + CacheGroupInfrastructure grp = cctx.cache().cacheGroup(grpId); - int order = cacheCtx.config().getRebalanceOrder(); + int order = grp.config().getRebalanceOrder(); if (orderMap.get(order) == null) orderMap.put(order, new ArrayList<Integer>(size)); - orderMap.get(order).add(cacheId); + orderMap.get(order).add(grpId); } Runnable r = null; @@ -1900,35 +1894,27 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana boolean assignsCancelled = false; for (Integer order : orderMap.descendingKeySet()) { - for (Integer cacheId : orderMap.get(order)) { - GridCacheContext<K, V> cacheCtx = cctx.cacheContext(cacheId); + for (Integer grpId : orderMap.get(order)) { + CacheGroupInfrastructure grp = cctx.cache().cacheGroup(grpId); - GridDhtPreloaderAssignments assigns = assignsMap.get(cacheId); + GridDhtPreloaderAssignments assigns = assignsMap.get(grpId); if (assigns != null) assignsCancelled |= assigns.cancelled(); - List<String> waitList = new ArrayList<>(size - 1); - - for (List<Integer> cIds : orderMap.headMap(order).values()) { - for (Integer cId : cIds) - waitList.add(cctx.cacheContext(cId).name()); - } - // Cancels previous rebalance future (in case it's not done yet). // Sends previous rebalance stopped event (if necessary). // Creates new rebalance future. // Sends current rebalance started event (if necessary). // Finishes cache sync future (on empty assignments). - Runnable cur = cacheCtx.preloader().addAssignments(assigns, + Runnable cur = grp.preloader().addAssignments(assigns, forcePreload, - waitList, cnt, r, exchFut.forcedRebalanceFuture()); if (cur != null) { - rebList.add(U.maskName(cacheCtx.name())); + rebList.add(grp.cacheOrGroupName()); r = cur; } http://git-wip-us.apache.org/repos/asf/ignite/blob/98a706f0/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePreloader.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePreloader.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePreloader.java index 9428d9c..9b847b1 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePreloader.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePreloader.java @@ -79,7 +79,6 @@ public interface GridCachePreloader { */ public Runnable addAssignments(GridDhtPreloaderAssignments assignments, boolean forcePreload, - Collection<String> caches, int cnt, Runnable next, @Nullable GridFutureAdapter<Boolean> forcedRebFut); http://git-wip-us.apache.org/repos/asf/ignite/blob/98a706f0/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePreloaderAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePreloaderAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePreloaderAdapter.java index 9ca4852..e0a6063 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePreloaderAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePreloaderAdapter.java @@ -160,7 +160,6 @@ public class GridCachePreloaderAdapter implements GridCachePreloader { /** {@inheritDoc} */ @Override public Runnable addAssignments(GridDhtPreloaderAssignments assignments, boolean forcePreload, - Collection<String> caches, int cnt, Runnable next, @Nullable GridFutureAdapter<Boolean> forcedRebFut) { http://git-wip-us.apache.org/repos/asf/ignite/blob/98a706f0/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java index 52c1600..e373f4c 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java @@ -326,7 +326,6 @@ public class GridDhtPreloader extends GridCachePreloaderAdapter { /** {@inheritDoc} */ @Override public Runnable addAssignments(GridDhtPreloaderAssignments assignments, boolean forceRebalance, - Collection<String> caches, int cnt, Runnable next, @Nullable GridFutureAdapter<Boolean> forcedRebFut) { http://git-wip-us.apache.org/repos/asf/ignite/blob/98a706f0/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheOrderedPreloadingSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheOrderedPreloadingSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheOrderedPreloadingSelfTest.java index 7562fe5..bc4f2cb 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheOrderedPreloadingSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheOrderedPreloadingSelfTest.java @@ -68,8 +68,10 @@ public class GridCacheOrderedPreloadingSelfTest extends GridCommonAbstractTest { /** Caches rebalance finish times. */ private ConcurrentHashMap8<Integer, ConcurrentHashMap8<String, Long>> times; + /** {@inheritDoc} */ @Override protected void beforeTest() throws Exception { super.beforeTestsStarted(); + times = new ConcurrentHashMap8<>(); for (int i = 0; i < GRID_CNT; i++) @@ -93,8 +95,8 @@ public class GridCacheOrderedPreloadingSelfTest extends GridCommonAbstractTest { Map<IgnitePredicate<? extends Event>, int[]> listeners = new HashMap<>(); listeners.put(new IgnitePredicate<CacheRebalancingEvent>() { - @Override public boolean apply(CacheRebalancingEvent event) { - times.get(gridIdx(event)).putIfAbsent(event.cacheName(), event.timestamp()); + @Override public boolean apply(CacheRebalancingEvent evt) { + times.get(gridIdx(evt)).putIfAbsent(evt.cacheName(), evt.timestamp()); return true; } }, new int[]{EventType.EVT_CACHE_REBALANCE_STOPPED}); @@ -194,7 +196,11 @@ public class GridCacheOrderedPreloadingSelfTest extends GridCommonAbstractTest { } } - private int gridIdx(Event event) { - return getTestIgniteInstanceIndex((String)event.node().attributes().get(GRID_NAME_ATTR)); + /** + * @param evt Event. + * @return Index event node. + */ + private int gridIdx(Event evt) { + return getTestIgniteInstanceIndex((String)evt.node().attributes().get(GRID_NAME_ATTR)); } } \ No newline at end of file
