IGNITE-3414: Minors.
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/685825f6 Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/685825f6 Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/685825f6 Branch: refs/heads/ignite-3414 Commit: 685825f64ec41deff5291242e69f4708b38f874c Parents: 644e9f6 Author: vozerov-gridgain <[email protected]> Authored: Tue Jul 19 11:10:19 2016 +0300 Committer: vozerov-gridgain <[email protected]> Committed: Tue Jul 19 11:10:19 2016 +0300 ---------------------------------------------------------------------- .../IgniteHadoopWeightedMapReducePlanner.java | 38 ++++++++++---------- 1 file changed, 20 insertions(+), 18 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/685825f6/modules/hadoop/src/main/java/org/apache/ignite/hadoop/mapreduce/IgniteHadoopWeightedMapReducePlanner.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/hadoop/mapreduce/IgniteHadoopWeightedMapReducePlanner.java b/modules/hadoop/src/main/java/org/apache/ignite/hadoop/mapreduce/IgniteHadoopWeightedMapReducePlanner.java index 4d4f518..6349418 100644 --- a/modules/hadoop/src/main/java/org/apache/ignite/hadoop/mapreduce/IgniteHadoopWeightedMapReducePlanner.java +++ b/modules/hadoop/src/main/java/org/apache/ignite/hadoop/mapreduce/IgniteHadoopWeightedMapReducePlanner.java @@ -54,7 +54,8 @@ import java.util.UUID; import java.util.concurrent.ThreadLocalRandom; /** - * Map-reduce planner which tries to assign map jobs to affinity nodes. + * Map-reduce planner which assigns mappers and reducers based on their weights. + * <p> */ // TODO: Docs. public class IgniteHadoopWeightedMapReducePlanner extends HadoopAbstractMapReducePlanner { @@ -62,16 +63,16 @@ public class IgniteHadoopWeightedMapReducePlanner extends HadoopAbstractMapReduc public static final int DFLT_LOC_MAPPER_WEIGHT = 100; /** Default remote mapper weight. */ - public static final int DFLT_RMT_MAPPER_WEIGHT = 120; + public static final int DFLT_RMT_MAPPER_WEIGHT = 100; /** Default local reducer weight. */ public static final int DFLT_LOC_REDUCER_WEIGHT = 100; /** Default remote reducer weight. */ - public static final int DFLT_RMT_REDUCER_WEIGHT = 120; + public static final int DFLT_RMT_REDUCER_WEIGHT = 100; /** Default reducer migration threshold weight. */ - public static final int DFLT_REDUCER_MIGRATION_THRESHOLD_WEIGHT = 1000; + public static final int DFLT_REDUCER_MIGRATION_THRESHOLD_WEIGHT = 200; /** Local mapper weight. */ private int locMapperWeight = DFLT_LOC_MAPPER_WEIGHT; @@ -86,7 +87,7 @@ public class IgniteHadoopWeightedMapReducePlanner extends HadoopAbstractMapReduc private int rmtReducerWeight = DFLT_RMT_REDUCER_WEIGHT; /** Reducer migration threshold weight. */ - private int reducerMigrationThresholdWeight = DFLT_REDUCER_MIGRATION_THRESHOLD_WEIGHT; + private int preferLocReducerThresholdWeight = DFLT_REDUCER_MIGRATION_THRESHOLD_WEIGHT; /** {@inheritDoc} */ @Override public HadoopMapReducePlan preparePlan(HadoopJob job, Collection<ClusterNode> nodes, @@ -435,7 +436,7 @@ public class IgniteHadoopWeightedMapReducePlanner extends HadoopAbstractMapReduc // Assign more reducers to the node until threshold is reached. int res = 0; - while (res < cnt && grp.weight() < reducerMigrationThresholdWeight) { + while (res < cnt && grp.weight() < preferLocReducerThresholdWeight) { res++; grp.weight(grp.weight() + locReducerWeight); @@ -470,17 +471,17 @@ public class IgniteHadoopWeightedMapReducePlanner extends HadoopAbstractMapReduc // The least loaded machine. HadoopMapReducePlanGroup grp = set.first(); - // Look for affinity nodes. - List<UUID> affIds = null; + // Look for nodes with assigned splits. + List<UUID> splitNodeIds = null; for (int i = 0; i < grp.nodeCount(); i++) { UUID nodeId = grp.nodeId(i); if (mappers.nodeToSplits.containsKey(nodeId)) { - if (affIds == null) - affIds = new ArrayList<>(2); + if (splitNodeIds == null) + splitNodeIds = new ArrayList<>(2); - affIds.add(nodeId); + splitNodeIds.add(nodeId); } } @@ -488,8 +489,8 @@ public class IgniteHadoopWeightedMapReducePlanner extends HadoopAbstractMapReduc UUID id; int newWeight; - if (affIds != null) { - id = affIds.get(ThreadLocalRandom.current().nextInt(affIds.size())); + if (splitNodeIds != null) { + id = splitNodeIds.get(ThreadLocalRandom.current().nextInt(splitNodeIds.size())); newWeight = grp.weight() + locReducerWeight; } @@ -681,17 +682,18 @@ public class IgniteHadoopWeightedMapReducePlanner extends HadoopAbstractMapReduc * @return Reducer migration threshold weight. */ // TODO: Docs. - public int getReducerMigrationThresholdWeight() { - return reducerMigrationThresholdWeight; + public int getPreferLocalReducerThresholdWeight() { + return preferLocReducerThresholdWeight; } /** - * Set reducer migration threshold weight. See {@link #getReducerMigrationThresholdWeight()} for more information. + * Set reducer migration threshold weight. See {@link #getPreferLocalReducerThresholdWeight()} for more + * information. * * @param reducerMigrationThresholdWeight Reducer migration threshold weight. */ - public void setReducerMigrationThresholdWeight(int reducerMigrationThresholdWeight) { - this.reducerMigrationThresholdWeight = reducerMigrationThresholdWeight; + public void setPreferLocalReducerThresholdWeight(int reducerMigrationThresholdWeight) { + this.preferLocReducerThresholdWeight = reducerMigrationThresholdWeight; } /** {@inheritDoc} */
