Finalization.
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/bb4b3660 Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/bb4b3660 Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/bb4b3660 Branch: refs/heads/ignite-3414 Commit: bb4b36603d7e79e3f473ae7fc3991565b8b9d978 Parents: 50d7d3a Author: vozerov-gridgain <[email protected]> Authored: Tue Jul 19 10:33:28 2016 +0300 Committer: vozerov-gridgain <[email protected]> Committed: Tue Jul 19 10:33:28 2016 +0300 ---------------------------------------------------------------------- .../IgniteHadoopWeightedMapReducePlanner.java | 25 ++++++-------------- 1 file changed, 7 insertions(+), 18 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/bb4b3660/modules/hadoop/src/main/java/org/apache/ignite/hadoop/mapreduce/IgniteHadoopWeightedMapReducePlanner.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/hadoop/mapreduce/IgniteHadoopWeightedMapReducePlanner.java b/modules/hadoop/src/main/java/org/apache/ignite/hadoop/mapreduce/IgniteHadoopWeightedMapReducePlanner.java index 71d8da4..4d4f518 100644 --- a/modules/hadoop/src/main/java/org/apache/ignite/hadoop/mapreduce/IgniteHadoopWeightedMapReducePlanner.java +++ b/modules/hadoop/src/main/java/org/apache/ignite/hadoop/mapreduce/IgniteHadoopWeightedMapReducePlanner.java @@ -119,7 +119,7 @@ public class IgniteHadoopWeightedMapReducePlanner extends HadoopAbstractMapReduc Mappers res = new Mappers(); for (HadoopInputSplit split : splits) { - // Try getting affinity node IDs. + // Try getting IGFS affinity. Collection<UUID> nodeIds = affinityNodesForSplit(split, top); // Get best node. 
@@ -467,7 +467,7 @@ public class IgniteHadoopWeightedMapReducePlanner extends HadoopAbstractMapReduc set.addAll(top.groups()); while (cnt-- > 0) { - // The least loaded machine: + // The least loaded machine. HadoopMapReducePlanGroup grp = set.first(); // Look for affinity nodes. @@ -510,10 +510,10 @@ public class IgniteHadoopWeightedMapReducePlanner extends HadoopAbstractMapReduc assert add; - // Exactly 1 reducer was added for 'id': - Integer reducerCnt = resMap.get(id); + // Update result map. + Integer res = resMap.get(id); - resMap.put(id, reducerCnt == null ? 1 : reducerCnt + 1); + resMap.put(id, res == null ? 1 : res + 1); } } @@ -545,14 +545,9 @@ public class IgniteHadoopWeightedMapReducePlanner extends HadoopAbstractMapReduc int reducerCnt) { Map<HadoopInputSplit, Integer> res = new IdentityHashMap<>(splits.size()); - // reducers per 1 split: int base = reducerCnt / splits.size(); + int remainder = reducerCnt % splits.size(); - // TODO: may that be not equal to (reducerCnt % splits.size()) ? - int remainder = reducerCnt - base * splits.size(); - - // Not considering remainder, each split gets 'base' reducers. - // Distribute the remainder with round-robin strategy: for (HadoopInputSplit split : splits) { int val = base; @@ -586,18 +581,12 @@ public class IgniteHadoopWeightedMapReducePlanner extends HadoopAbstractMapReduc MapperPriority prio = MapperPriority.NORMAL; if (!F.isEmpty(affIds)) { - // Iterate over all the nodes on this machine: for (int i = 0; i < grp.nodeCount(); i++) { UUID id = grp.nodeId(i); - // "affIds" is a list, so we have O(n) lookup time here. In practice most often we will have - // either 1 (IGFS) or 3 (HDFS) affinity nodes, what gives us 2 comparisons on average, not a big deal. 
if (affIds.contains(id)) { - // If there is an affinity node among this machine nodes, the prio is HIGH: prio = MapperPriority.HIGH; - // If preferred node (node containing most of the data) found on this machine, - // the prio gets HIGHEST: if (F.eq(prioAffId, id)) { prio = MapperPriority.HIGHEST; @@ -756,7 +745,7 @@ public class IgniteHadoopWeightedMapReducePlanner extends HadoopAbstractMapReduc } /** - * Bi-Map from node id to a collection of splits. + * Mappers. */ private static class Mappers { /** Node-to-splits map. */
