Repository: ignite
Updated Branches:
  refs/heads/ignite-1093-2 a7f925b25 -> f2da44dcf


1093


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/e22267b8
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/e22267b8
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/e22267b8

Branch: refs/heads/ignite-1093-2
Commit: e22267b84c2e2c8cf4392bca8f34c4ab1c743e7e
Parents: a7f925b
Author: Anton Vinogradov <[email protected]>
Authored: Wed Oct 7 13:34:52 2015 +0300
Committer: Anton Vinogradov <[email protected]>
Committed: Wed Oct 7 13:34:52 2015 +0300

----------------------------------------------------------------------
 .../dht/preloader/GridDhtPartitionDemander.java | 37 ++++++++------------
 1 file changed, 14 insertions(+), 23 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/e22267b8/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java
index a170bc6..e743765 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java
@@ -427,21 +427,13 @@ public class GridDhtPartitionDemander {
                         initD.updateSequence(fut.updateSeq);
 
                         try {
-                            if (!topologyChanged(fut)) {
-                                cctx.io().sendOrderedMessage(node,
-                                    
GridCachePartitionExchangeManager.rebalanceTopic(cnt), initD, cctx.ioPolicy(), 
d.timeout());
+                            cctx.io().sendOrderedMessage(node,
+                                
GridCachePartitionExchangeManager.rebalanceTopic(cnt), initD, cctx.ioPolicy(), 
d.timeout());
 
-                                if (log.isDebugEnabled())
-                                    log.debug("Requested rebalancing [from 
node=" + node.id() + ", listener index=" +
-                                        cnt + ", partitions count=" + 
sParts.get(cnt).size() +
-                                        " (" + partitionsList(sParts.get(cnt)) 
+ ")]");
-
-                            }
-                            else {
-                                fut.cancel();
-
-                                return;
-                            }
+                            if (log.isDebugEnabled())
+                                log.debug("Requested rebalancing [from node=" 
+ node.id() + ", listener index=" +
+                                    cnt + ", partitions count=" + 
sParts.get(cnt).size() +
+                                    " (" + partitionsList(sParts.get(cnt)) + 
")]");
                         }
                         catch (IgniteCheckedException ex) {
                             fut.cancel();
@@ -530,10 +522,15 @@ public class GridDhtPartitionDemander {
 
         assert node != null;
 
-        if (!fut.topologyVersion().equals(topVer) || // Current future based 
on another topology.
-            topologyChanged(fut) || // Topology already changed (for current 
future) or new topology pending.
+        if (//!fut.topologyVersion().equals(topVer) || // Current future based 
on another topology.
             !fut.isActual(supply.updateSequence())) // Current future has the 
same topology, but another update sequence.
+            return; // Supply message based on another future.
+
+        if (topologyChanged(fut)) { // Topology already changed (for the 
future that the supply message is based on).
+            fut.cancel();
+
             return;
+        }
 
         if (log.isDebugEnabled())
             log.debug("Received supply message: " + supply);
@@ -553,12 +550,6 @@ public class GridDhtPartitionDemander {
         try {
             // Preload.
             for (Map.Entry<Integer, CacheEntryInfoCollection> e : 
supply.infos().entrySet()) {
-                if (topologyChanged(fut)) {
-                    fut.cancel();
-
-                    return;
-                }
-
                 int p = e.getKey();
 
                 if (cctx.affinity().localNode(p, topVer)) {
@@ -642,7 +633,7 @@ public class GridDhtPartitionDemander {
 
             d.topic(GridCachePartitionExchangeManager.rebalanceTopic(idx));
 
-            if (!topologyChanged(fut)) {
+            if (!topologyChanged(fut) && !fut.isDone()) {
                 // Send demand message.
                 cctx.io().sendOrderedMessage(node, 
GridCachePartitionExchangeManager.rebalanceTopic(idx),
                     d, cctx.ioPolicy(), cctx.config().getRebalanceTimeout());

Reply via email to