Repository: ignite
Updated Branches:
  refs/heads/ignite-1093-2 db41a172b -> 9aa613055


1093


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/9aa61305
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/9aa61305
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/9aa61305

Branch: refs/heads/ignite-1093-2
Commit: 9aa613055dbdd252a09074244365c02cd26b32f5
Parents: db41a17
Author: Anton Vinogradov <[email protected]>
Authored: Thu Oct 15 13:54:26 2015 +0300
Committer: Anton Vinogradov <[email protected]>
Committed: Thu Oct 15 13:54:26 2015 +0300

----------------------------------------------------------------------
 .../GridCachePartitionExchangeManager.java      | 56 +++++++++++++-------
 .../dht/preloader/GridDhtPartitionDemander.java | 22 --------
 2 files changed, 37 insertions(+), 41 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/9aa61305/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
index 6737439..00c00c5 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
@@ -1315,8 +1315,8 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
                                     assignsMap.get(cacheId), forcePreload, 
waitList, cnt);
 
                                 if (r != null) {
-                                    U.log(log, "Rebalancing scheduled: [cache=" + cacheCtx.name() +
-                                        " , waitList=" + waitList.toString() + "]");
+                                    U.log(log, "Cache rebalancing scheduled: [cache=" + cacheCtx.name() +
+                                        ", waitList=" + waitList.toString() + "]");
 
                                     if (cacheId == 
CU.cacheId(GridCacheUtils.MARSH_CACHE_NAME))
                                         marsR = r;
@@ -1330,30 +1330,48 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
                             U.sleep(10); // Wait for thread stop.
                         }
 
-                        U.log(log, "Starting caches rebalancing [top=" + 
exchFut.topologyVersion() + "]");
+                        if (marsR != null || !rebalancingQueue.isEmpty()) {
+                            if (futQ.isEmpty()) {
+                                U.log(log, "Starting caches rebalancing [top=" 
+ exchFut.topologyVersion() + "]");
 
-                        if (marsR != null)
-                            marsR.run();//Marshaller cache rebalancing 
launches in sync way.
+                                if (marsR != null)
+                                    marsR.run();//Marshaller cache rebalancing 
launches in sync way.
 
-                        cctx.kernalContext().closure().callLocalSafe(new 
GPC<Boolean>() {
-                            @Override public Boolean call() {
-                                try {
-                                    while (true) {
-                                        Runnable rn = rebalancingQueue.poll();
+                                
cctx.kernalContext().closure().callLocalSafe(new GPC<Boolean>() {
+                                    @Override public Boolean call() {
+                                        try {
+                                            while (true) {
+                                                Runnable rn = 
rebalancingQueue.poll();
 
-                                        if (rn == null)
-                                            return false;
+                                                if (rn == null)
+                                                    return false;
 
-                                        rn.run();
+                                                rn.run();
+                                            }
+                                        }
+                                        finally {
+                                            boolean res = 
rebalancingQueueOwning.compareAndSet(1, 0);
+
+                                            assert res;
+                                        }
                                     }
-                                }
-                                finally {
-                                    boolean res = 
rebalancingQueueOwning.compareAndSet(1, 0);
+                                }, /*system pool*/ true);
+                            }
+                            else {
+                                U.log(log, "Obsolete exchange, skipping rebalancing [top=" + exchFut.topologyVersion() + "]");
 
-                                    assert res;
-                                }
+                                boolean res = 
rebalancingQueueOwning.compareAndSet(1, 0);
+
+                                assert res;
                             }
-                        }, /*system pool*/ true);
+                        }
+                        else {
+                            U.log(log, "Nothing scheduled, skipping rebalancing [top=" + exchFut.topologyVersion() + "]");
+
+                            boolean res = 
rebalancingQueueOwning.compareAndSet(1, 0);
+
+                            assert res;
+                        }
                     }
                 }
                 catch (IgniteInterruptedCheckedException e) {

http://git-wip-us.apache.org/repos/asf/ignite/blob/9aa61305/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java
index 46a7d02..5b5136e 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java
@@ -292,26 +292,12 @@ public class GridDhtPartitionDemander {
 
             rebalanceFut = fut;
 
-            if (cctx.shared().exchange().hasPendingExchange()) { // Will 
rebalance at actual topology.
-                U.log(log, "Skipping obsolete exchange. [top=" + 
assigns.topologyVersion() + "]");
-
-                fut.cancel();
-
-                return null;
-            }
-
             if (assigns.isEmpty()) {
                 fut.doneIfEmpty();
 
                 return null;
             }
 
-            if (topologyChanged(fut)) {
-                fut.cancel();
-
-                return null;
-            }
-
             return new Runnable() {
                 @Override
                 public void run() {
@@ -786,14 +772,6 @@ public class GridDhtPartitionDemander {
             this.log = log;
             this.sendStoppedEvnt = sentStopEvnt;
             this.updateSeq = updateSeq;
-
-            if (assigns != null)
-                
cctx.discovery().topologyFuture(assigns.topologyVersion().topologyVersion() + 
1).listen(
-                    new CI1<IgniteInternalFuture<Long>>() {
-                        @Override public void apply(IgniteInternalFuture<Long> 
future) {
-                            RebalanceFuture.this.cancel();
-                        }
-                    }); // todo: is it necessary?
         }
 
         /**

Reply via email to