Repository: ignite
Updated Branches:
  refs/heads/ignite-1093-3 cd9753ca2 -> d208fa8ef


1093


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/c2195b5b
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/c2195b5b
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/c2195b5b

Branch: refs/heads/ignite-1093-3
Commit: c2195b5b5546ffb7691ec909d8de3d0b9364c971
Parents: cd9753c
Author: Anton Vinogradov <[email protected]>
Authored: Tue Nov 3 16:06:28 2015 +0300
Committer: Anton Vinogradov <[email protected]>
Committed: Tue Nov 3 16:06:28 2015 +0300

----------------------------------------------------------------------
 .../dht/preloader/GridDhtPartitionDemander.java  | 19 +++++++++++++++----
 1 file changed, 15 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/c2195b5b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java
index b131679..8f23291 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java
@@ -114,6 +114,9 @@ public class GridDhtPartitionDemander {
     @Deprecated//Backward compatibility. To be removed in future.
     private final AtomicInteger dmIdx = new AtomicInteger();
 
+    /** Cached rebalance topics. */
+    private final Map<Integer, Object> rebalanceTopics;
+
     /**
      * @param cctx Cctx.
      * @param demandLock Demand lock.
@@ -135,6 +138,14 @@ public class GridDhtPartitionDemander {
             rebalanceFut.onDone(true);
             syncFut.onDone();
         }
+
+        Map<Integer, Object> tops = new HashMap<>();
+
+        for (int idx = 0; idx < 
cctx.gridConfig().getRebalanceThreadPoolSize(); idx++) {
+            tops.put(idx, 
GridCachePartitionExchangeManager.rebalanceTopic(idx));
+        }
+
+        rebalanceTopics = tops;
     }
 
     /**
@@ -389,7 +400,7 @@ public class GridDhtPartitionDemander {
                         // Create copy.
                         GridDhtPartitionDemandMessage initD = new 
GridDhtPartitionDemandMessage(d, sParts.get(cnt));
 
-                        
initD.topic(GridCachePartitionExchangeManager.rebalanceTopic(cnt));
+                        initD.topic(rebalanceTopics.get(cnt));
                         initD.updateSequence(fut.updateSeq);
                         initD.timeout(cctx.config().getRebalanceTimeout());
 
@@ -397,7 +408,7 @@ public class GridDhtPartitionDemander {
                             if (!fut.isDone())// Future can be already 
cancelled at this moment and all failovers happened.
                                 // New requests will not be covered by 
failovers.
                                 cctx.io().sendOrderedMessage(node,
-                                    
GridCachePartitionExchangeManager.rebalanceTopic(cnt), initD, cctx.ioPolicy(), 
initD.timeout());
+                                    rebalanceTopics.get(cnt), initD, 
cctx.ioPolicy(), initD.timeout());
                         }
 
                         if (log.isDebugEnabled())
@@ -598,11 +609,11 @@ public class GridDhtPartitionDemander {
 
             d.timeout(cctx.config().getRebalanceTimeout());
 
-            d.topic(GridCachePartitionExchangeManager.rebalanceTopic(idx));
+            d.topic(rebalanceTopics.get(idx));
 
             if (!topologyChanged(fut) && !fut.isDone()) {
                 // Send demand message.
-                cctx.io().sendOrderedMessage(node, 
GridCachePartitionExchangeManager.rebalanceTopic(idx),
+                cctx.io().sendOrderedMessage(node,rebalanceTopics.get(idx),
                     d, cctx.ioPolicy(), cctx.config().getRebalanceTimeout());
             }
         }

Reply via email to