Repository: tez
Updated Branches:
  refs/heads/branch-0.7 7121c3aed -> ca7f08026


TEZ-2935. Add MR slow start translation for ShuffleVertexManager (jeagles)


Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/ca7f0802
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/ca7f0802
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/ca7f0802

Branch: refs/heads/branch-0.7
Commit: ca7f08026401dea711bdafa9535a9c368d403616
Parents: 7121c3a
Author: Jonathan Eagles <[email protected]>
Authored: Fri Nov 13 10:21:25 2015 -0600
Committer: Jonathan Eagles <[email protected]>
Committed: Fri Nov 13 10:41:32 2015 -0600

----------------------------------------------------------------------
 CHANGES.txt                                     |  1 +
 .../tez/mapreduce/hadoop/DeprecatedKeys.java    |  3 +
 .../mapreduce/hadoop/TestDeprecatedKeys.java    |  4 ++
 .../vertexmanager/ShuffleVertexManager.java     |  9 ++-
 .../vertexmanager/TestShuffleVertexManager.java | 60 ++++++++++++++++++--
 5 files changed, 69 insertions(+), 8 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tez/blob/ca7f0802/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 6cef49c..2b9572e 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -7,6 +7,7 @@ INCOMPATIBLE CHANGES
   TEZ-2679. Admin forms of launch env settings
 
 ALL CHANGES
+  TEZ-2935. Add MR slow start translation for ShuffleVertexManager
   TEZ-2940. Invalid shuffle max slow start setting causes vertex to hang indefinitely
   TEZ-2930. Tez UI: Parent controller is not polling at times
   TEZ-1670. Add tests for all converter functions in HistoryEventTimelineConversion.

http://git-wip-us.apache.org/repos/asf/tez/blob/ca7f0802/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/DeprecatedKeys.java
----------------------------------------------------------------------
diff --git 
a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/DeprecatedKeys.java
 
b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/DeprecatedKeys.java
index 8bd27cb..49f95c0 100644
--- 
a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/DeprecatedKeys.java
+++ 
b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/DeprecatedKeys.java
@@ -24,6 +24,7 @@ import java.util.Map;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.tez.dag.api.TezConfiguration;
+import org.apache.tez.dag.library.vertexmanager.ShuffleVertexManager;
 import org.apache.tez.runtime.library.api.TezRuntimeConfiguration;
 import org.apache.tez.runtime.library.common.Constants;
 
