IGNITE-3414: Minors.

Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/685825f6
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/685825f6
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/685825f6

Branch: refs/heads/ignite-3414
Commit: 685825f64ec41deff5291242e69f4708b38f874c
Parents: 644e9f6
Author: vozerov-gridgain <[email protected]>
Authored: Tue Jul 19 11:10:19 2016 +0300
Committer: vozerov-gridgain <[email protected]>
Committed: Tue Jul 19 11:10:19 2016 +0300

----------------------------------------------------------------------
 .../IgniteHadoopWeightedMapReducePlanner.java   | 38 ++++++++++----------
 1 file changed, 20 insertions(+), 18 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/685825f6/modules/hadoop/src/main/java/org/apache/ignite/hadoop/mapreduce/IgniteHadoopWeightedMapReducePlanner.java
----------------------------------------------------------------------
diff --git 
a/modules/hadoop/src/main/java/org/apache/ignite/hadoop/mapreduce/IgniteHadoopWeightedMapReducePlanner.java
 
b/modules/hadoop/src/main/java/org/apache/ignite/hadoop/mapreduce/IgniteHadoopWeightedMapReducePlanner.java
index 4d4f518..6349418 100644
--- 
a/modules/hadoop/src/main/java/org/apache/ignite/hadoop/mapreduce/IgniteHadoopWeightedMapReducePlanner.java
+++ 
b/modules/hadoop/src/main/java/org/apache/ignite/hadoop/mapreduce/IgniteHadoopWeightedMapReducePlanner.java
@@ -54,7 +54,8 @@ import java.util.UUID;
 import java.util.concurrent.ThreadLocalRandom;
 
 /**
- * Map-reduce planner which tries to assign map jobs to affinity nodes.
+ * Map-reduce planner which assigns mappers and reducers based on their 
weights.
+ * <p>
  */
 // TODO: Docs.
 public class IgniteHadoopWeightedMapReducePlanner extends 
HadoopAbstractMapReducePlanner {
@@ -62,16 +63,16 @@ public class IgniteHadoopWeightedMapReducePlanner extends 
HadoopAbstractMapReduc
     public static final int DFLT_LOC_MAPPER_WEIGHT = 100;
 
     /** Default remote mapper weight. */
-    public static final int DFLT_RMT_MAPPER_WEIGHT = 120;
+    public static final int DFLT_RMT_MAPPER_WEIGHT = 100;
 
     /** Default local reducer weight. */
     public static final int DFLT_LOC_REDUCER_WEIGHT = 100;
 
     /** Default remote reducer weight. */
-    public static final int DFLT_RMT_REDUCER_WEIGHT = 120;
+    public static final int DFLT_RMT_REDUCER_WEIGHT = 100;
 
     /** Default reducer migration threshold weight. */
-    public static final int DFLT_REDUCER_MIGRATION_THRESHOLD_WEIGHT = 1000;
+    public static final int DFLT_REDUCER_MIGRATION_THRESHOLD_WEIGHT = 200;
 
     /** Local mapper weight. */
     private int locMapperWeight = DFLT_LOC_MAPPER_WEIGHT;
@@ -86,7 +87,7 @@ public class IgniteHadoopWeightedMapReducePlanner extends 
HadoopAbstractMapReduc
     private int rmtReducerWeight = DFLT_RMT_REDUCER_WEIGHT;
 
     /** Reducer migration threshold weight. */
-    private int reducerMigrationThresholdWeight = 
DFLT_REDUCER_MIGRATION_THRESHOLD_WEIGHT;
+    private int preferLocReducerThresholdWeight = 
DFLT_REDUCER_MIGRATION_THRESHOLD_WEIGHT;
 
     /** {@inheritDoc} */
     @Override public HadoopMapReducePlan preparePlan(HadoopJob job, 
Collection<ClusterNode> nodes,
@@ -435,7 +436,7 @@ public class IgniteHadoopWeightedMapReducePlanner extends 
HadoopAbstractMapReduc
         // Assign more reducers to the node until threshold is reached.
         int res = 0;
 
-        while (res < cnt && grp.weight() < reducerMigrationThresholdWeight) {
+        while (res < cnt && grp.weight() < preferLocReducerThresholdWeight) {
             res++;
 
             grp.weight(grp.weight() + locReducerWeight);
@@ -470,17 +471,17 @@ public class IgniteHadoopWeightedMapReducePlanner extends 
HadoopAbstractMapReduc
             // The least loaded machine.
             HadoopMapReducePlanGroup grp = set.first();
 
-            // Look for affinity nodes.
-            List<UUID> affIds = null;
+            // Look for nodes with assigned splits.
+            List<UUID> splitNodeIds = null;
 
             for (int i = 0; i < grp.nodeCount(); i++) {
                 UUID nodeId = grp.nodeId(i);
 
                 if (mappers.nodeToSplits.containsKey(nodeId)) {
-                    if (affIds == null)
-                        affIds = new ArrayList<>(2);
+                    if (splitNodeIds == null)
+                        splitNodeIds = new ArrayList<>(2);
 
-                    affIds.add(nodeId);
+                    splitNodeIds.add(nodeId);
                 }
             }
 
@@ -488,8 +489,8 @@ public class IgniteHadoopWeightedMapReducePlanner extends 
HadoopAbstractMapReduc
             UUID id;
             int newWeight;
 
-            if (affIds != null) {
-                id = 
affIds.get(ThreadLocalRandom.current().nextInt(affIds.size()));
+            if (splitNodeIds != null) {
+                id = 
splitNodeIds.get(ThreadLocalRandom.current().nextInt(splitNodeIds.size()));
 
                 newWeight = grp.weight() + locReducerWeight;
             }
@@ -681,17 +682,18 @@ public class IgniteHadoopWeightedMapReducePlanner extends 
HadoopAbstractMapReduc
      * @return Reducer migration threshold weight.
      */
     // TODO: Docs.
-    public int getReducerMigrationThresholdWeight() {
-        return reducerMigrationThresholdWeight;
+    public int getPreferLocalReducerThresholdWeight() {
+        return preferLocReducerThresholdWeight;
     }
 
     /**
-     * Set reducer migration threshold weight. See {@link 
#getReducerMigrationThresholdWeight()} for more information.
+     * Set reducer migration threshold weight. See {@link 
#getPreferLocalReducerThresholdWeight()} for more
+     * information.
      *
      * @param reducerMigrationThresholdWeight Reducer migration threshold 
weight.
      */
-    public void setReducerMigrationThresholdWeight(int 
reducerMigrationThresholdWeight) {
-        this.reducerMigrationThresholdWeight = reducerMigrationThresholdWeight;
+    public void setPreferLocalReducerThresholdWeight(int 
reducerMigrationThresholdWeight) {
+        this.preferLocReducerThresholdWeight = reducerMigrationThresholdWeight;
     }
 
     /** {@inheritDoc} */

Reply via email to