Finalization.

Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/fc58697b
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/fc58697b
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/fc58697b

Branch: refs/heads/ignite-3414
Commit: fc58697b5d28d9167b9069890bf5e36e246e6f9b
Parents: 65d76a4
Author: vozerov-gridgain <[email protected]>
Authored: Tue Jul 19 10:04:30 2016 +0300
Committer: vozerov-gridgain <[email protected]>
Committed: Tue Jul 19 10:04:30 2016 +0300

----------------------------------------------------------------------
 .../IgniteHadoopWeightedMapReducePlanner.java   | 39 ++++++++++----------
 1 file changed, 20 insertions(+), 19 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/fc58697b/modules/hadoop/src/main/java/org/apache/ignite/hadoop/mapreduce/IgniteHadoopWeightedMapReducePlanner.java
----------------------------------------------------------------------
diff --git 
a/modules/hadoop/src/main/java/org/apache/ignite/hadoop/mapreduce/IgniteHadoopWeightedMapReducePlanner.java
 
b/modules/hadoop/src/main/java/org/apache/ignite/hadoop/mapreduce/IgniteHadoopWeightedMapReducePlanner.java
index 8f2a86c..c184e46 100644
--- 
a/modules/hadoop/src/main/java/org/apache/ignite/hadoop/mapreduce/IgniteHadoopWeightedMapReducePlanner.java
+++ 
b/modules/hadoop/src/main/java/org/apache/ignite/hadoop/mapreduce/IgniteHadoopWeightedMapReducePlanner.java
@@ -119,11 +119,10 @@ public class IgniteHadoopWeightedMapReducePlanner extends 
HadoopAbstractMapReduc
         Mappers res = new Mappers();
 
         for (HadoopInputSplit split : splits) {
-            // Try getting IGFS affinity. Note that an empty collection may be 
returned
-            // if this is an HDFS split that does not belong to any Ignite 
node host.
+            // Try getting affinity node IDs.
             Collection<UUID> nodeIds = affinityNodesForSplit(split, top);
 
-            // Get best node for this split:
+            // Get best node.
             UUID node = bestMapperNode(nodeIds, top);
 
             assert node != null;
@@ -136,8 +135,8 @@ public class IgniteHadoopWeightedMapReducePlanner extends 
HadoopAbstractMapReduc
 
     /**
      * Get affinity nodes for the given input split.
-     *
-     * <p/>In general, order in the returned collection *is* significant, 
meaning that nodes containing more data
+     * <p>
+     * Order in the returned collection *is* significant, meaning that nodes 
containing more data
      * go first. This way, the 1st nodes in the collection considered to be 
preferable for scheduling.
      *
      * @param split Split.
@@ -147,31 +146,34 @@ public class IgniteHadoopWeightedMapReducePlanner extends 
HadoopAbstractMapReduc
      */
     private Collection<UUID> affinityNodesForSplit(HadoopInputSplit split, 
HadoopMapReducePlanTopology top)
         throws IgniteCheckedException {
-        // IGFS part:
         Collection<UUID> nodeIds = igfsAffinityNodesForSplit(split);
 
-        // HDFS part:
-        if (nodeIds == null) {
-            nodeIds = new HashSet<>();
+        if (nodeIds != null)
+            return nodeIds;
+
+        Map<NodeIdAndLength, UUID> res = new TreeMap<>();
+
+        for (String host : split.hosts()) {
+            long len = split instanceof HadoopFileBlock ? 
((HadoopFileBlock)split).length() : 0L;
 
-            // TODO: also sort hosts there in non-ascending data order, as we 
do in case of IGFS affinity splits.
-            for (String host : split.hosts()) {
-                HadoopMapReducePlanGroup grp = top.groupForHost(host);
+            HadoopMapReducePlanGroup grp = top.groupForHost(host);
 
-                if (grp != null) {
-                    for (int i = 0; i < grp.nodeCount(); i++)
-                        nodeIds.add(grp.nodeId(i));
+            if (grp != null) {
+                for (int i = 0; i < grp.nodeCount(); i++) {
+                    UUID nodeId = grp.nodeId(i);
+
+                    res.put(new NodeIdAndLength(nodeId, len), nodeId);
                 }
             }
         }
 
-        return nodeIds;
+        return new LinkedHashSet<>(res.values());
     }
 
     /**
      * Get IGFS affinity nodes for split if possible.
-     *
-     * <p/>Order in the returned collection *is* significant, meaning that 
nodes containing more data
+     * <p>
+     * Order in the returned collection *is* significant, meaning that nodes 
containing more data
      * go first. This way, the 1st nodes in the collection considered to be 
preferable for scheduling.
      *
      * @param split Input split.
@@ -221,7 +223,6 @@ public class IgniteHadoopWeightedMapReducePlanner extends 
HadoopAbstractMapReduc
                             }
 
                             // Sort the nodes in non-ascending order by 
contained data lengths.
-                            // NodeIdAndLength objects are used as keys to 
handle comparison when the lengths are equal.
                             Map<NodeIdAndLength, UUID> res = new TreeMap<>();
 
                             for (Map.Entry<UUID, Long> idToLenEntry : 
idToLen.entrySet()) {

Reply via email to