Repository: ignite Updated Branches: refs/heads/ignite-1093-3 cd9753ca2 -> d208fa8ef
1093 Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/c2195b5b Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/c2195b5b Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/c2195b5b Branch: refs/heads/ignite-1093-3 Commit: c2195b5b5546ffb7691ec909d8de3d0b9364c971 Parents: cd9753c Author: Anton Vinogradov <[email protected]> Authored: Tue Nov 3 16:06:28 2015 +0300 Committer: Anton Vinogradov <[email protected]> Committed: Tue Nov 3 16:06:28 2015 +0300 ---------------------------------------------------------------------- .../dht/preloader/GridDhtPartitionDemander.java | 19 +++++++++++++++---- 1 file changed, 15 insertions(+), 4 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/c2195b5b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java index b131679..8f23291 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java @@ -114,6 +114,9 @@ public class GridDhtPartitionDemander { @Deprecated//Backward compatibility. To be removed in future. private final AtomicInteger dmIdx = new AtomicInteger(); + /** Cached rebalance topics. */ + private final Map<Integer, Object> rebalanceTopics; + /** * @param cctx Cctx. * @param demandLock Demand lock. @@ -135,6 +138,14 @@ public class GridDhtPartitionDemander { rebalanceFut.onDone(true); syncFut.onDone(); } + + Map<Integer, Object> tops = new HashMap<>(); + + for (int idx = 0; idx < cctx.gridConfig().getRebalanceThreadPoolSize(); idx++) { + tops.put(idx, GridCachePartitionExchangeManager.rebalanceTopic(idx)); + } + + rebalanceTopics = tops; } /** @@ -389,7 +400,7 @@ public class GridDhtPartitionDemander { // Create copy. GridDhtPartitionDemandMessage initD = new GridDhtPartitionDemandMessage(d, sParts.get(cnt)); - initD.topic(GridCachePartitionExchangeManager.rebalanceTopic(cnt)); + initD.topic(rebalanceTopics.get(cnt)); initD.updateSequence(fut.updateSeq); initD.timeout(cctx.config().getRebalanceTimeout()); @@ -397,7 +408,7 @@ public class GridDhtPartitionDemander { if (!fut.isDone())// Future can be already cancelled at this moment and all failovers happened. // New requests will not be covered by failovers. cctx.io().sendOrderedMessage(node, - GridCachePartitionExchangeManager.rebalanceTopic(cnt), initD, cctx.ioPolicy(), initD.timeout()); + rebalanceTopics.get(cnt), initD, cctx.ioPolicy(), initD.timeout()); } if (log.isDebugEnabled()) @@ -598,11 +609,11 @@ public class GridDhtPartitionDemander { d.timeout(cctx.config().getRebalanceTimeout()); - d.topic(GridCachePartitionExchangeManager.rebalanceTopic(idx)); + d.topic(rebalanceTopics.get(idx)); if (!topologyChanged(fut) && !fut.isDone()) { // Send demand message. - cctx.io().sendOrderedMessage(node, GridCachePartitionExchangeManager.rebalanceTopic(idx), + cctx.io().sendOrderedMessage(node,rebalanceTopics.get(idx), d, cctx.ioPolicy(), cctx.config().getRebalanceTimeout()); } }
