Added docs.
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/89aa2727 Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/89aa2727 Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/89aa2727 Branch: refs/heads/ignite-3414 Commit: 89aa272727926345df7fc9c7f0d1b961a377f046 Parents: 685825f Author: vozerov-gridgain <[email protected]> Authored: Tue Jul 19 12:42:43 2016 +0300 Committer: vozerov-gridgain <[email protected]> Committed: Tue Jul 19 12:42:43 2016 +0300 ---------------------------------------------------------------------- .../IgniteHadoopWeightedMapReducePlanner.java | 62 +++++++++++++++----- 1 file changed, 48 insertions(+), 14 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/89aa2727/modules/hadoop/src/main/java/org/apache/ignite/hadoop/mapreduce/IgniteHadoopWeightedMapReducePlanner.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/hadoop/mapreduce/IgniteHadoopWeightedMapReducePlanner.java b/modules/hadoop/src/main/java/org/apache/ignite/hadoop/mapreduce/IgniteHadoopWeightedMapReducePlanner.java index 6349418..27ffc19 100644 --- a/modules/hadoop/src/main/java/org/apache/ignite/hadoop/mapreduce/IgniteHadoopWeightedMapReducePlanner.java +++ b/modules/hadoop/src/main/java/org/apache/ignite/hadoop/mapreduce/IgniteHadoopWeightedMapReducePlanner.java @@ -54,10 +54,34 @@ import java.util.UUID; import java.util.concurrent.ThreadLocalRandom; /** - * Map-reduce planner which assigns mappers and reducers based on their weights. + * Map-reduce planner which assigns mappers and reducers based on their "weights". Weight describes how much resources + * are required to execute particular map or reduce task. * <p> + * Plan creation consists of two steps: assigning mappers and assigning reducers. 
+ * <p> + * Mappers are assigned based on input split data location. For each input split we search for nodes where + * its data is stored. Planner tries to assign mappers to their affinity nodes first. This process is governed by two + * properties: + * <ul> + * <li><b>{@code localMapperWeight}</b> - weight of a map task when it is executed on an affinity node;</li> + * <li><b>{@code remoteMapperWeight}</b> - weight of a map task when it is executed on a non-affinity node.</li> + * </ul> + * Planning algorithm assigns mappers so that the total resulting weight on all nodes is the minimum possible. + * <p> + * Reducers are assigned differently. First we try to distribute reducers across nodes with mappers. This approach + * could minimize expensive data transfer over network. Reducer assigned to a node with mapper is considered + * <b>{@code local}</b>. Otherwise it is considered <b>{@code remote}</b>. This process continues until a certain weight + * threshold is reached, which means that the current node is already too busy and it should not have higher priority over + * other nodes any more. Threshold can be configured using <b>{@code preferLocalReducerThresholdWeight}</b> property. + * <p> + * When local reducer threshold is reached on all nodes, we distribute remaining reducers based on their local and + * remote weights in the same way as it is done for mappers. This process is governed by two + * properties: + * <ul> + * <li><b>{@code localReducerWeight}</b> - weight of a reduce task when it is executed on a node with mappers;</li> + * <li><b>{@code remoteReducerWeight}</b> - weight of a reduce task when it is executed on a node without mappers.</li> + * </ul> */ -// TODO: Docs. public class IgniteHadoopWeightedMapReducePlanner extends HadoopAbstractMapReducePlanner { /** Default local mapper weight. 
*/ public static final int DFLT_LOC_MAPPER_WEIGHT = 100; @@ -72,7 +96,7 @@ public class IgniteHadoopWeightedMapReducePlanner extends HadoopAbstractMapReduc public static final int DFLT_RMT_REDUCER_WEIGHT = 100; /** Default reducer migration threshold weight. */ - public static final int DFLT_REDUCER_MIGRATION_THRESHOLD_WEIGHT = 200; + public static final int DFLT_PREFER_LOCAL_REDUCER_THRESHOLD_WEIGHT = 200; /** Local mapper weight. */ private int locMapperWeight = DFLT_LOC_MAPPER_WEIGHT; @@ -87,7 +111,7 @@ public class IgniteHadoopWeightedMapReducePlanner extends HadoopAbstractMapReduc private int rmtReducerWeight = DFLT_RMT_REDUCER_WEIGHT; /** Reducer migration threshold weight. */ - private int preferLocReducerThresholdWeight = DFLT_REDUCER_MIGRATION_THRESHOLD_WEIGHT; + private int preferLocReducerThresholdWeight = DFLT_PREFER_LOCAL_REDUCER_THRESHOLD_WEIGHT; /** {@inheritDoc} */ @Override public HadoopMapReducePlan preparePlan(HadoopJob job, Collection<ClusterNode> nodes, @@ -601,11 +625,13 @@ public class IgniteHadoopWeightedMapReducePlanner extends HadoopAbstractMapReduc } /** - * Get local mapper weight. + * Get local mapper weight. This weight is added to a node when a mapper is assigned and its input split data is + * located on this node (at least partially). + * <p> + * Defaults to {@link #DFLT_LOC_MAPPER_WEIGHT}. * * @return Remote mapper weight. */ - // TODO: Docs. public int getLocalMapperWeight() { return locMapperWeight; } @@ -620,11 +646,13 @@ public class IgniteHadoopWeightedMapReducePlanner extends HadoopAbstractMapReduc } /** - * Get remote mapper weight. + * Get remote mapper weight. This weight is added to a node when a mapper is assigned, but its input + * split data is not located on this node. + * <p> + * Defaults to {@link #DFLT_RMT_MAPPER_WEIGHT}. * * @return Remote mapper weight. */ - // TODO: Docs. 
public int getRemoteMapperWeight() { return rmtMapperWeight; } @@ -639,11 +667,13 @@ public class IgniteHadoopWeightedMapReducePlanner extends HadoopAbstractMapReduc } /** - * Get local reducer weight. + * Get local reducer weight. This weight is added to a node when a reducer is assigned and the node has at least + * one assigned mapper. + * <p> + * Defaults to {@link #DFLT_LOC_REDUCER_WEIGHT}. * * @return Local reducer weight. */ - // TODO: Docs. public int getLocalReducerWeight() { return locReducerWeight; } @@ -658,11 +688,13 @@ public class IgniteHadoopWeightedMapReducePlanner extends HadoopAbstractMapReduc } /** - * Get remote reducer weight. + * Get remote reducer weight. This weight is added to a node when a reducer is assigned, but the node doesn't have + * any assigned mappers. + * <p> + * Defaults to {@link #DFLT_RMT_REDUCER_WEIGHT}. * * @return Remote reducer weight. */ - // TODO: Docs. public int getRemoteReducerWeight() { return rmtReducerWeight; } @@ -677,11 +709,13 @@ public class IgniteHadoopWeightedMapReducePlanner extends HadoopAbstractMapReduc } /** - * Get reducer migration threshold weight. + * Get reducer migration threshold weight. When the threshold is reached, a node with mappers is no longer considered + * as preferred for further reducer assignments. + * <p> + * Defaults to {@link #DFLT_PREFER_LOCAL_REDUCER_THRESHOLD_WEIGHT}. * * @return Reducer migration threshold weight. */ - // TODO: Docs. public int getPreferLocalReducerThresholdWeight() { return preferLocReducerThresholdWeight; }
