Finalization.
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/3defef36 Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/3defef36 Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/3defef36 Branch: refs/heads/ignite-3414 Commit: 3defef36b9cbec47a74900ec6914258e842e3791 Parents: d8239c0 Author: vozerov-gridgain <[email protected]> Authored: Tue Jul 19 10:20:53 2016 +0300 Committer: vozerov-gridgain <[email protected]> Committed: Tue Jul 19 10:20:53 2016 +0300 ---------------------------------------------------------------------- .../IgniteHadoopWeightedMapReducePlanner.java | 95 +++++++------------- 1 file changed, 31 insertions(+), 64 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/3defef36/modules/hadoop/src/main/java/org/apache/ignite/hadoop/mapreduce/IgniteHadoopWeightedMapReducePlanner.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/hadoop/mapreduce/IgniteHadoopWeightedMapReducePlanner.java b/modules/hadoop/src/main/java/org/apache/ignite/hadoop/mapreduce/IgniteHadoopWeightedMapReducePlanner.java index 705d3ce..5a9d5b9 100644 --- a/modules/hadoop/src/main/java/org/apache/ignite/hadoop/mapreduce/IgniteHadoopWeightedMapReducePlanner.java +++ b/modules/hadoop/src/main/java/org/apache/ignite/hadoop/mapreduce/IgniteHadoopWeightedMapReducePlanner.java @@ -120,7 +120,7 @@ public class IgniteHadoopWeightedMapReducePlanner extends HadoopAbstractMapReduc for (HadoopInputSplit split : splits) { // Try getting affinity node IDs. - Collection<UUID> nodeIds = affinityNodesForSplit(split, top); + List<UUID> nodeIds = affinityNodesForSplit(split, top); // Get best node. 
UUID node = bestMapperNode(nodeIds, top); @@ -144,12 +144,12 @@ public class IgniteHadoopWeightedMapReducePlanner extends HadoopAbstractMapReduc * @return Affintiy nodes. * @throws IgniteCheckedException If failed. */ - private Collection<UUID> affinityNodesForSplit(HadoopInputSplit split, HadoopMapReducePlanTopology top) + private List<UUID> affinityNodesForSplit(HadoopInputSplit split, HadoopMapReducePlanTopology top) throws IgniteCheckedException { - Collection<UUID> nodeIds = igfsAffinityNodesForSplit(split); + List<UUID> igfsNodeIds = igfsAffinityNodesForSplit(split); - if (nodeIds != null) - return nodeIds; + if (igfsNodeIds != null) + return igfsNodeIds; Map<NodeIdAndLength, UUID> res = new TreeMap<>(); @@ -167,7 +167,7 @@ public class IgniteHadoopWeightedMapReducePlanner extends HadoopAbstractMapReduc } } - return new LinkedHashSet<>(res.values()); + return new ArrayList<>(res.values()); } /** @@ -180,7 +180,7 @@ public class IgniteHadoopWeightedMapReducePlanner extends HadoopAbstractMapReduc * @return IGFS affinity or {@code null} if IGFS is not available. * @throws IgniteCheckedException If failed. */ - @Nullable private Collection<UUID> igfsAffinityNodesForSplit(HadoopInputSplit split) throws IgniteCheckedException { + @Nullable private List<UUID> igfsAffinityNodesForSplit(HadoopInputSplit split) throws IgniteCheckedException { if (split instanceof HadoopFileBlock) { HadoopFileBlock split0 = (HadoopFileBlock)split; @@ -209,7 +209,7 @@ public class IgniteHadoopWeightedMapReducePlanner extends HadoopAbstractMapReduc assert blocks != null; if (blocks.size() == 1) - return blocks.iterator().next().nodeIds(); + return new ArrayList<>(blocks.iterator().next().nodeIds()); else { // The most "local" nodes go first. 
Map<UUID, Long> idToLen = new HashMap<>(); @@ -231,7 +231,7 @@ public class IgniteHadoopWeightedMapReducePlanner extends HadoopAbstractMapReduc res.put(new NodeIdAndLength(id, idToLenEntry.getValue()), id); } - return new LinkedHashSet<>(res.values()); + return new ArrayList<>(res.values()); } } } @@ -248,7 +248,7 @@ public class IgniteHadoopWeightedMapReducePlanner extends HadoopAbstractMapReduc * @param top Topology. * @return Result. */ - private UUID bestMapperNode(@Nullable Collection<UUID> affIds, HadoopMapReducePlanTopology top) { + private UUID bestMapperNode(@Nullable List<UUID> affIds, HadoopMapReducePlanTopology top) { // Priority node. UUID prioAffId = F.first(affIds); @@ -260,10 +260,8 @@ public class IgniteHadoopWeightedMapReducePlanner extends HadoopAbstractMapReduc for (HadoopMapReducePlanGroup grp : top.groups()) { MapperPriority prio = groupPriority(grp, affIds, prioAffId); - int weight = grp.weight() + - (prio == MapperPriority.NORMAL ? rmtMapperWeight : locMapperWeight); + int weight = grp.weight() + (prio == MapperPriority.NORMAL ? rmtMapperWeight : locMapperWeight); - // TODO: may be just consider HIGHEST prio to have load weight (locMapperWeight - 1)? if (resGrp == null || weight < resWeight || weight == resWeight && prio.value() > resPrio.value()) { resGrp = grp; resPrio = prio; @@ -286,65 +284,33 @@ public class IgniteHadoopWeightedMapReducePlanner extends HadoopAbstractMapReduc * @param grp Group. * @param priority Priority. * @param affIds Affinity IDs. - * @param priorityAffId Priority affinitiy IDs. + * @param priorityAffId Priority affinity IDs. * @return Best node ID in the group. */ private static UUID bestMapperNodeForGroup(HadoopMapReducePlanGroup grp, MapperPriority priority, - @Nullable Collection<UUID> affIds, @Nullable UUID priorityAffId) { - // Return the best node from the group. 
- int idx = 0; + @Nullable List<UUID> affIds, @Nullable UUID priorityAffId) { + if (grp.single()) + return grp.nodeId(0); // This is rare situation when several nodes are started on the same host. - if (!grp.single()) { - switch (priority) { - case NORMAL: { - // Pick any node. - idx = ThreadLocalRandom.current().nextInt(grp.nodeCount()); - - break; - } - case HIGH: { - // Pick any affinity node. - assert !F.isEmpty(affIds); + switch (priority) { + case NORMAL: + // Pick any node. + return grp.nodeId(ThreadLocalRandom.current().nextInt(grp.nodeCount())); - List<Integer> cands = new ArrayList<>(); + case HIGH: + // Pick any affinity node. + assert !F.isEmpty(affIds); - for (int i = 0; i < grp.nodeCount(); i++) { - UUID id = grp.nodeId(i); - - if (affIds.contains(id)) - cands.add(i); - } - - assert cands.size() >= 1; - - idx = cands.get(ThreadLocalRandom.current().nextInt(cands.size())); - - break; - } - default: { - // Find primary node. - assert priorityAffId != null; + return affIds.get(ThreadLocalRandom.current().nextInt(affIds.size())); - for (int i = 0; i < grp.nodeCount(); i++) { - UUID id = grp.nodeId(i); + default: + // Pick primary node. + assert priority == MapperPriority.HIGHEST; + assert priorityAffId != null; - if (F.eq(id, priorityAffId)) { - idx = i; - - break; - } - } - - // TODO: can we just return 'priorityAffId' there? - // TODO: is that really possible that prio == HIGHEST, but the id is not 'priorityAffId'? - //assert priorityAffId.equals(grp.nodeId(idx)); - assert priority == MapperPriority.HIGHEST; - } - } + return priorityAffId; } - - return grp.nodeId(idx); } /** @@ -618,13 +584,12 @@ public class IgniteHadoopWeightedMapReducePlanner extends HadoopAbstractMapReduc * Calculate group priority. * * @param grp Group. - * @param affIds Affintiy IDs. + * @param affIds Affinity IDs. * @param prioAffId Priority affinity ID. * @return Group priority. 
*/ private static MapperPriority groupPriority(HadoopMapReducePlanGroup grp, @Nullable Collection<UUID> affIds, @Nullable UUID prioAffId) { - // prioAffId is actually the 1st element of affIds: assert F.isEmpty(affIds) ? prioAffId == null : prioAffId == F.first(affIds); assert grp != null; @@ -635,6 +600,8 @@ public class IgniteHadoopWeightedMapReducePlanner extends HadoopAbstractMapReduc for (int i = 0; i < grp.nodeCount(); i++) { UUID id = grp.nodeId(i); + // "affIds" is a list, so we have O(n) lookup time here. In practice most often we will have + // either 1 (IGFS) or 3 (HDFS) affinity nodes, what gives us 2 comparisons on average, not a big deal. if (affIds.contains(id)) { // If there is an affinity node among this machine nodes, the prio is HIGH: prio = MapperPriority.HIGH;
