1093
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/4907c548 Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/4907c548 Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/4907c548 Branch: refs/heads/ignite-1093-2 Commit: 4907c5482f378110d0bd730c4288f2872d99933d Parents: 4c9792e Author: Anton Vinogradov <[email protected]> Authored: Thu Oct 15 16:51:55 2015 +0300 Committer: Anton Vinogradov <[email protected]> Committed: Thu Oct 15 16:51:55 2015 +0300 ---------------------------------------------------------------------- .../GridCachePartitionExchangeManager.java | 26 +++++++------------- 1 file changed, 9 insertions(+), 17 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/4907c548/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java index 00c00c5..4d95894 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java @@ -143,9 +143,6 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana /** */ private final Queue<Runnable> rebalancingQueue = new ConcurrentLinkedDeque8<>(); - /** */ - private final AtomicReference<Integer> rebalancingQueueOwning = new AtomicReference<>(0); - /** * Partition map futures. * This set also contains already completed exchange futures to address race conditions when coordinator @@ -1160,6 +1157,8 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana int cnt = 0; + IgniteInternalFuture asyncStartFut = null; + while (!isCancelled()) { GridDhtPartitionsExchangeFuture exchFut = null; @@ -1326,9 +1325,8 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana } } - while (!rebalancingQueueOwning.compareAndSet(0, 1)) { - U.sleep(10); // Wait for thread stop. - } + if (asyncStartFut != null) + asyncStartFut.get(); // Wait for thread stop. if (marsR != null || !rebalancingQueue.isEmpty()) { if (futQ.isEmpty()) { @@ -1337,6 +1335,10 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana if (marsR != null) marsR.run();//Marshaller cache rebalancing launches in sync way. + final GridFutureAdapter fut = new GridFutureAdapter(); + + asyncStartFut = fut; + cctx.kernalContext().closure().callLocalSafe(new GPC<Boolean>() { @Override public Boolean call() { try { @@ -1350,27 +1352,17 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana } } finally { - boolean res = rebalancingQueueOwning.compareAndSet(1, 0); - - assert res; + fut.onDone(); } } }, /*system pool*/ true); } else { U.log(log, "Obsolete exchange, skipping rebalancing [top=" + exchFut.topologyVersion() + "]"); - - boolean res = rebalancingQueueOwning.compareAndSet(1, 0); - - assert res; } } else { U.log(log, "Nothing scheduled, skipping rebalancing [top=" + exchFut.topologyVersion() + "]"); - - boolean res = rebalancingQueueOwning.compareAndSet(1, 0); - - assert res; } } }