@@ -109,6 +110,8 @@ public class DeprecatedKeys {
     
     registerMRToRuntimeKeyTranslation(MRJobConfig.REDUCE_MEMORY_TOTAL_BYTES, 
Constants.TEZ_RUNTIME_TASK_MEMORY);
     
+    
registerMRToRuntimeKeyTranslation(MRJobConfig.COMPLETED_MAPS_FOR_REDUCE_SLOWSTART,
 ShuffleVertexManager.TEZ_SHUFFLE_VERTEX_MANAGER_MIN_SRC_FRACTION);
+
     registerMRToRuntimeKeyTranslation(MRJobConfig.SHUFFLE_PARALLEL_COPIES, 
TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_PARALLEL_COPIES);
     
     registerMRToRuntimeKeyTranslation(MRJobConfig.SHUFFLE_FETCH_FAILURES, 
TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_FETCH_FAILURES_LIMIT);

http://git-wip-us.apache.org/repos/asf/tez/blob/ca7f0802/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/hadoop/TestDeprecatedKeys.java
----------------------------------------------------------------------
diff --git 
a/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/hadoop/TestDeprecatedKeys.java
 
b/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/hadoop/TestDeprecatedKeys.java
index 3727233..2414743 100644
--- 
a/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/hadoop/TestDeprecatedKeys.java
+++ 
b/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/hadoop/TestDeprecatedKeys.java
@@ -23,6 +23,7 @@ import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.assertNull;
 
 import org.apache.hadoop.mapred.JobConf;
+import org.apache.tez.dag.library.vertexmanager.ShuffleVertexManager;
 import org.apache.tez.runtime.library.api.TezRuntimeConfiguration;
 import org.apache.tez.runtime.library.common.Constants;
 import org.junit.Test;
@@ -73,6 +74,7 @@ public class TestDeprecatedKeys {
     jobConf.setInt(MRJobConfig.IO_SORT_FACTOR, 2000);
     jobConf.setInt(MRJobConfig.IO_SORT_MB, 100);
     jobConf.setInt(MRJobConfig.COUNTERS_MAX_KEY, 100);
+    jobConf.setFloat(MRJobConfig.COMPLETED_MAPS_FOR_REDUCE_SLOWSTART, 0.95f);
     
     jobConf.setInt(TezRuntimeConfiguration.TEZ_RUNTIME_IO_SORT_FACTOR, 1000);
     jobConf.setInt(TezRuntimeConfiguration.TEZ_RUNTIME_IO_SORT_MB, 200);
@@ -127,6 +129,7 @@ public class TestDeprecatedKeys {
     assertEquals("SecondaryComparator", 
jobConf.get(TezRuntimeConfiguration.TEZ_RUNTIME_KEY_SECONDARY_COMPARATOR_CLASS, 
""));
     assertEquals("DefaultSorter", 
jobConf.get(TezRuntimeConfiguration.TEZ_RUNTIME_INTERNAL_SORTER_CLASS, ""));
     
assertTrue(jobConf.getBoolean(TezRuntimeConfiguration.TEZ_RUNTIME_COMPRESS, 
false));
+    assertEquals(0.95f, 
jobConf.getFloat(ShuffleVertexManager.TEZ_SHUFFLE_VERTEX_MANAGER_MIN_SRC_FRACTION,
 0.0f), 0.0f);
 
     assertNull(jobConf.get(MRConfig.MAPRED_IFILE_READAHEAD));
     assertNull(jobConf.get(MRConfig.MAPRED_IFILE_READAHEAD_BYTES));
@@ -151,6 +154,7 @@ public class TestDeprecatedKeys {
     assertNull(jobConf.get(MRJobConfig.GROUP_COMPARATOR_CLASS));
     assertNull(jobConf.get(MRJobConfig.GROUP_COMPARATOR_CLASS));
     assertNull(jobConf.get("map.sort.class"));
+    assertNull(jobConf.get(MRJobConfig.COMPLETED_MAPS_FOR_REDUCE_SLOWSTART));
   }
 
 }

http://git-wip-us.apache.org/repos/asf/tez/blob/ca7f0802/tez-runtime-library/src/main/java/org/apache/tez/dag/library/vertexmanager/ShuffleVertexManager.java
----------------------------------------------------------------------
diff --git 
a/tez-runtime-library/src/main/java/org/apache/tez/dag/library/vertexmanager/ShuffleVertexManager.java
 
b/tez-runtime-library/src/main/java/org/apache/tez/dag/library/vertexmanager/ShuffleVertexManager.java
index 4dfd411..fb47d47 100644
--- 
a/tez-runtime-library/src/main/java/org/apache/tez/dag/library/vertexmanager/ShuffleVertexManager.java
+++ 
b/tez-runtime-library/src/main/java/org/apache/tez/dag/library/vertexmanager/ShuffleVertexManager.java
@@ -87,7 +87,8 @@ public class ShuffleVertexManager extends VertexManagerPlugin 
{
    * In case of a ScatterGather connection, once this fraction of source tasks
    * have completed, all tasks on the current vertex can be scheduled. Number 
of
    * tasks ready for scheduling on the current vertex scales linearly between
-   * min-fraction and max-fraction
+   * min-fraction and max-fraction. Defaults to the greater of the default value
+   * or tez.shuffle-vertex-manager.min-src-fraction.
    */
   public static final String TEZ_SHUFFLE_VERTEX_MANAGER_MAX_SRC_FRACTION = 
                                       
"tez.shuffle-vertex-manager.max-src-fraction";
@@ -785,10 +786,14 @@ public class ShuffleVertexManager extends 
VertexManagerPlugin {
         .getFloat(
             ShuffleVertexManager.TEZ_SHUFFLE_VERTEX_MANAGER_MIN_SRC_FRACTION,
             
ShuffleVertexManager.TEZ_SHUFFLE_VERTEX_MANAGER_MIN_SRC_FRACTION_DEFAULT);
+    float defaultSlowStartMaxSrcFraction = 
ShuffleVertexManager.TEZ_SHUFFLE_VERTEX_MANAGER_MAX_SRC_FRACTION_DEFAULT;
+    if (slowStartMinSrcCompletionFraction > defaultSlowStartMaxSrcFraction) {
+      defaultSlowStartMaxSrcFraction = slowStartMinSrcCompletionFraction;
+    }
     this.slowStartMaxSrcCompletionFraction = conf
         .getFloat(
             ShuffleVertexManager.TEZ_SHUFFLE_VERTEX_MANAGER_MAX_SRC_FRACTION,
-            
ShuffleVertexManager.TEZ_SHUFFLE_VERTEX_MANAGER_MAX_SRC_FRACTION_DEFAULT);
+            defaultSlowStartMaxSrcFraction);
 
     if (slowStartMinSrcCompletionFraction < 0 || 
slowStartMaxSrcCompletionFraction > 1
         || slowStartMaxSrcCompletionFraction < 
slowStartMinSrcCompletionFraction) {

http://git-wip-us.apache.org/repos/asf/tez/blob/ca7f0802/tez-runtime-library/src/test/java/org/apache/tez/dag/library/vertexmanager/TestShuffleVertexManager.java
----------------------------------------------------------------------
diff --git 
a/tez-runtime-library/src/test/java/org/apache/tez/dag/library/vertexmanager/TestShuffleVertexManager.java
 
b/tez-runtime-library/src/test/java/org/apache/tez/dag/library/vertexmanager/TestShuffleVertexManager.java
index 4f93049..d71eba2 100644
--- 
a/tez-runtime-library/src/test/java/org/apache/tez/dag/library/vertexmanager/TestShuffleVertexManager.java
+++ 
b/tez-runtime-library/src/test/java/org/apache/tez/dag/library/vertexmanager/TestShuffleVertexManager.java
@@ -529,7 +529,7 @@ public class TestShuffleVertexManager {
 
     try {
       // source vertex have some tasks. min < 0.
-      manager = createManager(conf, mockContext, -0.1f, 0);
+      manager = createManager(conf, mockContext, -0.1f, 0.0f);
       Assert.assertTrue(false); // should not come here
     } catch (IllegalArgumentException e) {
       Assert.assertTrue(e.getMessage().contains(
@@ -553,9 +553,49 @@ public class TestShuffleVertexManager {
       Assert.assertTrue(e.getMessage().contains(
           "Invalid values for slowStartMinSrcCompletionFraction"));
     }
-    
+
+    // source vertex have some tasks. min > default and max undefined
+    when(mockContext.getVertexNumTasks(mockSrcVertexId1)).thenReturn(20);
+    when(mockContext.getVertexNumTasks(mockSrcVertexId2)).thenReturn(20);
+    scheduledTasks.clear();
+
+    manager = createManager(conf, mockContext, 0.8f, null);
+    manager.onVertexStarted(null);
+    manager.onVertexStateUpdated(new VertexStateUpdate(mockSrcVertexId1, 
VertexState.CONFIGURED));
+    manager.onVertexStateUpdated(new VertexStateUpdate(mockSrcVertexId2, 
VertexState.CONFIGURED));
+    manager.onVertexStateUpdated(new VertexStateUpdate(mockSrcVertexId3, 
VertexState.CONFIGURED));
+    Assert.assertEquals(3, manager.pendingTasks.size());
+    Assert.assertEquals(40, manager.totalNumBipartiteSourceTasks);
+    Assert.assertEquals(0, manager.numBipartiteSourceTasksCompleted);
+
+    float completedTasksThreshold = 0.8f * 
manager.totalNumBipartiteSourceTasks;
+    int completedTasks = 0;
+    // Finish all tasks before exceeding the threshold
+    for (String mockSrcVertex : new String[] { mockSrcVertexId1, 
mockSrcVertexId2 }) {
+      for (int i = 0; i < mockContext.getVertexNumTasks(mockSrcVertex); ++i) {
+        manager.onSourceTaskCompleted(mockSrcVertex, i);
+        ++completedTasks;
+        if ((completedTasks + 1) >= completedTasksThreshold) {
+          // stop before completing more than min/max source tasks
+          break;
+        }
+      }
+    }
+    // Since we haven't exceeded the threshold, all tasks are still pending
+    Assert.assertEquals(manager.totalTasksToSchedule, 
manager.pendingTasks.size());
+    Assert.assertEquals(0, scheduledTasks.size()); // no tasks scheduled
+
+    // Cross the min/max threshold to schedule all tasks
+    manager.onSourceTaskCompleted(mockSrcVertexId2, completedTasks);
+    Assert.assertEquals(0, manager.pendingTasks.size());
+    Assert.assertEquals(manager.totalTasksToSchedule, scheduledTasks.size()); 
// all tasks scheduled
+
+    // reset vertices for next test
+    when(mockContext.getVertexNumTasks(mockSrcVertexId1)).thenReturn(2);
+    when(mockContext.getVertexNumTasks(mockSrcVertexId2)).thenReturn(2);
+
     // source vertex have some tasks. min, max == 0
-    manager = createManager(conf, mockContext, 0, 0);
+    manager = createManager(conf, mockContext, 0.0f, 0.0f);
     manager.onVertexStarted(null);
     Assert.assertTrue(manager.totalNumBipartiteSourceTasks == 4);
     Assert.assertTrue(manager.totalTasksToSchedule == 3);
@@ -985,9 +1025,17 @@ public class TestShuffleVertexManager {
   }
 
   private ShuffleVertexManager createManager(Configuration conf,
-      VertexManagerPluginContext context, float min, float max) {
-    
conf.setFloat(ShuffleVertexManager.TEZ_SHUFFLE_VERTEX_MANAGER_MIN_SRC_FRACTION, 
min);
-    
conf.setFloat(ShuffleVertexManager.TEZ_SHUFFLE_VERTEX_MANAGER_MAX_SRC_FRACTION, 
max);
+      VertexManagerPluginContext context, Float min, Float max) {
+    if (min != null) {
+      
conf.setFloat(ShuffleVertexManager.TEZ_SHUFFLE_VERTEX_MANAGER_MIN_SRC_FRACTION, 
min);
+    } else {
+      
conf.unset(ShuffleVertexManager.TEZ_SHUFFLE_VERTEX_MANAGER_MIN_SRC_FRACTION);
+    }
+    if (max != null) {
+      
conf.setFloat(ShuffleVertexManager.TEZ_SHUFFLE_VERTEX_MANAGER_MAX_SRC_FRACTION, 
max);
+    } else {
+      
conf.unset(ShuffleVertexManager.TEZ_SHUFFLE_VERTEX_MANAGER_MAX_SRC_FRACTION);
+    }
     UserPayload payload;
     try {
       payload = TezUtils.createUserPayloadFromConf(conf);

Reply via email to