1093

Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/4907c548
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/4907c548
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/4907c548

Branch: refs/heads/ignite-1093-2
Commit: 4907c5482f378110d0bd730c4288f2872d99933d
Parents: 4c9792e
Author: Anton Vinogradov <[email protected]>
Authored: Thu Oct 15 16:51:55 2015 +0300
Committer: Anton Vinogradov <[email protected]>
Committed: Thu Oct 15 16:51:55 2015 +0300

----------------------------------------------------------------------
 .../GridCachePartitionExchangeManager.java      | 26 +++++++-------------
 1 file changed, 9 insertions(+), 17 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/4907c548/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
index 00c00c5..4d95894 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
@@ -143,9 +143,6 @@ public class GridCachePartitionExchangeManager<K, V> 
extends GridCacheSharedMana
     /** */
     private final Queue<Runnable> rebalancingQueue = new 
ConcurrentLinkedDeque8<>();
 
-    /** */
-    private final AtomicReference<Integer> rebalancingQueueOwning = new 
AtomicReference<>(0);
-
     /**
      * Partition map futures.
      * This set also contains already completed exchange futures to address 
race conditions when coordinator
@@ -1160,6 +1157,8 @@ public class GridCachePartitionExchangeManager<K, V> 
extends GridCacheSharedMana
 
             int cnt = 0;
 
+            IgniteInternalFuture asyncStartFut = null;
+
             while (!isCancelled()) {
                 GridDhtPartitionsExchangeFuture exchFut = null;
 
@@ -1326,9 +1325,8 @@ public class GridCachePartitionExchangeManager<K, V> 
extends GridCacheSharedMana
                             }
                         }
 
-                        while (!rebalancingQueueOwning.compareAndSet(0, 1)) {
-                            U.sleep(10); // Wait for thread stop.
-                        }
+                        if (asyncStartFut != null)
+                            asyncStartFut.get(); // Wait for thread stop.
 
                         if (marsR != null || !rebalancingQueue.isEmpty()) {
                             if (futQ.isEmpty()) {
@@ -1337,6 +1335,10 @@ public class GridCachePartitionExchangeManager<K, V> 
extends GridCacheSharedMana
                                 if (marsR != null)
                                     marsR.run();//Marshaller cache rebalancing 
launches in sync way.
 
+                                final GridFutureAdapter fut = new 
GridFutureAdapter();
+
+                                asyncStartFut = fut;
+
                                 
cctx.kernalContext().closure().callLocalSafe(new GPC<Boolean>() {
                                     @Override public Boolean call() {
                                         try {
@@ -1350,27 +1352,17 @@ public class GridCachePartitionExchangeManager<K, V> 
extends GridCacheSharedMana
                                             }
                                         }
                                         finally {
-                                            boolean res = 
rebalancingQueueOwning.compareAndSet(1, 0);
-
-                                            assert res;
+                                            fut.onDone();
                                         }
                                     }
                                 }, /*system pool*/ true);
                             }
                             else {
                                 U.log(log, "Obsolete exchange, skipping 
rebalancing [top=" + exchFut.topologyVersion() + "]");
-
-                                boolean res = 
rebalancingQueueOwning.compareAndSet(1, 0);
-
-                                assert res;
                             }
                         }
                         else {
                             U.log(log, "Nothing scheduled, skipping 
rebalancing [top=" + exchFut.topologyVersion() + "]");
-
-                            boolean res = 
rebalancingQueueOwning.compareAndSet(1, 0);
-
-                            assert res;
                         }
                     }
                 }

Reply via email to