Repository: ignite Updated Branches: refs/heads/ignite-1027 13276ac2e -> 82947c75e
ignite-1027 Fixed early rebalance sync future completion. Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/82947c75 Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/82947c75 Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/82947c75 Branch: refs/heads/ignite-1027 Commit: 82947c75e305adebe5a6bfa33bb6d98872f41e06 Parents: 13276ac Author: sboikov <[email protected]> Authored: Wed Dec 2 13:16:23 2015 +0300 Committer: sboikov <[email protected]> Committed: Wed Dec 2 13:16:23 2015 +0300 ---------------------------------------------------------------------- .../GridCachePartitionExchangeManager.java | 35 +++++++++----------- .../dht/preloader/GridDhtPartitionDemander.java | 12 +++---- .../dht/preloader/GridDhtPreloader.java | 4 ++- .../preloader/GridDhtPreloaderAssignments.java | 19 ++++++++++- 4 files changed, 41 insertions(+), 29 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/82947c75/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java index 17abace..9142480 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java @@ -1353,8 +1353,7 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana if (delay == 0 || forcePreload) { GridDhtPreloaderAssignments assigns = cacheCtx.preloader().assign(exchFut); - if (assigns != null) - assignsMap.put(cacheCtx.cacheId(), assigns); + assignsMap.put(cacheCtx.cacheId(), assigns); } } } @@ -1399,27 +1398,26 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana waitList.add(cctx.cacheContext(cId).name()); } - GridDhtPreloaderAssignments assignments = assignsMap.get(cacheId); + Callable<Boolean> r = cacheCtx.preloader().addAssignments(assignsMap.get(cacheId), + forcePreload, + waitList, + cnt); - if (assignments != null) { - Callable<Boolean> r = cacheCtx.preloader().addAssignments(assignments, - forcePreload, - waitList, - cnt); + if (r != null) { + U.log(log, "Cache rebalancing scheduled: [cache=" + cacheCtx.name() + + ", waitList=" + waitList.toString() + "]"); - if (r != null) { - U.log(log, "Cache rebalancing scheduled: [cache=" + cacheCtx.name() + - ", waitList=" + waitList.toString() + "]"); - - if (cacheId == CU.cacheId(GridCacheUtils.MARSH_CACHE_NAME)) - marshR = r; - else - orderedRs.add(r); - } + if (cacheId == CU.cacheId(GridCacheUtils.MARSH_CACHE_NAME)) + marshR = r; + else + orderedRs.add(r); } } } + if (asyncStartFut != null) + asyncStartFut.get(); // Wait for thread stop. + rebalanceQ.addAll(orderedRs); if (marshR != null || !rebalanceQ.isEmpty()) { @@ -1440,9 +1438,6 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana } } - if (asyncStartFut != null) - asyncStartFut.get(); // Wait for thread stop. - final GridFutureAdapter fut = new GridFutureAdapter(); asyncStartFut = fut; http://git-wip-us.apache.org/repos/asf/ignite/blob/82947c75/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java index 6b923d0..b46979db 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java @@ -294,8 +294,6 @@ public class GridDhtPartitionDemander { long delay = cctx.config().getRebalanceDelay(); if (delay == 0 || force) { - assert assigns != null; - final RebalanceFuture oldFut = rebalanceFut; final RebalanceFuture fut = new RebalanceFuture(assigns, cctx, log, oldFut.isInitial(), cnt); @@ -313,7 +311,7 @@ public class GridDhtPartitionDemander { rebalanceFut = fut; if (assigns.isEmpty()) { - fut.doneIfEmpty(); + fut.doneIfEmpty(assigns.cancelled()); return null; } @@ -841,9 +839,9 @@ public class GridDhtPartitionDemander { } /** - * + * @param cancelled Is cancelled. */ - private void doneIfEmpty() { + private void doneIfEmpty(boolean cancelled) { synchronized (this) { if (isDone()) return; @@ -854,14 +852,14 @@ public class GridDhtPartitionDemander { log.debug("Rebalancing is not required [cache=" + cctx.name() + ", topology=" + topVer + "]"); - checkIsDone(); + checkIsDone(cancelled); } } /** * Cancels this future. * - * @return {@code true}. + * @return {@code True}. */ @Override public boolean cancel() { synchronized (this) { http://git-wip-us.apache.org/repos/asf/ignite/blob/82947c75/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java index 14734d5..9a6246f 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java @@ -324,7 +324,9 @@ public class GridDhtPreloader extends GridCachePreloaderAdapter { log.debug("Skipping assignments creation, exchange worker has pending assignments: " + exchFut.exchangeId()); - return null; + assigns.cancelled(true); + + return assigns; } // If partition belongs to local node. http://git-wip-us.apache.org/repos/asf/ignite/blob/82947c75/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloaderAssignments.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloaderAssignments.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloaderAssignments.java index 3583967..3f82c9b 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloaderAssignments.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloaderAssignments.java @@ -37,19 +37,36 @@ public class GridDhtPreloaderAssignments extends ConcurrentHashMap<ClusterNode, /** Last join order. */ private final AffinityTopologyVersion topVer; + /** */ + private boolean cancelled; + /** * @param exchFut Exchange future. * @param topVer Last join order. */ public GridDhtPreloaderAssignments(GridDhtPartitionsExchangeFuture exchFut, AffinityTopologyVersion topVer) { assert exchFut != null; - assert topVer.topologyVersion() > 0; + assert topVer.topologyVersion() > 0 : topVer; this.exchFut = exchFut; this.topVer = topVer; } /** + * @return {@code True} if assignments creation was cancelled. + */ + public boolean cancelled() { + return cancelled; + } + + /** + * @param cancelled {@code True} if assignments creation was cancelled. + */ + public void cancelled(boolean cancelled) { + this.cancelled = cancelled; + } + + /** * @return Exchange future. */ GridDhtPartitionsExchangeFuture exchangeFuture() {
