Repository: ignite Updated Branches: refs/heads/ignite-1093-2 db41a172b -> 9aa613055
1093 Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/9aa61305 Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/9aa61305 Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/9aa61305 Branch: refs/heads/ignite-1093-2 Commit: 9aa613055dbdd252a09074244365c02cd26b32f5 Parents: db41a17 Author: Anton Vinogradov <[email protected]> Authored: Thu Oct 15 13:54:26 2015 +0300 Committer: Anton Vinogradov <[email protected]> Committed: Thu Oct 15 13:54:26 2015 +0300 ---------------------------------------------------------------------- .../GridCachePartitionExchangeManager.java | 56 +++++++++++++------- .../dht/preloader/GridDhtPartitionDemander.java | 22 -------- 2 files changed, 37 insertions(+), 41 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/9aa61305/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java index 6737439..00c00c5 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java @@ -1315,8 +1315,8 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana assignsMap.get(cacheId), forcePreload, waitList, cnt); if (r != null) { - U.log(log, "Rebalancing scheduled: [cache=" + cacheCtx.name() + - " , waitList=" + waitList.toString() + "]"); + U.log(log, "Cache rebalancing scheduled: [cache=" + cacheCtx.name() + + ", waitList=" + waitList.toString() + "]"); if (cacheId == CU.cacheId(GridCacheUtils.MARSH_CACHE_NAME)) marsR = r; @@ -1330,30 +1330,48 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana U.sleep(10); // Wait for thread stop. } - U.log(log, "Starting caches rebalancing [top=" + exchFut.topologyVersion() + "]"); + if (marsR != null || !rebalancingQueue.isEmpty()) { + if (futQ.isEmpty()) { + U.log(log, "Starting caches rebalancing [top=" + exchFut.topologyVersion() + "]"); - if (marsR != null) - marsR.run();//Marshaller cache rebalancing launches in sync way. + if (marsR != null) + marsR.run();//Marshaller cache rebalancing launches in sync way. - cctx.kernalContext().closure().callLocalSafe(new GPC<Boolean>() { - @Override public Boolean call() { - try { - while (true) { - Runnable rn = rebalancingQueue.poll(); + cctx.kernalContext().closure().callLocalSafe(new GPC<Boolean>() { + @Override public Boolean call() { + try { + while (true) { + Runnable rn = rebalancingQueue.poll(); - if (rn == null) - return false; + if (rn == null) + return false; - rn.run(); + rn.run(); + } + } + finally { + boolean res = rebalancingQueueOwning.compareAndSet(1, 0); + + assert res; + } } - } - finally { - boolean res = rebalancingQueueOwning.compareAndSet(1, 0); + }, /*system pool*/ true); + } + else { + U.log(log, "Obsolete exchange, skipping rebalancing [top=" + exchFut.topologyVersion() + "]"); - assert res; - } + boolean res = rebalancingQueueOwning.compareAndSet(1, 0); + + assert res; } - }, /*system pool*/ true); + } + else { + U.log(log, "Nothing scheduled, skipping rebalancing [top=" + exchFut.topologyVersion() + "]"); + + boolean res = rebalancingQueueOwning.compareAndSet(1, 0); + + assert res; + } } } catch (IgniteInterruptedCheckedException e) { http://git-wip-us.apache.org/repos/asf/ignite/blob/9aa61305/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java index 46a7d02..5b5136e 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java @@ -292,26 +292,12 @@ public class GridDhtPartitionDemander { rebalanceFut = fut; - if (cctx.shared().exchange().hasPendingExchange()) { // Will rebalance at actual topology. - U.log(log, "Skipping obsolete exchange. [top=" + assigns.topologyVersion() + "]"); - - fut.cancel(); - - return null; - } - if (assigns.isEmpty()) { fut.doneIfEmpty(); return null; } - if (topologyChanged(fut)) { - fut.cancel(); - - return null; - } - return new Runnable() { @Override public void run() { @@ -786,14 +772,6 @@ public class GridDhtPartitionDemander { this.log = log; this.sendStoppedEvnt = sentStopEvnt; this.updateSeq = updateSeq; - - if (assigns != null) - cctx.discovery().topologyFuture(assigns.topologyVersion().topologyVersion() + 1).listen( - new CI1<IgniteInternalFuture<Long>>() { - @Override public void apply(IgniteInternalFuture<Long> future) { - RebalanceFuture.this.cancel(); - } - }); // todo: is it necessary? } /**
