Repository: tez Updated Branches: refs/heads/master 5ec498d8f -> 24ba80f81
TEZ-2935. Add MR slow start translation for ShuffleVertexManager (jeagles) Project: http://git-wip-us.apache.org/repos/asf/tez/repo Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/24ba80f8 Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/24ba80f8 Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/24ba80f8 Branch: refs/heads/master Commit: 24ba80f811a653867b456287d88bfaa349f21310 Parents: 5ec498d Author: Jonathan Eagles <[email protected]> Authored: Fri Nov 13 10:21:25 2015 -0600 Committer: Jonathan Eagles <[email protected]> Committed: Fri Nov 13 10:21:25 2015 -0600 ---------------------------------------------------------------------- CHANGES.txt | 2 + .../tez/mapreduce/hadoop/DeprecatedKeys.java | 3 + .../mapreduce/hadoop/TestDeprecatedKeys.java | 4 ++ .../vertexmanager/ShuffleVertexManager.java | 9 ++- .../vertexmanager/TestShuffleVertexManager.java | 60 ++++++++++++++++++-- 5 files changed, 70 insertions(+), 8 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/tez/blob/24ba80f8/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index 52c73da..7233b03 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -7,6 +7,7 @@ INCOMPATIBLE CHANGES TEZ-2679. Admin forms of launch env settings ALL CHANGES: + TEZ-2935. Add MR slow start translation for ShuffleVertexManager TEZ-2918. Make progress notifications in IOs TEZ-2940. Invalid shuffle max slow start setting causes vertex to hang indefinitely TEZ-2930. Tez UI: Parent controller is not polling at times @@ -249,6 +250,7 @@ INCOMPATIBLE CHANGES TEZ-2679. Admin forms of launch env settings ALL CHANGES + TEZ-2935. Add MR slow start translation for ShuffleVertexManager TEZ-2940. Invalid shuffle max slow start setting causes vertex to hang indefinitely TEZ-1670. Add tests for all converter functions in HistoryEventTimelineConversion. TEZ-2922. Tez Live UI gives access denied for admins http://git-wip-us.apache.org/repos/asf/tez/blob/24ba80f8/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/DeprecatedKeys.java ---------------------------------------------------------------------- diff --git a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/DeprecatedKeys.java b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/DeprecatedKeys.java index 8bd27cb..49f95c0 100644 --- a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/DeprecatedKeys.java +++ b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/DeprecatedKeys.java @@ -24,6 +24,7 @@ import java.util.Map; import org.apache.hadoop.conf.Configuration; import org.apache.tez.dag.api.TezConfiguration; +import org.apache.tez.dag.library.vertexmanager.ShuffleVertexManager; import org.apache.tez.runtime.library.api.TezRuntimeConfiguration; import org.apache.tez.runtime.library.common.Constants; @@ -109,6 +110,8 @@ public class DeprecatedKeys { registerMRToRuntimeKeyTranslation(MRJobConfig.REDUCE_MEMORY_TOTAL_BYTES, Constants.TEZ_RUNTIME_TASK_MEMORY); + registerMRToRuntimeKeyTranslation(MRJobConfig.COMPLETED_MAPS_FOR_REDUCE_SLOWSTART, ShuffleVertexManager.TEZ_SHUFFLE_VERTEX_MANAGER_MIN_SRC_FRACTION); + registerMRToRuntimeKeyTranslation(MRJobConfig.SHUFFLE_PARALLEL_COPIES, TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_PARALLEL_COPIES); registerMRToRuntimeKeyTranslation(MRJobConfig.SHUFFLE_FETCH_FAILURES, TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_FETCH_FAILURES_LIMIT); http://git-wip-us.apache.org/repos/asf/tez/blob/24ba80f8/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/hadoop/TestDeprecatedKeys.java ---------------------------------------------------------------------- diff --git a/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/hadoop/TestDeprecatedKeys.java b/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/hadoop/TestDeprecatedKeys.java index 3727233..2414743 100644 --- a/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/hadoop/TestDeprecatedKeys.java +++ b/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/hadoop/TestDeprecatedKeys.java @@ -23,6 +23,7 @@ import static org.junit.Assert.assertTrue; import static org.junit.Assert.assertNull; import org.apache.hadoop.mapred.JobConf; +import org.apache.tez.dag.library.vertexmanager.ShuffleVertexManager; import org.apache.tez.runtime.library.api.TezRuntimeConfiguration; import org.apache.tez.runtime.library.common.Constants; import org.junit.Test; @@ -73,6 +74,7 @@ public class TestDeprecatedKeys { jobConf.setInt(MRJobConfig.IO_SORT_FACTOR, 2000); jobConf.setInt(MRJobConfig.IO_SORT_MB, 100); jobConf.setInt(MRJobConfig.COUNTERS_MAX_KEY, 100); + jobConf.setFloat(MRJobConfig.COMPLETED_MAPS_FOR_REDUCE_SLOWSTART, 0.95f); jobConf.setInt(TezRuntimeConfiguration.TEZ_RUNTIME_IO_SORT_FACTOR, 1000); jobConf.setInt(TezRuntimeConfiguration.TEZ_RUNTIME_IO_SORT_MB, 200); @@ -127,6 +129,7 @@ public class TestDeprecatedKeys { assertEquals("SecondaryComparator", jobConf.get(TezRuntimeConfiguration.TEZ_RUNTIME_KEY_SECONDARY_COMPARATOR_CLASS, "")); assertEquals("DefaultSorter", jobConf.get(TezRuntimeConfiguration.TEZ_RUNTIME_INTERNAL_SORTER_CLASS, "")); assertTrue(jobConf.getBoolean(TezRuntimeConfiguration.TEZ_RUNTIME_COMPRESS, false)); + assertEquals(0.95f, jobConf.getFloat(ShuffleVertexManager.TEZ_SHUFFLE_VERTEX_MANAGER_MIN_SRC_FRACTION, 0.0f), 0.0f); assertNull(jobConf.get(MRConfig.MAPRED_IFILE_READAHEAD)); assertNull(jobConf.get(MRConfig.MAPRED_IFILE_READAHEAD_BYTES)); @@ -151,6 +154,7 @@ public class TestDeprecatedKeys { assertNull(jobConf.get(MRJobConfig.GROUP_COMPARATOR_CLASS)); assertNull(jobConf.get(MRJobConfig.GROUP_COMPARATOR_CLASS)); assertNull(jobConf.get("map.sort.class")); + assertNull(jobConf.get(MRJobConfig.COMPLETED_MAPS_FOR_REDUCE_SLOWSTART)); } } http://git-wip-us.apache.org/repos/asf/tez/blob/24ba80f8/tez-runtime-library/src/main/java/org/apache/tez/dag/library/vertexmanager/ShuffleVertexManager.java ---------------------------------------------------------------------- diff --git a/tez-runtime-library/src/main/java/org/apache/tez/dag/library/vertexmanager/ShuffleVertexManager.java b/tez-runtime-library/src/main/java/org/apache/tez/dag/library/vertexmanager/ShuffleVertexManager.java index c5016aa..5fb4df9 100644 --- a/tez-runtime-library/src/main/java/org/apache/tez/dag/library/vertexmanager/ShuffleVertexManager.java +++ b/tez-runtime-library/src/main/java/org/apache/tez/dag/library/vertexmanager/ShuffleVertexManager.java @@ -99,7 +99,8 @@ public class ShuffleVertexManager extends VertexManagerPlugin { * In case of a ScatterGather connection, once this fraction of source tasks * have completed, all tasks on the current vertex can be scheduled. Number of * tasks ready for scheduling on the current vertex scales linearly between - * min-fraction and max-fraction + * min-fraction and max-fraction. Defaults to the greater of the default value + * or tez.shuffle-vertex-manager.min-src-fraction. */ public static final String TEZ_SHUFFLE_VERTEX_MANAGER_MAX_SRC_FRACTION = "tez.shuffle-vertex-manager.max-src-fraction"; @@ -944,10 +945,14 @@ public class ShuffleVertexManager extends VertexManagerPlugin { .getFloat( ShuffleVertexManager.TEZ_SHUFFLE_VERTEX_MANAGER_MIN_SRC_FRACTION, ShuffleVertexManager.TEZ_SHUFFLE_VERTEX_MANAGER_MIN_SRC_FRACTION_DEFAULT); + float defaultSlowStartMaxSrcFraction = ShuffleVertexManager.TEZ_SHUFFLE_VERTEX_MANAGER_MAX_SRC_FRACTION_DEFAULT; + if (slowStartMinSrcCompletionFraction > defaultSlowStartMaxSrcFraction) { + defaultSlowStartMaxSrcFraction = slowStartMinSrcCompletionFraction; + } this.slowStartMaxSrcCompletionFraction = conf .getFloat( ShuffleVertexManager.TEZ_SHUFFLE_VERTEX_MANAGER_MAX_SRC_FRACTION, - ShuffleVertexManager.TEZ_SHUFFLE_VERTEX_MANAGER_MAX_SRC_FRACTION_DEFAULT); + defaultSlowStartMaxSrcFraction); if (slowStartMinSrcCompletionFraction < 0 || slowStartMaxSrcCompletionFraction > 1 || slowStartMaxSrcCompletionFraction < slowStartMinSrcCompletionFraction) { http://git-wip-us.apache.org/repos/asf/tez/blob/24ba80f8/tez-runtime-library/src/test/java/org/apache/tez/dag/library/vertexmanager/TestShuffleVertexManager.java ---------------------------------------------------------------------- diff --git a/tez-runtime-library/src/test/java/org/apache/tez/dag/library/vertexmanager/TestShuffleVertexManager.java b/tez-runtime-library/src/test/java/org/apache/tez/dag/library/vertexmanager/TestShuffleVertexManager.java index fec08b2..965e99c 100644 --- a/tez-runtime-library/src/test/java/org/apache/tez/dag/library/vertexmanager/TestShuffleVertexManager.java +++ b/tez-runtime-library/src/test/java/org/apache/tez/dag/library/vertexmanager/TestShuffleVertexManager.java @@ -583,7 +583,7 @@ public class TestShuffleVertexManager { try { // source vertex have some tasks. min < 0. - manager = createManager(conf, mockContext, -0.1f, 0); + manager = createManager(conf, mockContext, -0.1f, 0.0f); Assert.assertTrue(false); // should not come here } catch (IllegalArgumentException e) { Assert.assertTrue(e.getMessage().contains( @@ -607,9 +607,49 @@ public class TestShuffleVertexManager { Assert.assertTrue(e.getMessage().contains( "Invalid values for slowStartMinSrcCompletionFraction")); } - + + // source vertex have some tasks. min > default and max undefined + when(mockContext.getVertexNumTasks(mockSrcVertexId1)).thenReturn(20); + when(mockContext.getVertexNumTasks(mockSrcVertexId2)).thenReturn(20); + scheduledTasks.clear(); + + manager = createManager(conf, mockContext, 0.8f, null); + manager.onVertexStarted(emptyCompletions); + manager.onVertexStateUpdated(new VertexStateUpdate(mockSrcVertexId1, VertexState.CONFIGURED)); + manager.onVertexStateUpdated(new VertexStateUpdate(mockSrcVertexId2, VertexState.CONFIGURED)); + manager.onVertexStateUpdated(new VertexStateUpdate(mockSrcVertexId3, VertexState.CONFIGURED)); + Assert.assertEquals(3, manager.pendingTasks.size()); + Assert.assertEquals(40, manager.totalNumBipartiteSourceTasks); + Assert.assertEquals(0, manager.numBipartiteSourceTasksCompleted); + + float completedTasksThreshold = 0.8f * manager.totalNumBipartiteSourceTasks; + int completedTasks = 0; + // Finish all tasks before exceeding the threshold + for (String mockSrcVertex : new String[] { mockSrcVertexId1, mockSrcVertexId2 }) { + for (int i = 0; i < mockContext.getVertexNumTasks(mockSrcVertex); ++i) { + manager.onSourceTaskCompleted(createTaskAttemptIdentifier(mockSrcVertex, i)); + ++completedTasks; + if ((completedTasks + 1) >= completedTasksThreshold) { + // stop before completing more than min/max source tasks + break; + } + } + } + // Since we haven't exceeded the threshold, all tasks are still pending + Assert.assertEquals(manager.totalTasksToSchedule, manager.pendingTasks.size()); + Assert.assertEquals(0, scheduledTasks.size()); // no tasks scheduled + + // Cross the threshold min/max threshold to schedule all tasks + manager.onSourceTaskCompleted(createTaskAttemptIdentifier(mockSrcVertexId2, completedTasks)); + Assert.assertEquals(0, manager.pendingTasks.size()); + Assert.assertEquals(manager.totalTasksToSchedule, scheduledTasks.size()); // all tasks scheduled + + // reset vertices for next test + when(mockContext.getVertexNumTasks(mockSrcVertexId1)).thenReturn(2); + when(mockContext.getVertexNumTasks(mockSrcVertexId2)).thenReturn(2); + // source vertex have some tasks. min, max == 0 - manager = createManager(conf, mockContext, 0, 0); + manager = createManager(conf, mockContext, 0.0f, 0.0f); manager.onVertexStarted(emptyCompletions); Assert.assertTrue(manager.totalNumBipartiteSourceTasks == 4); Assert.assertTrue(manager.totalTasksToSchedule == 3); @@ -1184,9 +1224,17 @@ public class TestShuffleVertexManager { } private ShuffleVertexManager createManager(Configuration conf, - VertexManagerPluginContext context, float min, float max) { - conf.setFloat(ShuffleVertexManager.TEZ_SHUFFLE_VERTEX_MANAGER_MIN_SRC_FRACTION, min); - conf.setFloat(ShuffleVertexManager.TEZ_SHUFFLE_VERTEX_MANAGER_MAX_SRC_FRACTION, max); + VertexManagerPluginContext context, Float min, Float max) { + if (min != null) { + conf.setFloat(ShuffleVertexManager.TEZ_SHUFFLE_VERTEX_MANAGER_MIN_SRC_FRACTION, min); + } else { + conf.unset(ShuffleVertexManager.TEZ_SHUFFLE_VERTEX_MANAGER_MIN_SRC_FRACTION); + } + if (max != null) { + conf.setFloat(ShuffleVertexManager.TEZ_SHUFFLE_VERTEX_MANAGER_MAX_SRC_FRACTION, max); + } else { + conf.unset(ShuffleVertexManager.TEZ_SHUFFLE_VERTEX_MANAGER_MAX_SRC_FRACTION); + } UserPayload payload; try { payload = TezUtils.createUserPayloadFromConf(conf);
