Repository: ignite Updated Branches: refs/heads/ignite-1093-2 9fab059e7 -> c76cd7f8b
1093 Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/c76cd7f8 Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/c76cd7f8 Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/c76cd7f8 Branch: refs/heads/ignite-1093-2 Commit: c76cd7f8b37d2931ce2c134dbb82281e6d6905ac Parents: 9fab059 Author: Anton Vinogradov <[email protected]> Authored: Mon Oct 26 15:02:09 2015 +0300 Committer: Anton Vinogradov <[email protected]> Committed: Mon Oct 26 15:02:09 2015 +0300 ---------------------------------------------------------------------- .../dht/preloader/GridDhtPartitionDemander.java | 43 ++++---------------- 1 file changed, 7 insertions(+), 36 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/c76cd7f8/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java index 7f63d8f..5cbc4d3 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java @@ -1065,12 +1065,11 @@ public class GridDhtPartitionDemander { * @param topVer Topology version. * @param d Demand message. * @param exchFut Exchange future. - * @return Missed partitions. * @throws InterruptedException If interrupted. * @throws ClusterTopologyCheckedException If node left. * @throws IgniteCheckedException If failed to send message. */ - private Set<Integer> demandFromNode( + private void demandFromNode( ClusterNode node, final AffinityTopologyVersion topVer, GridDhtPartitionDemandMessage d, @@ -1083,13 +1082,8 @@ public class GridDhtPartitionDemander { d.topic(topic(cntr)); d.workerId(id); - Set<Integer> missed = new HashSet<>(); - - // Get the same collection that will be sent in the message. - Collection<Integer> remaining = d.partitions(); - if (topologyChanged(fut)) - return missed; + return; cctx.io().addOrderedHandler(d.topic(), new CI2<UUID, GridDhtPartitionSupplyMessage>() { @Override public void apply(UUID nodeId, GridDhtPartitionSupplyMessage msg) { @@ -1106,7 +1100,7 @@ public class GridDhtPartitionDemander { retry = false; // Create copy. - d = new GridDhtPartitionDemandMessage(d, remaining); + d = new GridDhtPartitionDemandMessage(d, fut.remaining.get(node.id()).get2()); long timeout = cctx.config().getRebalanceTimeout(); @@ -1134,7 +1128,7 @@ public class GridDhtPartitionDemander { cctx.io().removeOrderedHandler(d.topic()); // Must create copy to be able to work with IO manager thread local caches. - d = new GridDhtPartitionDemandMessage(d, remaining); + d = new GridDhtPartitionDemandMessage(d, fut.remaining.get(node.id()).get2()); // Create new topic. d.topic(topic(++cntr)); @@ -1228,7 +1222,6 @@ public class GridDhtPartitionDemander { // If message was last for this partition, // then we take ownership. if (last) { - remaining.remove(p); fut.partitionDone(node.id(), p); top.own(part); @@ -1247,7 +1240,6 @@ public class GridDhtPartitionDemander { } } else { - remaining.remove(p); fut.partitionDone(node.id(), p); if (log.isDebugEnabled()) @@ -1255,7 +1247,6 @@ public class GridDhtPartitionDemander { } } else { - remaining.remove(p); fut.partitionDone(node.id(), p); if (log.isDebugEnabled()) @@ -1263,17 +1254,13 @@ public class GridDhtPartitionDemander { } } - remaining.removeAll(s.supply().missed()); - // Only request partitions based on latest topology version. for (Integer miss : s.supply().missed()) { if (cctx.affinity().localNode(miss, topVer)) - missed.add(miss); - - fut.partitionMissed(node.id(), miss); + fut.partitionMissed(node.id(), miss); } - if (remaining.isEmpty()) + if (fut.remaining.get(node.id()) == null) break; // While. if (s.supply().ack()) { @@ -1284,8 +1271,6 @@ public class GridDhtPartitionDemander { } } while (retry && !topologyChanged(fut)); - - return missed; } finally { cctx.io().removeOrderedHandler(d.topic()); @@ -1304,22 +1289,8 @@ public class GridDhtPartitionDemander { AffinityTopologyVersion topVer = fut.topVer; - Collection<Integer> missed = new HashSet<>(); - - if (topologyChanged(fut)) { - return; - } - try { - Set<Integer> set = demandFromNode(node, topVer, d, exchFut); - - if (!set.isEmpty()) { - if (log.isDebugEnabled()) - log.debug("Missed partitions from node [nodeId=" + node.id() + ", missed=" + - set + ']'); - - missed.addAll(set); - } + demandFromNode(node, topVer, d, exchFut); } catch (InterruptedException e) { throw new IgniteCheckedException(e);
