1093

Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/be4addef
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/be4addef
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/be4addef

Branch: refs/heads/ignite-1093-2
Commit: be4addef01a28f54b887dc59c6b194e46edfdb7f
Parents: 8ba913b
Author: Anton Vinogradov <[email protected]>
Authored: Tue Oct 20 14:20:45 2015 +0300
Committer: Anton Vinogradov <[email protected]>
Committed: Tue Oct 20 14:20:45 2015 +0300

----------------------------------------------------------------------
 .../GridCachePartitionExchangeManager.java      | 10 +++---
 .../dht/preloader/GridDhtPartitionDemander.java | 32 +++++++++++---------
 2 files changed, 24 insertions(+), 18 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/be4addef/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
index 5251c61..6793f9f 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
@@ -1338,7 +1338,8 @@ public class GridCachePartitionExchangeManager<K, V> 
extends GridCacheSharedMana
                                         marsR.call();//Marshaller cache 
rebalancing launches in sync way.
                                     }
                                     catch (Exception ex) {
-                                        U.error(log, "Failed to send partition 
demand message to node", ex);
+                                        if (log.isDebugEnabled())
+                                            log.debug("Failed to send initial 
demand request to node");
 
                                         continue;
                                     }
@@ -1356,12 +1357,13 @@ public class GridCachePartitionExchangeManager<K, V> 
extends GridCacheSharedMana
                                                 if (r == null)
                                                     return false;
 
-                                               if (!r.call())
-                                                   return false;
+                                                if (!r.call())
+                                                    return false;
                                             }
                                         }
                                         catch (Exception ex) {
-                                            U.error(log, "Failed to send 
partition demand message to node");
+                                            if (log.isDebugEnabled())
+                                                log.debug("Failed to send 
initial demand request to node");
 
                                             return false;
                                         }

http://git-wip-us.apache.org/repos/asf/ignite/blob/be4addef/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java
index f5f4c56..fbb187b 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java
@@ -337,20 +337,23 @@ public class GridDhtPartitionDemander {
 
             GridDhtPartitionDemandMessage d = e.getValue();
 
-            d.timeout(cctx.config().getRebalanceTimeout());
-            d.workerId(0);//old api support.
+            fut.appendPartitions(node.id(), d.partitions());//Future 
preparation.
+        }
+
+        for (Map.Entry<ClusterNode, GridDhtPartitionDemandMessage> e : 
assigns.entrySet()) {
+            final ClusterNode node = e.getKey();
 
             final CacheConfiguration cfg = cctx.config();
 
+            final Collection<Integer> parts = 
fut.remaining.get(node.id()).get2();
+
+            GridDhtPartitionDemandMessage d = e.getValue();
+
             //Check remote node rebalancing API version.
             if (new 
Integer(1).equals(node.attribute(IgniteNodeAttributes.REBALANCING_VERSION))) {
                 U.log(log, "Starting rebalancing [cache=" + cctx.name() + ", 
mode=" + cfg.getRebalanceMode() +
-                    ", fromNode=" + node.id() + ", partitionsCount=" + 
d.partitions().size() +
-                    ", topology=" + d.topologyVersion() + ", updateSeq=" + 
fut.updateSeq + "]");
-
-                Collection<Integer> parts = new HashSet<>(d.partitions());
-
-                fut.appendPartitions(node.id(), parts);
+                    ", fromNode=" + node.id() + ", partitionsCount=" + 
parts.size() +
+                    ", topology=" + fut.topologyVersion() + ", updateSeq=" + 
fut.updateSeq + "]");
 
                 int lsnrCnt = cctx.gridConfig().getRebalanceThreadPoolSize();
 
@@ -367,7 +370,6 @@ public class GridDhtPartitionDemander {
                     sParts.get(cnt++ % lsnrCnt).add(it.next());
 
                 for (cnt = 0; cnt < lsnrCnt; cnt++) {
-
                     if (!sParts.get(cnt).isEmpty()) {
 
                         // Create copy.
@@ -375,9 +377,10 @@ public class GridDhtPartitionDemander {
 
                         
initD.topic(GridCachePartitionExchangeManager.rebalanceTopic(cnt));
                         initD.updateSequence(fut.updateSeq);
+                        initD.timeout(cctx.config().getRebalanceTimeout());
 
                         cctx.io().sendOrderedMessage(node,
-                            
GridCachePartitionExchangeManager.rebalanceTopic(cnt), initD, cctx.ioPolicy(), 
d.timeout());
+                            
GridCachePartitionExchangeManager.rebalanceTopic(cnt), initD, cctx.ioPolicy(), 
initD.timeout());
 
                         if (log.isDebugEnabled())
                             log.debug("Requested rebalancing [from node=" + 
node.id() + ", listener index=" +
@@ -388,12 +391,13 @@ public class GridDhtPartitionDemander {
             }
             else {
                 U.log(log, "Starting rebalancing (old api) [cache=" + 
cctx.name() + ", mode=" + cfg.getRebalanceMode() +
-                    ", fromNode=" + node.id() + ", partitionsCount=" + 
d.partitions().size() +
-                    ", topology=" + d.topologyVersion() + ", updateSeq=" + 
d.updateSequence() + "]");
+                    ", fromNode=" + node.id() + ", partitionsCount=" + 
parts.size() +
+                    ", topology=" + fut.topologyVersion() + ", updateSeq=" + 
fut.updateSeq + "]");
 
-                DemandWorker dw = new DemandWorker(dmIdx.incrementAndGet(), 
fut);
+                d.timeout(cctx.config().getRebalanceTimeout());
+                d.workerId(0);//old api support.
 
-                fut.appendPartitions(node.id(), d.partitions());
+                DemandWorker dw = new DemandWorker(dmIdx.incrementAndGet(), 
fut);
 
                 dw.run(node, d);
             }

Reply via email to