Repository: tez Updated Branches: refs/heads/branch-0.7 7121c3aed -> ca7f08026
TEZ-2935. Add MR slow start translation for ShuffleVertexManager (jeagles) Project: http://git-wip-us.apache.org/repos/asf/tez/repo Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/ca7f0802 Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/ca7f0802 Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/ca7f0802 Branch: refs/heads/branch-0.7 Commit: ca7f08026401dea711bdafa9535a9c368d403616 Parents: 7121c3a Author: Jonathan Eagles <[email protected]> Authored: Fri Nov 13 10:21:25 2015 -0600 Committer: Jonathan Eagles <[email protected]> Committed: Fri Nov 13 10:41:32 2015 -0600 ---------------------------------------------------------------------- CHANGES.txt | 1 + .../tez/mapreduce/hadoop/DeprecatedKeys.java | 3 + .../mapreduce/hadoop/TestDeprecatedKeys.java | 4 ++ .../vertexmanager/ShuffleVertexManager.java | 9 ++- .../vertexmanager/TestShuffleVertexManager.java | 60 ++++++++++++++++++-- 5 files changed, 69 insertions(+), 8 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/tez/blob/ca7f0802/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index 6cef49c..2b9572e 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -7,6 +7,7 @@ INCOMPATIBLE CHANGES TEZ-2679. Admin forms of launch env settings ALL CHANGES + TEZ-2935. Add MR slow start translation for ShuffleVertexManager TEZ-2940. Invalid shuffle max slow start setting causes vertex to hang indefinitely TEZ-2930. Tez UI: Parent controller is not polling at times TEZ-1670. Add tests for all converter functions in HistoryEventTimelineConversion. 
http://git-wip-us.apache.org/repos/asf/tez/blob/ca7f0802/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/DeprecatedKeys.java ---------------------------------------------------------------------- diff --git a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/DeprecatedKeys.java b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/DeprecatedKeys.java index 8bd27cb..49f95c0 100644 --- a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/DeprecatedKeys.java +++ b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/DeprecatedKeys.java @@ -24,6 +24,7 @@ import java.util.Map; import org.apache.hadoop.conf.Configuration; import org.apache.tez.dag.api.TezConfiguration; +import org.apache.tez.dag.library.vertexmanager.ShuffleVertexManager; import org.apache.tez.runtime.library.api.TezRuntimeConfiguration; import org.apache.tez.runtime.library.common.Constants; @@ -109,6 +110,8 @@ public class DeprecatedKeys { registerMRToRuntimeKeyTranslation(MRJobConfig.REDUCE_MEMORY_TOTAL_BYTES, Constants.TEZ_RUNTIME_TASK_MEMORY); + registerMRToRuntimeKeyTranslation(MRJobConfig.COMPLETED_MAPS_FOR_REDUCE_SLOWSTART, ShuffleVertexManager.TEZ_SHUFFLE_VERTEX_MANAGER_MIN_SRC_FRACTION); + registerMRToRuntimeKeyTranslation(MRJobConfig.SHUFFLE_PARALLEL_COPIES, TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_PARALLEL_COPIES); registerMRToRuntimeKeyTranslation(MRJobConfig.SHUFFLE_FETCH_FAILURES, TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_FETCH_FAILURES_LIMIT); http://git-wip-us.apache.org/repos/asf/tez/blob/ca7f0802/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/hadoop/TestDeprecatedKeys.java ---------------------------------------------------------------------- diff --git a/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/hadoop/TestDeprecatedKeys.java b/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/hadoop/TestDeprecatedKeys.java index 3727233..2414743 100644 --- 
a/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/hadoop/TestDeprecatedKeys.java +++ b/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/hadoop/TestDeprecatedKeys.java @@ -23,6 +23,7 @@ import static org.junit.Assert.assertTrue; import static org.junit.Assert.assertNull; import org.apache.hadoop.mapred.JobConf; +import org.apache.tez.dag.library.vertexmanager.ShuffleVertexManager; import org.apache.tez.runtime.library.api.TezRuntimeConfiguration; import org.apache.tez.runtime.library.common.Constants; import org.junit.Test; @@ -73,6 +74,7 @@ public class TestDeprecatedKeys { jobConf.setInt(MRJobConfig.IO_SORT_FACTOR, 2000); jobConf.setInt(MRJobConfig.IO_SORT_MB, 100); jobConf.setInt(MRJobConfig.COUNTERS_MAX_KEY, 100); + jobConf.setFloat(MRJobConfig.COMPLETED_MAPS_FOR_REDUCE_SLOWSTART, 0.95f); jobConf.setInt(TezRuntimeConfiguration.TEZ_RUNTIME_IO_SORT_FACTOR, 1000); jobConf.setInt(TezRuntimeConfiguration.TEZ_RUNTIME_IO_SORT_MB, 200); @@ -127,6 +129,7 @@ public class TestDeprecatedKeys { assertEquals("SecondaryComparator", jobConf.get(TezRuntimeConfiguration.TEZ_RUNTIME_KEY_SECONDARY_COMPARATOR_CLASS, "")); assertEquals("DefaultSorter", jobConf.get(TezRuntimeConfiguration.TEZ_RUNTIME_INTERNAL_SORTER_CLASS, "")); assertTrue(jobConf.getBoolean(TezRuntimeConfiguration.TEZ_RUNTIME_COMPRESS, false)); + assertEquals(0.95f, jobConf.getFloat(ShuffleVertexManager.TEZ_SHUFFLE_VERTEX_MANAGER_MIN_SRC_FRACTION, 0.0f), 0.0f); assertNull(jobConf.get(MRConfig.MAPRED_IFILE_READAHEAD)); assertNull(jobConf.get(MRConfig.MAPRED_IFILE_READAHEAD_BYTES)); @@ -151,6 +154,7 @@ public class TestDeprecatedKeys { assertNull(jobConf.get(MRJobConfig.GROUP_COMPARATOR_CLASS)); assertNull(jobConf.get(MRJobConfig.GROUP_COMPARATOR_CLASS)); assertNull(jobConf.get("map.sort.class")); + assertNull(jobConf.get(MRJobConfig.COMPLETED_MAPS_FOR_REDUCE_SLOWSTART)); } } 
http://git-wip-us.apache.org/repos/asf/tez/blob/ca7f0802/tez-runtime-library/src/main/java/org/apache/tez/dag/library/vertexmanager/ShuffleVertexManager.java ---------------------------------------------------------------------- diff --git a/tez-runtime-library/src/main/java/org/apache/tez/dag/library/vertexmanager/ShuffleVertexManager.java b/tez-runtime-library/src/main/java/org/apache/tez/dag/library/vertexmanager/ShuffleVertexManager.java index 4dfd411..fb47d47 100644 --- a/tez-runtime-library/src/main/java/org/apache/tez/dag/library/vertexmanager/ShuffleVertexManager.java +++ b/tez-runtime-library/src/main/java/org/apache/tez/dag/library/vertexmanager/ShuffleVertexManager.java @@ -87,7 +87,8 @@ public class ShuffleVertexManager extends VertexManagerPlugin { * In case of a ScatterGather connection, once this fraction of source tasks * have completed, all tasks on the current vertex can be scheduled. Number of * tasks ready for scheduling on the current vertex scales linearly between - * min-fraction and max-fraction + * min-fraction and max-fraction. Defaults to the greater of the default value + * or tez.shuffle-vertex-manager.min-src-fraction. 
*/ public static final String TEZ_SHUFFLE_VERTEX_MANAGER_MAX_SRC_FRACTION = "tez.shuffle-vertex-manager.max-src-fraction"; @@ -785,10 +786,14 @@ public class ShuffleVertexManager extends VertexManagerPlugin { .getFloat( ShuffleVertexManager.TEZ_SHUFFLE_VERTEX_MANAGER_MIN_SRC_FRACTION, ShuffleVertexManager.TEZ_SHUFFLE_VERTEX_MANAGER_MIN_SRC_FRACTION_DEFAULT); + float defaultSlowStartMaxSrcFraction = ShuffleVertexManager.TEZ_SHUFFLE_VERTEX_MANAGER_MAX_SRC_FRACTION_DEFAULT; + if (slowStartMinSrcCompletionFraction > defaultSlowStartMaxSrcFraction) { + defaultSlowStartMaxSrcFraction = slowStartMinSrcCompletionFraction; + } this.slowStartMaxSrcCompletionFraction = conf .getFloat( ShuffleVertexManager.TEZ_SHUFFLE_VERTEX_MANAGER_MAX_SRC_FRACTION, - ShuffleVertexManager.TEZ_SHUFFLE_VERTEX_MANAGER_MAX_SRC_FRACTION_DEFAULT); + defaultSlowStartMaxSrcFraction); if (slowStartMinSrcCompletionFraction < 0 || slowStartMaxSrcCompletionFraction > 1 || slowStartMaxSrcCompletionFraction < slowStartMinSrcCompletionFraction) { http://git-wip-us.apache.org/repos/asf/tez/blob/ca7f0802/tez-runtime-library/src/test/java/org/apache/tez/dag/library/vertexmanager/TestShuffleVertexManager.java ---------------------------------------------------------------------- diff --git a/tez-runtime-library/src/test/java/org/apache/tez/dag/library/vertexmanager/TestShuffleVertexManager.java b/tez-runtime-library/src/test/java/org/apache/tez/dag/library/vertexmanager/TestShuffleVertexManager.java index 4f93049..d71eba2 100644 --- a/tez-runtime-library/src/test/java/org/apache/tez/dag/library/vertexmanager/TestShuffleVertexManager.java +++ b/tez-runtime-library/src/test/java/org/apache/tez/dag/library/vertexmanager/TestShuffleVertexManager.java @@ -529,7 +529,7 @@ public class TestShuffleVertexManager { try { // source vertex have some tasks. min < 0. 
- manager = createManager(conf, mockContext, -0.1f, 0); + manager = createManager(conf, mockContext, -0.1f, 0.0f); Assert.assertTrue(false); // should not come here } catch (IllegalArgumentException e) { Assert.assertTrue(e.getMessage().contains( @@ -553,9 +553,49 @@ public class TestShuffleVertexManager { Assert.assertTrue(e.getMessage().contains( "Invalid values for slowStartMinSrcCompletionFraction")); } - + + // source vertex have some tasks. min > default and max undefined + when(mockContext.getVertexNumTasks(mockSrcVertexId1)).thenReturn(20); + when(mockContext.getVertexNumTasks(mockSrcVertexId2)).thenReturn(20); + scheduledTasks.clear(); + + manager = createManager(conf, mockContext, 0.8f, null); + manager.onVertexStarted(null); + manager.onVertexStateUpdated(new VertexStateUpdate(mockSrcVertexId1, VertexState.CONFIGURED)); + manager.onVertexStateUpdated(new VertexStateUpdate(mockSrcVertexId2, VertexState.CONFIGURED)); + manager.onVertexStateUpdated(new VertexStateUpdate(mockSrcVertexId3, VertexState.CONFIGURED)); + Assert.assertEquals(3, manager.pendingTasks.size()); + Assert.assertEquals(40, manager.totalNumBipartiteSourceTasks); + Assert.assertEquals(0, manager.numBipartiteSourceTasksCompleted); + + float completedTasksThreshold = 0.8f * manager.totalNumBipartiteSourceTasks; + int completedTasks = 0; + // Finish all tasks before exceeding the threshold + for (String mockSrcVertex : new String[] { mockSrcVertexId1, mockSrcVertexId2 }) { + for (int i = 0; i < mockContext.getVertexNumTasks(mockSrcVertex); ++i) { + manager.onSourceTaskCompleted(mockSrcVertex, i); + ++completedTasks; + if ((completedTasks + 1) >= completedTasksThreshold) { + // stop before completing more than min/max source tasks + break; + } + } + } + // Since we haven't exceeded the threshold, all tasks are still pending + Assert.assertEquals(manager.totalTasksToSchedule, manager.pendingTasks.size()); + Assert.assertEquals(0, scheduledTasks.size()); // no tasks scheduled + + // Cross the 
min/max threshold to schedule all tasks + manager.onSourceTaskCompleted(mockSrcVertexId2, completedTasks); + Assert.assertEquals(0, manager.pendingTasks.size()); + Assert.assertEquals(manager.totalTasksToSchedule, scheduledTasks.size()); // all tasks scheduled + + // reset vertices for next test + when(mockContext.getVertexNumTasks(mockSrcVertexId1)).thenReturn(2); + when(mockContext.getVertexNumTasks(mockSrcVertexId2)).thenReturn(2); + // source vertex have some tasks. min, max == 0 - manager = createManager(conf, mockContext, 0, 0); + manager = createManager(conf, mockContext, 0.0f, 0.0f); manager.onVertexStarted(null); Assert.assertTrue(manager.totalNumBipartiteSourceTasks == 4); Assert.assertTrue(manager.totalTasksToSchedule == 3); @@ -985,9 +1025,17 @@ public class TestShuffleVertexManager { } private ShuffleVertexManager createManager(Configuration conf, - VertexManagerPluginContext context, float min, float max) { - conf.setFloat(ShuffleVertexManager.TEZ_SHUFFLE_VERTEX_MANAGER_MIN_SRC_FRACTION, min); - conf.setFloat(ShuffleVertexManager.TEZ_SHUFFLE_VERTEX_MANAGER_MAX_SRC_FRACTION, max); + VertexManagerPluginContext context, Float min, Float max) { + if (min != null) { + conf.setFloat(ShuffleVertexManager.TEZ_SHUFFLE_VERTEX_MANAGER_MIN_SRC_FRACTION, min); + } else { + conf.unset(ShuffleVertexManager.TEZ_SHUFFLE_VERTEX_MANAGER_MIN_SRC_FRACTION); + } + if (max != null) { + conf.setFloat(ShuffleVertexManager.TEZ_SHUFFLE_VERTEX_MANAGER_MAX_SRC_FRACTION, max); + } else { + conf.unset(ShuffleVertexManager.TEZ_SHUFFLE_VERTEX_MANAGER_MAX_SRC_FRACTION); + } UserPayload payload; try { payload = TezUtils.createUserPayloadFromConf(conf);
