FInalization.

Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/3defef36
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/3defef36
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/3defef36

Branch: refs/heads/ignite-3414
Commit: 3defef36b9cbec47a74900ec6914258e842e3791
Parents: d8239c0
Author: vozerov-gridgain <[email protected]>
Authored: Tue Jul 19 10:20:53 2016 +0300
Committer: vozerov-gridgain <[email protected]>
Committed: Tue Jul 19 10:20:53 2016 +0300

----------------------------------------------------------------------
 .../IgniteHadoopWeightedMapReducePlanner.java   | 95 +++++++-------------
 1 file changed, 31 insertions(+), 64 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/3defef36/modules/hadoop/src/main/java/org/apache/ignite/hadoop/mapreduce/IgniteHadoopWeightedMapReducePlanner.java
----------------------------------------------------------------------
diff --git 
a/modules/hadoop/src/main/java/org/apache/ignite/hadoop/mapreduce/IgniteHadoopWeightedMapReducePlanner.java
 
b/modules/hadoop/src/main/java/org/apache/ignite/hadoop/mapreduce/IgniteHadoopWeightedMapReducePlanner.java
index 705d3ce..5a9d5b9 100644
--- 
a/modules/hadoop/src/main/java/org/apache/ignite/hadoop/mapreduce/IgniteHadoopWeightedMapReducePlanner.java
+++ 
b/modules/hadoop/src/main/java/org/apache/ignite/hadoop/mapreduce/IgniteHadoopWeightedMapReducePlanner.java
@@ -120,7 +120,7 @@ public class IgniteHadoopWeightedMapReducePlanner extends 
HadoopAbstractMapReduc
 
         for (HadoopInputSplit split : splits) {
             // Try getting affinity node IDs.
-            Collection<UUID> nodeIds = affinityNodesForSplit(split, top);
+            List<UUID> nodeIds = affinityNodesForSplit(split, top);
 
             // Get best node.
             UUID node = bestMapperNode(nodeIds, top);
@@ -144,12 +144,12 @@ public class IgniteHadoopWeightedMapReducePlanner extends 
HadoopAbstractMapReduc
      * @return Affintiy nodes.
      * @throws IgniteCheckedException If failed.
      */
-    private Collection<UUID> affinityNodesForSplit(HadoopInputSplit split, 
HadoopMapReducePlanTopology top)
+    private List<UUID> affinityNodesForSplit(HadoopInputSplit split, 
HadoopMapReducePlanTopology top)
         throws IgniteCheckedException {
-        Collection<UUID> nodeIds = igfsAffinityNodesForSplit(split);
+        List<UUID> igfsNodeIds = igfsAffinityNodesForSplit(split);
 
-        if (nodeIds != null)
-            return nodeIds;
+        if (igfsNodeIds != null)
+            return igfsNodeIds;
 
         Map<NodeIdAndLength, UUID> res = new TreeMap<>();
 
@@ -167,7 +167,7 @@ public class IgniteHadoopWeightedMapReducePlanner extends 
HadoopAbstractMapReduc
             }
         }
 
-        return new LinkedHashSet<>(res.values());
+        return new ArrayList<>(res.values());
     }
 
     /**
@@ -180,7 +180,7 @@ public class IgniteHadoopWeightedMapReducePlanner extends 
HadoopAbstractMapReduc
      * @return IGFS affinity or {@code null} if IGFS is not available.
      * @throws IgniteCheckedException If failed.
      */
-    @Nullable private Collection<UUID> 
igfsAffinityNodesForSplit(HadoopInputSplit split) throws IgniteCheckedException 
{
+    @Nullable private List<UUID> igfsAffinityNodesForSplit(HadoopInputSplit 
split) throws IgniteCheckedException {
         if (split instanceof HadoopFileBlock) {
             HadoopFileBlock split0 = (HadoopFileBlock)split;
 
@@ -209,7 +209,7 @@ public class IgniteHadoopWeightedMapReducePlanner extends 
HadoopAbstractMapReduc
                         assert blocks != null;
 
                         if (blocks.size() == 1)
-                            return blocks.iterator().next().nodeIds();
+                            return new 
ArrayList<>(blocks.iterator().next().nodeIds());
                         else {
                             // The most "local" nodes go first.
                             Map<UUID, Long> idToLen = new HashMap<>();
@@ -231,7 +231,7 @@ public class IgniteHadoopWeightedMapReducePlanner extends 
HadoopAbstractMapReduc
                                 res.put(new NodeIdAndLength(id, 
idToLenEntry.getValue()), id);
                             }
 
-                            return new LinkedHashSet<>(res.values());
+                            return new ArrayList<>(res.values());
                         }
                     }
                 }
@@ -248,7 +248,7 @@ public class IgniteHadoopWeightedMapReducePlanner extends 
HadoopAbstractMapReduc
      * @param top Topology.
      * @return Result.
      */
