Repository: ignite Updated Branches: refs/heads/ignite-1093-2 a7f925b25 -> f2da44dcf
1093 Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/e22267b8 Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/e22267b8 Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/e22267b8 Branch: refs/heads/ignite-1093-2 Commit: e22267b84c2e2c8cf4392bca8f34c4ab1c743e7e Parents: a7f925b Author: Anton Vinogradov <[email protected]> Authored: Wed Oct 7 13:34:52 2015 +0300 Committer: Anton Vinogradov <[email protected]> Committed: Wed Oct 7 13:34:52 2015 +0300 ---------------------------------------------------------------------- .../dht/preloader/GridDhtPartitionDemander.java | 37 ++++++++------------ 1 file changed, 14 insertions(+), 23 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/e22267b8/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java index a170bc6..e743765 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java @@ -427,21 +427,13 @@ public class GridDhtPartitionDemander { initD.updateSequence(fut.updateSeq); try { - if (!topologyChanged(fut)) { - cctx.io().sendOrderedMessage(node, - GridCachePartitionExchangeManager.rebalanceTopic(cnt), initD, cctx.ioPolicy(), d.timeout()); + cctx.io().sendOrderedMessage(node, + GridCachePartitionExchangeManager.rebalanceTopic(cnt), initD, cctx.ioPolicy(), d.timeout()); - if (log.isDebugEnabled()) - log.debug("Requested rebalancing [from node=" + node.id() + ", listener index=" + - cnt + ", partitions count=" + sParts.get(cnt).size() + - " (" + partitionsList(sParts.get(cnt)) + ")]"); - - } - else { - fut.cancel(); - - return; - } + if (log.isDebugEnabled()) + log.debug("Requested rebalancing [from node=" + node.id() + ", listener index=" + + cnt + ", partitions count=" + sParts.get(cnt).size() + + " (" + partitionsList(sParts.get(cnt)) + ")]"); } catch (IgniteCheckedException ex) { fut.cancel(); @@ -530,10 +522,15 @@ public class GridDhtPartitionDemander { assert node != null; - if (!fut.topologyVersion().equals(topVer) || // Current future based on another topology. - topologyChanged(fut) || // Topology already changed (for current future) or new topology pending. + if (//!fut.topologyVersion().equals(topVer) || // Current future based on another topology. !fut.isActual(supply.updateSequence())) // Current future have same topology, but another update sequence. + return; // Supple message based on another future. + + if (topologyChanged(fut)) { // Topology already changed (for the future that supply message based on). + fut.cancel(); + return; + } if (log.isDebugEnabled()) log.debug("Received supply message: " + supply); @@ -553,12 +550,6 @@ public class GridDhtPartitionDemander { try { // Preload. for (Map.Entry<Integer, CacheEntryInfoCollection> e : supply.infos().entrySet()) { - if (topologyChanged(fut)) { - fut.cancel(); - - return; - } - int p = e.getKey(); if (cctx.affinity().localNode(p, topVer)) { @@ -642,7 +633,7 @@ public class GridDhtPartitionDemander { d.topic(GridCachePartitionExchangeManager.rebalanceTopic(idx)); - if (!topologyChanged(fut)) { + if (!topologyChanged(fut) && !fut.isDone()) { // Send demand message. cctx.io().sendOrderedMessage(node, GridCachePartitionExchangeManager.rebalanceTopic(idx), d, cctx.ioPolicy(), cctx.config().getRebalanceTimeout());
