Finalization.
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/fc58697b Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/fc58697b Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/fc58697b Branch: refs/heads/ignite-3414 Commit: fc58697b5d28d9167b9069890bf5e36e246e6f9b Parents: 65d76a4 Author: vozerov-gridgain <[email protected]> Authored: Tue Jul 19 10:04:30 2016 +0300 Committer: vozerov-gridgain <[email protected]> Committed: Tue Jul 19 10:04:30 2016 +0300 ---------------------------------------------------------------------- .../IgniteHadoopWeightedMapReducePlanner.java | 39 ++++++++++---------- 1 file changed, 20 insertions(+), 19 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/fc58697b/modules/hadoop/src/main/java/org/apache/ignite/hadoop/mapreduce/IgniteHadoopWeightedMapReducePlanner.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/hadoop/mapreduce/IgniteHadoopWeightedMapReducePlanner.java b/modules/hadoop/src/main/java/org/apache/ignite/hadoop/mapreduce/IgniteHadoopWeightedMapReducePlanner.java index 8f2a86c..c184e46 100644 --- a/modules/hadoop/src/main/java/org/apache/ignite/hadoop/mapreduce/IgniteHadoopWeightedMapReducePlanner.java +++ b/modules/hadoop/src/main/java/org/apache/ignite/hadoop/mapreduce/IgniteHadoopWeightedMapReducePlanner.java @@ -119,11 +119,10 @@ public class IgniteHadoopWeightedMapReducePlanner extends HadoopAbstractMapReduc Mappers res = new Mappers(); for (HadoopInputSplit split : splits) { - // Try getting IGFS affinity. Note that an empty collection may be returned - // if this is an HDFS split that does not belong to any Ignite node host. + // Try getting affinity node IDs. Collection<UUID> nodeIds = affinityNodesForSplit(split, top); - // Get best node for this split: + // Get best node. 
UUID node = bestMapperNode(nodeIds, top); assert node != null; @@ -136,8 +135,8 @@ public class IgniteHadoopWeightedMapReducePlanner extends HadoopAbstractMapReduc /** * Get affinity nodes for the given input split. - * - * <p/>In general, order in the returned collection *is* significant, meaning that nodes containing more data + * <p> + * Order in the returned collection *is* significant, meaning that nodes containing more data * go first. This way, the 1st nodes in the collection considered to be preferable for scheduling. * * @param split Split. @@ -147,31 +146,34 @@ public class IgniteHadoopWeightedMapReducePlanner extends HadoopAbstractMapReduc */ private Collection<UUID> affinityNodesForSplit(HadoopInputSplit split, HadoopMapReducePlanTopology top) throws IgniteCheckedException { - // IGFS part: Collection<UUID> nodeIds = igfsAffinityNodesForSplit(split); - // HDFS part: - if (nodeIds == null) { - nodeIds = new HashSet<>(); + if (nodeIds != null) + return nodeIds; + + Map<NodeIdAndLength, UUID> res = new TreeMap<>(); + + for (String host : split.hosts()) { + long len = split instanceof HadoopFileBlock ? ((HadoopFileBlock)split).length() : 0L; - // TODO: also sort hosts there in non-ascending data order, as we do in case of IGFS affinity splits. - for (String host : split.hosts()) { - HadoopMapReducePlanGroup grp = top.groupForHost(host); + HadoopMapReducePlanGroup grp = top.groupForHost(host); - if (grp != null) { - for (int i = 0; i < grp.nodeCount(); i++) - nodeIds.add(grp.nodeId(i)); + if (grp != null) { + for (int i = 0; i < grp.nodeCount(); i++) { + UUID nodeId = grp.nodeId(i); + + res.put(new NodeIdAndLength(nodeId, len), nodeId); } } } - return nodeIds; + return new LinkedHashSet<>(res.values()); } /** * Get IGFS affinity nodes for split if possible. 
- * - * <p/>Order in the returned collection *is* significant, meaning that nodes containing more data + * <p> + * Order in the returned collection *is* significant, meaning that nodes containing more data * go first. This way, the 1st nodes in the collection considered to be preferable for scheduling. * * @param split Input split. @@ -221,7 +223,6 @@ public class IgniteHadoopWeightedMapReducePlanner extends HadoopAbstractMapReduc } // Sort the nodes in non-ascending order by contained data lengths. - // NodeIdAndLength objects are used as keys to handle comparison when the lengths are equal. Map<NodeIdAndLength, UUID> res = new TreeMap<>(); for (Map.Entry<UUID, Long> idToLenEntry : idToLen.entrySet()) {
