Repository: ignite
Updated Branches:
  refs/heads/ignite-1093-2 9fab059e7 -> c76cd7f8b


1093


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/c76cd7f8
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/c76cd7f8
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/c76cd7f8

Branch: refs/heads/ignite-1093-2
Commit: c76cd7f8b37d2931ce2c134dbb82281e6d6905ac
Parents: 9fab059
Author: Anton Vinogradov <[email protected]>
Authored: Mon Oct 26 15:02:09 2015 +0300
Committer: Anton Vinogradov <[email protected]>
Committed: Mon Oct 26 15:02:09 2015 +0300

----------------------------------------------------------------------
 .../dht/preloader/GridDhtPartitionDemander.java | 43 ++++----------------
 1 file changed, 7 insertions(+), 36 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/c76cd7f8/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java
index 7f63d8f..5cbc4d3 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java
@@ -1065,12 +1065,11 @@ public class GridDhtPartitionDemander {
          * @param topVer Topology version.
          * @param d Demand message.
          * @param exchFut Exchange future.
-         * @return Missed partitions.
          * @throws InterruptedException If interrupted.
          * @throws ClusterTopologyCheckedException If node left.
          * @throws IgniteCheckedException If failed to send message.
          */
-        private Set<Integer> demandFromNode(
+        private void demandFromNode(
             ClusterNode node,
             final AffinityTopologyVersion topVer,
             GridDhtPartitionDemandMessage d,
@@ -1083,13 +1082,8 @@ public class GridDhtPartitionDemander {
             d.topic(topic(cntr));
             d.workerId(id);
 
-            Set<Integer> missed = new HashSet<>();
-
-            // Get the same collection that will be sent in the message.
-            Collection<Integer> remaining = d.partitions();
-
             if (topologyChanged(fut))
-                return missed;
+                return;
 
             cctx.io().addOrderedHandler(d.topic(), new CI2<UUID, 
GridDhtPartitionSupplyMessage>() {
                 @Override public void apply(UUID nodeId, 
GridDhtPartitionSupplyMessage msg) {
@@ -1106,7 +1100,7 @@ public class GridDhtPartitionDemander {
                     retry = false;
 
                     // Create copy.
-                    d = new GridDhtPartitionDemandMessage(d, remaining);
+                    d = new GridDhtPartitionDemandMessage(d, 
fut.remaining.get(node.id()).get2());
 
                     long timeout = cctx.config().getRebalanceTimeout();
 
@@ -1134,7 +1128,7 @@ public class GridDhtPartitionDemander {
                                 cctx.io().removeOrderedHandler(d.topic());
 
                                 // Must create copy to be able to work with IO 
manager thread local caches.
-                                d = new GridDhtPartitionDemandMessage(d, 
remaining);
+                                d = new GridDhtPartitionDemandMessage(d, 
fut.remaining.get(node.id()).get2());
 
                                 // Create new topic.
                                 d.topic(topic(++cntr));
@@ -1228,7 +1222,6 @@ public class GridDhtPartitionDemander {
                                         // If message was last for this 
partition,
                                         // then we take ownership.
                                         if (last) {
-                                            remaining.remove(p);
                                             fut.partitionDone(node.id(), p);
 
                                             top.own(part);
@@ -1247,7 +1240,6 @@ public class GridDhtPartitionDemander {
                                     }
                                 }
                                 else {
-                                    remaining.remove(p);
                                     fut.partitionDone(node.id(), p);
 
                                     if (log.isDebugEnabled())
@@ -1255,7 +1247,6 @@ public class GridDhtPartitionDemander {
                                 }
                             }
                             else {
-                                remaining.remove(p);
                                 fut.partitionDone(node.id(), p);
 
                                 if (log.isDebugEnabled())
@@ -1263,17 +1254,13 @@ public class GridDhtPartitionDemander {
                             }
                         }
 
-                        remaining.removeAll(s.supply().missed());
-
                         // Only request partitions based on latest topology 
version.
                         for (Integer miss : s.supply().missed()) {
                             if (cctx.affinity().localNode(miss, topVer))
-                                missed.add(miss);
-
-                            fut.partitionMissed(node.id(), miss);
+                                fut.partitionMissed(node.id(), miss);
                         }
 
-                        if (remaining.isEmpty())
+                        if (fut.remaining.get(node.id()) == null)
                             break; // While.
 
                         if (s.supply().ack()) {
@@ -1284,8 +1271,6 @@ public class GridDhtPartitionDemander {
                     }
                 }
                 while (retry && !topologyChanged(fut));
-
-                return missed;
             }
             finally {
                 cctx.io().removeOrderedHandler(d.topic());
@@ -1304,22 +1289,8 @@ public class GridDhtPartitionDemander {
 
                 AffinityTopologyVersion topVer = fut.topVer;
 
-                Collection<Integer> missed = new HashSet<>();
-
-                if (topologyChanged(fut)) {
-                    return;
-                }
-
                 try {
-                    Set<Integer> set = demandFromNode(node, topVer, d, 
exchFut);
-
-                    if (!set.isEmpty()) {
-                        if (log.isDebugEnabled())
-                            log.debug("Missed partitions from node [nodeId=" + 
node.id() + ", missed=" +
-                                set + ']');
-
-                        missed.addAll(set);
-                    }
+                    demandFromNode(node, topVer, d, exchFut);
                 }
                 catch (InterruptedException e) {
                     throw new IgniteCheckedException(e);

Reply via email to