-    private UUID bestMapperNode(@Nullable Collection<UUID> affIds, 
HadoopMapReducePlanTopology top) {
+    private UUID bestMapperNode(@Nullable List<UUID> affIds, 
HadoopMapReducePlanTopology top) {
         // Priority node.
         UUID prioAffId = F.first(affIds);
 
@@ -260,10 +260,8 @@ public class IgniteHadoopWeightedMapReducePlanner extends 
HadoopAbstractMapReduc
         for (HadoopMapReducePlanGroup grp : top.groups()) {
             MapperPriority prio = groupPriority(grp, affIds, prioAffId);
 
-            int weight = grp.weight() +
-                (prio == MapperPriority.NORMAL ? rmtMapperWeight : 
locMapperWeight);
+            int weight = grp.weight() + (prio == MapperPriority.NORMAL ? 
rmtMapperWeight : locMapperWeight);
 
-            // TODO: may be just consider HIGHEST prio to have load weight 
(locMapperWeight - 1)?
             if (resGrp == null || weight < resWeight || weight == resWeight && 
prio.value() > resPrio.value()) {
                 resGrp = grp;
                 resPrio = prio;
@@ -286,65 +284,33 @@ public class IgniteHadoopWeightedMapReducePlanner extends 
HadoopAbstractMapReduc
      * @param grp Group.
      * @param priority Priority.
      * @param affIds Affinity IDs.
-     * @param priorityAffId Priority affinitiy IDs.
+     * @param priorityAffId Priority affinity IDs.
      * @return Best node ID in the group.
      */
     private static UUID bestMapperNodeForGroup(HadoopMapReducePlanGroup grp, 
MapperPriority priority,
-        @Nullable Collection<UUID> affIds, @Nullable UUID priorityAffId) {
-        // Return the best node from the group.
-        int idx = 0;
+        @Nullable List<UUID> affIds, @Nullable UUID priorityAffId) {
+        if (grp.single())
+            return grp.nodeId(0);
 
         // This is rare situation when several nodes are started on the same 
host.
-        if (!grp.single()) {
-            switch (priority) {
-                case NORMAL: {
-                    // Pick any node.
-                    idx = ThreadLocalRandom.current().nextInt(grp.nodeCount());
-
-                    break;
-                }
-                case HIGH: {
-                    // Pick any affinity node.
-                    assert !F.isEmpty(affIds);
+        switch (priority) {
+            case NORMAL:
+                // Pick any node.
+                return 
grp.nodeId(ThreadLocalRandom.current().nextInt(grp.nodeCount()));
 
-                    List<Integer> cands = new ArrayList<>();
+            case HIGH:
+                // Pick any affinity node.
+                assert !F.isEmpty(affIds);
 
-                    for (int i = 0; i < grp.nodeCount(); i++) {
-                        UUID id = grp.nodeId(i);
-
-                        if (affIds.contains(id))
-                            cands.add(i);
-                    }
-
-                    assert cands.size() >= 1;
-
-                    idx = 
cands.get(ThreadLocalRandom.current().nextInt(cands.size()));
-
-                    break;
-                }
-                default: {
-                    // Find primary node.
-                    assert priorityAffId != null;
+                return 
affIds.get(ThreadLocalRandom.current().nextInt(affIds.size()));
 
-                    for (int i = 0; i < grp.nodeCount(); i++) {
-                        UUID id = grp.nodeId(i);
+            default:
+                // Pick primary node.
+                assert priority == MapperPriority.HIGHEST;
+                assert priorityAffId != null;
 
-                        if (F.eq(id, priorityAffId)) {
-                            idx = i;
-
-                            break;
-                        }
-                    }
-
-                    // TODO: can we just return 'priorityAffId' there?
-                    // TODO: is that really possible that prio == HIGHEST, but 
the id is not 'priorityAffId'?
-                    //assert priorityAffId.equals(grp.nodeId(idx));
-                    assert priority == MapperPriority.HIGHEST;
-                }
-            }
+                return priorityAffId;
         }
-
-        return grp.nodeId(idx);
     }
 
     /**
@@ -618,13 +584,12 @@ public class IgniteHadoopWeightedMapReducePlanner extends 
HadoopAbstractMapReduc
      * Calculate group priority.
      *
      * @param grp Group.
-     * @param affIds Affintiy IDs.
+     * @param affIds Affinity IDs.
      * @param prioAffId Priority affinity ID.
      * @return Group priority.
      */
     private static MapperPriority groupPriority(HadoopMapReducePlanGroup grp, 
@Nullable Collection<UUID> affIds,
         @Nullable UUID prioAffId) {
-        // prioAffId is actually the 1st element of affIds:
         assert F.isEmpty(affIds) ? prioAffId == null : prioAffId == 
F.first(affIds);
         assert grp != null;
 
@@ -635,6 +600,8 @@ public class IgniteHadoopWeightedMapReducePlanner extends 
HadoopAbstractMapReduc
             for (int i = 0; i < grp.nodeCount(); i++) {
                 UUID id = grp.nodeId(i);
 
+                // "affIds" is a list, so we have O(n) lookup time here. In 
practice most often we will have
+                // either 1 (IGFS) or 3 (HDFS) affinity nodes, what gives us 2 
comparisons on average, not a big deal.
                 if (affIds.contains(id)) {
                     // If there is an affinity node among this machine nodes, 
the prio is HIGH:
                     prio = MapperPriority.HIGH;

Reply via email to