Repository: ignite Updated Branches: refs/heads/ignite-1093-2 07d683fd3 -> 7fe5f119b
1093 Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/7fe5f119 Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/7fe5f119 Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/7fe5f119 Branch: refs/heads/ignite-1093-2 Commit: 7fe5f119b2163e6910f64e44c68a95a7150d810b Parents: 07d683f Author: Anton Vinogradov <[email protected]> Authored: Mon Sep 14 17:24:30 2015 +0300 Committer: Anton Vinogradov <[email protected]> Committed: Mon Sep 14 17:24:30 2015 +0300 ---------------------------------------------------------------------- .../GridCachePartitionExchangeManager.java | 29 ++++++- .../dht/preloader/GridDhtPartitionDemander.java | 79 +++++++++++--------- 2 files changed, 70 insertions(+), 38 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/7fe5f119/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java index e3e2d53..d71f158 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java @@ -1271,12 +1271,37 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana } if (assignsMap != null) { + //Marshaller cache first. + int mId = CU.cacheId(GridCacheUtils.MARSH_CACHE_NAME); + + GridDhtPreloaderAssignments mA = assignsMap.get(mId); + + assert mA != null; + + GridCacheContext<K, V> mCacheCtx = cctx.cacheContext(mId); + + mCacheCtx.preloader().addAssignments(mA, forcePreload); + + //Utility cache second. + int uId = CU.cacheId(GridCacheUtils.UTILITY_CACHE_NAME); + + GridDhtPreloaderAssignments uA = assignsMap.get(uId); + + assert uA != null; + + GridCacheContext<K, V> uCacheCtx = cctx.cacheContext(uId); + + uCacheCtx.preloader().addAssignments(uA, forcePreload); + + //Others. for (Map.Entry<Integer, GridDhtPreloaderAssignments> e : assignsMap.entrySet()) { int cacheId = e.getKey(); - GridCacheContext<K, V> cacheCtx = cctx.cacheContext(cacheId); + if (cacheId != uId && cacheId != mId) { + GridCacheContext<K, V> cacheCtx = cctx.cacheContext(cacheId); - cacheCtx.preloader().addAssignments(e.getValue(), forcePreload); + cacheCtx.preloader().addAssignments(e.getValue(), forcePreload); + } } } } http://git-wip-us.apache.org/repos/asf/ignite/blob/7fe5f119/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java index b902fed..b03fa67 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java @@ -52,6 +52,7 @@ import org.apache.ignite.internal.processors.cache.GridCacheEntryEx; import org.apache.ignite.internal.processors.cache.GridCacheEntryInfo; import org.apache.ignite.internal.processors.cache.GridCacheEntryRemovedException; import org.apache.ignite.internal.processors.cache.GridCachePartitionExchangeManager; +import org.apache.ignite.internal.processors.cache.GridCacheUtils; import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtInvalidPartitionException; import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtLocalPartition; import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionTopology; @@ -208,6 +209,41 @@ public class GridDhtPartitionDemander { } /** + * @param name Name. + * @param fut Future. + */ + private void waitForCacheRebalancing(String name, SyncFuture fut) { + if (log.isDebugEnabled()) + log.debug("Waiting for " + name + " cache rebalancing [cacheName=" + cctx.name() + ']'); + + try { + SyncFuture wFut = (SyncFuture)cctx.kernalContext().cache().internalCache(name).preloader().syncFuture(); + + if (!topologyChanged(fut.assigns.topologyVersion())) + wFut.get(); + else { + fut.onCancel(); + + return; + } + } + catch (IgniteInterruptedCheckedException ignored) { + if (log.isDebugEnabled()) { + log.debug("Failed to wait for " + name + " cache rebalancing future (grid is stopping): " + + "[cacheName=" + cctx.name() + ']'); + fut.onCancel(); + + return; + } + } + catch (IgniteCheckedException e) { + fut.onCancel(); + + throw new Error("Ordered rebalancing future should never fail: " + e.getMessage(), e); + } + } + + /** * @param assigns Assignments. * @param force {@code True} if dummy reassign. * @throws IgniteCheckedException @@ -240,6 +276,10 @@ public class GridDhtPartitionDemander { if (assigns.isEmpty()) { fut.checkIsDone(); + if (fut.assigns.topologyVersion().topologyVersion() > 1)// First node. + U.log(log, "Rebalancing is not required [cache=" + cctx.name() + + ", topology=" + fut.assigns.topologyVersion() + "]"); + return; } @@ -254,43 +294,10 @@ public class GridDhtPartitionDemander { IgniteThread thread = new IgniteThread(cctx.gridName(), "demand-thread-" + cctx.cache().name(), new Runnable() { @Override public void run() { if (!CU.isMarshallerCache(cctx.name())) { - if (log.isDebugEnabled()) - log.debug("Waiting for marshaller cache preload [cacheName=" + cctx.name() + ']'); - - try { - IgniteInternalFuture mFut; - do { - mFut = cctx.kernalContext().cache().marshallerCache().preloader().syncFuture(); - } - while (!((SyncFuture)mFut).isInited() || ((SyncFuture)mFut).topologyVersion().topologyVersion() < curFut.topologyVersion().topologyVersion()); - - if (((SyncFuture)mFut).topologyVersion().topologyVersion() > curFut.topologyVersion().topologyVersion()) { - curFut.onCancel(); - - return; - } - - if (!topologyChanged(topVer)) - mFut.get(); - else { - curFut.onCancel(); - - return; - } - } - catch (IgniteInterruptedCheckedException ignored) { - if (log.isDebugEnabled()) { - log.debug("Failed to wait for marshaller cache preload future (grid is stopping): " + - "[cacheName=" + cctx.name() + ']'); - curFut.onCancel(); - - return; - } - } - catch (IgniteCheckedException e) { - curFut.onCancel(); + waitForCacheRebalancing(GridCacheUtils.MARSH_CACHE_NAME, curFut); - throw new Error("Ordered preload future should never fail: " + e.getMessage(), e); + if (!CU.isUtilityCache(cctx.name())) { + waitForCacheRebalancing(GridCacheUtils.UTILITY_CACHE_NAME, curFut); } }
