Finalization.

Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/ae2d9e17
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/ae2d9e17
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/ae2d9e17

Branch: refs/heads/ignite-3414
Commit: ae2d9e1738e00965b681673080a4a23b3c1876b7
Parents: 3defef3
Author: vozerov-gridgain <[email protected]>
Authored: Tue Jul 19 10:22:37 2016 +0300
Committer: vozerov-gridgain <[email protected]>
Committed: Tue Jul 19 10:22:37 2016 +0300

----------------------------------------------------------------------
 .../IgniteHadoopWeightedMapReducePlanner.java   | 61 ++++++++++++++------
 1 file changed, 44 insertions(+), 17 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/ae2d9e17/modules/hadoop/src/main/java/org/apache/ignite/hadoop/mapreduce/IgniteHadoopWeightedMapReducePlanner.java
----------------------------------------------------------------------
diff --git 
a/modules/hadoop/src/main/java/org/apache/ignite/hadoop/mapreduce/IgniteHadoopWeightedMapReducePlanner.java
 
b/modules/hadoop/src/main/java/org/apache/ignite/hadoop/mapreduce/IgniteHadoopWeightedMapReducePlanner.java
index 5a9d5b9..b4b1d56 100644
--- 
a/modules/hadoop/src/main/java/org/apache/ignite/hadoop/mapreduce/IgniteHadoopWeightedMapReducePlanner.java
+++ 
b/modules/hadoop/src/main/java/org/apache/ignite/hadoop/mapreduce/IgniteHadoopWeightedMapReducePlanner.java
@@ -284,33 +284,60 @@ public class IgniteHadoopWeightedMapReducePlanner extends 
HadoopAbstractMapReduc
      * @param grp Group.
      * @param priority Priority.
      * @param affIds Affinity IDs.
-     * @param priorityAffId Priority affinity IDs.
+     * @param prioAffId Priority affinity IDs.
      * @return Best node ID in the group.
      */
     private static UUID bestMapperNodeForGroup(HadoopMapReducePlanGroup grp, 
MapperPriority priority,
-        @Nullable List<UUID> affIds, @Nullable UUID priorityAffId) {
-        if (grp.single())
-            return grp.nodeId(0);
+        @Nullable Collection<UUID> affIds, @Nullable UUID prioAffId) {
+        // Return the best node from the group.
+        int idx = 0;
 
         // This is rare situation when several nodes are started on the same 
host.
-        switch (priority) {
-            case NORMAL:
-                // Pick any node.
-                return 
grp.nodeId(ThreadLocalRandom.current().nextInt(grp.nodeCount()));
+        if (!grp.single()) {
+            switch (priority) {
+                case NORMAL: {
+                    // Pick any node.
+                    idx = ThreadLocalRandom.current().nextInt(grp.nodeCount());
+
+                    break;
+                }
+                case HIGH: {
+                    // Pick any affinity node.
+                    assert affIds != null;
+
+                    List<Integer> cands = new ArrayList<>();
 
-            case HIGH:
-                // Pick any affinity node.
-                assert !F.isEmpty(affIds);
+                    for (int i = 0; i < grp.nodeCount(); i++) {
+                        UUID id = grp.nodeId(i);
 
-                return 
affIds.get(ThreadLocalRandom.current().nextInt(affIds.size()));
+                        if (affIds.contains(id))
+                            cands.add(i);
+                    }
+
+                    idx = 
cands.get(ThreadLocalRandom.current().nextInt(cands.size()));
+
+                    break;
+                }
+                default: {
+                    // Find primary node.
+                    assert prioAffId != null;
 
-            default:
-                // Pick primary node.
-                assert priority == MapperPriority.HIGHEST;
-                assert priorityAffId != null;
+                    for (int i = 0; i < grp.nodeCount(); i++) {
+                        UUID id = grp.nodeId(i);
 
-                return priorityAffId;
+                        if (F.eq(id, prioAffId)) {
+                            idx = i;
+
+                            break;
+                        }
+                    }
+
+                    assert priority == MapperPriority.HIGHEST;
+                }
+            }
         }
+
+        return grp.nodeId(idx);
     }
 
     /**

Reply via email to