FInalization.
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/ae2d9e17 Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/ae2d9e17 Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/ae2d9e17 Branch: refs/heads/ignite-3414 Commit: ae2d9e1738e00965b681673080a4a23b3c1876b7 Parents: 3defef3 Author: vozerov-gridgain <[email protected]> Authored: Tue Jul 19 10:22:37 2016 +0300 Committer: vozerov-gridgain <[email protected]> Committed: Tue Jul 19 10:22:37 2016 +0300 ---------------------------------------------------------------------- .../IgniteHadoopWeightedMapReducePlanner.java | 61 ++++++++++++++------ 1 file changed, 44 insertions(+), 17 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/ae2d9e17/modules/hadoop/src/main/java/org/apache/ignite/hadoop/mapreduce/IgniteHadoopWeightedMapReducePlanner.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/hadoop/mapreduce/IgniteHadoopWeightedMapReducePlanner.java b/modules/hadoop/src/main/java/org/apache/ignite/hadoop/mapreduce/IgniteHadoopWeightedMapReducePlanner.java index 5a9d5b9..b4b1d56 100644 --- a/modules/hadoop/src/main/java/org/apache/ignite/hadoop/mapreduce/IgniteHadoopWeightedMapReducePlanner.java +++ b/modules/hadoop/src/main/java/org/apache/ignite/hadoop/mapreduce/IgniteHadoopWeightedMapReducePlanner.java @@ -284,33 +284,60 @@ public class IgniteHadoopWeightedMapReducePlanner extends HadoopAbstractMapReduc * @param grp Group. * @param priority Priority. * @param affIds Affinity IDs. - * @param priorityAffId Priority affinity IDs. + * @param prioAffId Priority affinity IDs. * @return Best node ID in the group. */ private static UUID bestMapperNodeForGroup(HadoopMapReducePlanGroup grp, MapperPriority priority, - @Nullable List<UUID> affIds, @Nullable UUID priorityAffId) { - if (grp.single()) - return grp.nodeId(0); + @Nullable Collection<UUID> affIds, @Nullable UUID prioAffId) { + // Return the best node from the group. + int idx = 0; // This is rare situation when several nodes are started on the same host. - switch (priority) { - case NORMAL: - // Pick any node. - return grp.nodeId(ThreadLocalRandom.current().nextInt(grp.nodeCount())); + if (!grp.single()) { + switch (priority) { + case NORMAL: { + // Pick any node. + idx = ThreadLocalRandom.current().nextInt(grp.nodeCount()); + + break; + } + case HIGH: { + // Pick any affinity node. + assert affIds != null; + + List<Integer> cands = new ArrayList<>(); - case HIGH: - // Pick any affinity node. - assert !F.isEmpty(affIds); + for (int i = 0; i < grp.nodeCount(); i++) { + UUID id = grp.nodeId(i); - return affIds.get(ThreadLocalRandom.current().nextInt(affIds.size())); + if (affIds.contains(id)) + cands.add(i); + } + + idx = cands.get(ThreadLocalRandom.current().nextInt(cands.size())); + + break; + } + default: { + // Find primary node. + assert prioAffId != null; - default: - // Pick primary node. - assert priority == MapperPriority.HIGHEST; - assert priorityAffId != null; + for (int i = 0; i < grp.nodeCount(); i++) { + UUID id = grp.nodeId(i); - return priorityAffId; + if (F.eq(id, prioAffId)) { + idx = i; + + break; + } + } + + assert priority == MapperPriority.HIGHEST; + } + } } + + return grp.nodeId(idx); } /**
