Repository: ignite
Updated Branches:
  refs/heads/ignite-1093-2 0566a77b5 -> d5d78c0de


1093


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/d5d78c0d
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/d5d78c0d
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/d5d78c0d

Branch: refs/heads/ignite-1093-2
Commit: d5d78c0de68ec8be264510d70c842601e8a3dd89
Parents: 0566a77
Author: Anton Vinogradov <[email protected]>
Authored: Sat Oct 17 11:49:52 2015 +0300
Committer: Anton Vinogradov <[email protected]>
Committed: Sat Oct 17 11:49:52 2015 +0300

----------------------------------------------------------------------
 .../dht/preloader/GridDhtPartitionDemander.java | 60 +++++++-------------
 1 file changed, 22 insertions(+), 38 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/d5d78c0d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java
index 85649c4..d9f7b1c 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java
@@ -225,39 +225,27 @@ public class GridDhtPartitionDemander {
      * @param name Cache name.
      * @param fut Future.
      */
-    private void waitForCacheRebalancing(String name, RebalanceFuture fut) {
+    private boolean waitForCacheRebalancing(String name, RebalanceFuture fut) 
throws IgniteCheckedException {
         if (log.isDebugEnabled())
             log.debug("Waiting for " + name + " cache rebalancing [cacheName=" 
+ cctx.name() + ']');
 
-        try {
-            RebalanceFuture wFut = 
(RebalanceFuture)cctx.kernalContext().cache().internalCache(name).preloader().rebalanceFuture();
-
-            if (!topologyChanged(fut) && wFut.updateSeq == fut.updateSeq) {
-                if (!wFut.get()) {
-                    U.log(log, "Skipping waiting of " + name + " cache [top=" 
+ fut.topologyVersion() +
-                        "] (cache rebalanced with missed partitions)");
+        RebalanceFuture wFut = 
(RebalanceFuture)cctx.kernalContext().cache().internalCache(name).preloader().rebalanceFuture();
 
-                    fut.cancel();
-                }
-            }
-            else {
+        if (!topologyChanged(fut) && wFut.updateSeq == fut.updateSeq) {
+            if (!wFut.get()) {
                 U.log(log, "Skipping waiting of " + name + " cache [top=" + 
fut.topologyVersion() +
-                    "] (topology already changed)");
+                    "] (cache rebalanced with missed partitions)");
 
-                fut.cancel();
-            }
-        }
-        catch (IgniteInterruptedCheckedException ignored) {
-            if (log.isDebugEnabled()) {
-                log.debug("Failed to wait for " + name +
-                    "[cacheName=" + cctx.name() + ']');
-                fut.cancel();
+                return false;
             }
+
+            return true;
         }
-        catch (IgniteCheckedException e) {
-            fut.cancel();
+        else {
+            U.log(log, "Skipping waiting of " + name + " cache [top=" + 
fut.topologyVersion() +
+                "] (topology already changed)");
 
-            throw new Error("Ordered rebalancing future should never fail: " + 
e.getMessage(), e);
+            return false;
         }
     }
 
@@ -300,17 +288,13 @@ public class GridDhtPartitionDemander {
             }
 
             return new Callable<Boolean>() {
-                @Override public Boolean call() throws Exception{
+                @Override public Boolean call() throws Exception {
                     for (String c : caches) {
-                        waitForCacheRebalancing(c, fut);
-
-                        if (fut.isDone())
+                        if (!waitForCacheRebalancing(c, fut))
                             return false;
                     }
 
-                    requestPartitions(fut, assigns);
-
-                    return true;
+                    return requestPartitions(fut, assigns);
                 }
             };
         }
@@ -345,13 +329,11 @@ public class GridDhtPartitionDemander {
     /**
      * @param fut Future.
      */
-    private void requestPartitions(RebalanceFuture fut, 
GridDhtPreloaderAssignments assigns) throws IgniteCheckedException {
+    private boolean requestPartitions(RebalanceFuture fut,
+        GridDhtPreloaderAssignments assigns) throws IgniteCheckedException {
         for (Map.Entry<ClusterNode, GridDhtPartitionDemandMessage> e : 
assigns.entrySet()) {
-            if (topologyChanged(fut)) {
-                fut.cancel();
-
-                return;
-            }
+            if (topologyChanged(fut))
+                return false;
 
             final ClusterNode node = e.getKey();
 
@@ -418,6 +400,8 @@ public class GridDhtPartitionDemander {
                 dw.run(node, d);
             }
         }
+
+        return true;
     }
 
     /**
@@ -1360,7 +1344,7 @@ public class GridDhtPartitionDemander {
          * @param node Node.
          * @param d D.
          */
-        public void run(ClusterNode node, GridDhtPartitionDemandMessage d) 
throws IgniteCheckedException{
+        public void run(ClusterNode node, GridDhtPartitionDemandMessage d) 
throws IgniteCheckedException {
             demandLock.readLock().lock();
 
             try {

Reply via email to