Finalization.
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/50d7d3af Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/50d7d3af Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/50d7d3af Branch: refs/heads/ignite-3414 Commit: 50d7d3af0335704aa13edfb05f82a98c1a6cad7f Parents: a1c7047 Author: vozerov-gridgain <[email protected]> Authored: Tue Jul 19 10:29:29 2016 +0300 Committer: vozerov-gridgain <[email protected]> Committed: Tue Jul 19 10:29:29 2016 +0300 ---------------------------------------------------------------------- .../IgniteHadoopWeightedMapReducePlanner.java | 59 ++++---------------- 1 file changed, 11 insertions(+), 48 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/50d7d3af/modules/hadoop/src/main/java/org/apache/ignite/hadoop/mapreduce/IgniteHadoopWeightedMapReducePlanner.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/hadoop/mapreduce/IgniteHadoopWeightedMapReducePlanner.java b/modules/hadoop/src/main/java/org/apache/ignite/hadoop/mapreduce/IgniteHadoopWeightedMapReducePlanner.java index 15684be..71d8da4 100644 --- a/modules/hadoop/src/main/java/org/apache/ignite/hadoop/mapreduce/IgniteHadoopWeightedMapReducePlanner.java +++ b/modules/hadoop/src/main/java/org/apache/ignite/hadoop/mapreduce/IgniteHadoopWeightedMapReducePlanner.java @@ -351,11 +351,8 @@ public class IgniteHadoopWeightedMapReducePlanner extends HadoopAbstractMapReduc */ private Map<UUID, int[]> assignReducers(Collection<HadoopInputSplit> splits, HadoopMapReducePlanTopology top, Mappers mappers, int reducerCnt) { - // 'reducers' is mapping from node id to number of reducers to be assigned to that node. 
Map<UUID, Integer> reducers = assignReducers0(top, splits, mappers, reducerCnt); - // Convert the result of previous invocation: number of reducers is converted to same length array filled with - // reducer order number: int cnt = 0; Map<UUID, int[]> res = new HashMap<>(reducers.size()); @@ -376,7 +373,6 @@ public class IgniteHadoopWeightedMapReducePlanner extends HadoopAbstractMapReduc /** * Generate reducers. - * Determines the number of reducers to be assigned to each node. * * @param top Topology. * @param splits Input splits. @@ -386,18 +382,14 @@ public class IgniteHadoopWeightedMapReducePlanner extends HadoopAbstractMapReduc */ private Map<UUID, Integer> assignReducers0(HadoopMapReducePlanTopology top, Collection<HadoopInputSplit> splits, Mappers mappers, int reducerCnt) { + Map<UUID, Integer> res = new HashMap<>(); + // Assign reducers to splits. - // The map contains hom many reducers should be assigned per split: Map<HadoopInputSplit, Integer> splitToReducerCnt = assignReducersToSplits(splits, reducerCnt); -// int sum = sum(splitToReducerCnt); -// assert reducerCnt == sum : "exp: " + reducerCnt + ", act: " + sum; - // Assign as much local reducers as possible. int remaining = 0; - Map<UUID, Integer> res = new HashMap<>(); - for (Map.Entry<HadoopInputSplit, Integer> entry : splitToReducerCnt.entrySet()) { HadoopInputSplit split = entry.getKey(); int cnt = entry.getValue(); @@ -415,9 +407,6 @@ public class IgniteHadoopWeightedMapReducePlanner extends HadoopAbstractMapReduc if (remaining > 0) assignRemoteReducers(remaining, top, mappers, res); -// sum = sum(res); -// assert reducerCnt == sum : "exp: " + reducerCnt + ", act: " + sum; - return res; } @@ -425,7 +414,7 @@ public class IgniteHadoopWeightedMapReducePlanner extends HadoopAbstractMapReduc * Assign local split reducers. * * @param split Split. - * @param cnt Reducer count: how many reducers should be assigned for this split. + * @param cnt Reducer count. * @param top Topology. 
* @param mappers Mappers. * @param resMap Reducers result map. @@ -453,41 +442,13 @@ public class IgniteHadoopWeightedMapReducePlanner extends HadoopAbstractMapReduc } // Update result map. - if (res > 0) - inc(resMap, nodeId, res); - - return res; - } - - /** - * TODO: Docs - * @param reducers - * @return summary count. - */ - public static int sum(Map<?, Integer> reducers) { - int sum = 0; - - for (Integer i: reducers.values()) - sum += i; - - return sum; - } + if (res > 0) { + Integer reducerCnt = resMap.get(nodeId); - /** - * TODO Docs. - * @param resMap - * @param key - * @param increment - * @param <K> - */ - public static <K> void inc(Map<K, Integer> resMap, K key, int increment) { - assert increment > 0; - - // Update result map. - Integer res = resMap.get(key); + resMap.put(nodeId, reducerCnt == null ? res : reducerCnt + res); + } - // 1 reducer was added for 'id': - resMap.put(key, res == null ? increment : res + increment); + return res; } /** @@ -550,7 +511,9 @@ public class IgniteHadoopWeightedMapReducePlanner extends HadoopAbstractMapReduc assert add; // Exactly 1 reducer was added for 'id': - inc(resMap, id, 1); + Integer reducerCnt = resMap.get(id); + + resMap.put(id, reducerCnt == null ? 1 : reducerCnt + 1); } }
