Repository: tez Updated Branches: refs/heads/master dd9c517e3 -> 63177255c
TEZ-3716. Allow attempt retries to be treated the same as the first attempt. (sseth) Project: http://git-wip-us.apache.org/repos/asf/tez/repo Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/63177255 Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/63177255 Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/63177255 Branch: refs/heads/master Commit: 63177255c7c9f181b8fdbde63b896ddbffae6e07 Parents: dd9c517 Author: Siddharth Seth <[email protected]> Authored: Sun May 14 10:27:01 2017 -0700 Committer: Siddharth Seth <[email protected]> Committed: Sun May 14 10:27:01 2017 -0700 ---------------------------------------------------------------------- .../apache/tez/dag/api/TezConfiguration.java | 10 +++++ .../java/org/apache/tez/dag/app/dag/Vertex.java | 7 +++ .../tez/dag/app/dag/impl/TaskAttemptImpl.java | 7 +-- .../apache/tez/dag/app/dag/impl/TaskImpl.java | 5 +-- .../apache/tez/dag/app/dag/impl/VertexImpl.java | 33 +++++++++++++- .../tez/dag/app/dag/impl/TestTaskAttempt.java | 47 +++++++++++++++++++- .../tez/dag/app/dag/impl/TestTaskImpl.java | 9 +++- 7 files changed, 107 insertions(+), 11 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/tez/blob/63177255/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java ---------------------------------------------------------------------- diff --git a/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java b/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java index c0179f8..cc57b4e 100644 --- a/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java +++ b/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java @@ -571,6 +571,16 @@ public class TezConfiguration extends Configuration { public static final int TEZ_AM_TASK_MAX_FAILED_ATTEMPTS_DEFAULT = 4; /** + * Boolean value. 
Specifies whether a re-scheduled attempt of a task, caused by previous + * failures gets special treatment - higher priority, dropped location hints. + */ + @ConfigurationScope(Scope.VERTEX) + @ConfigurationProperty(type="boolean") + public static final String TEZ_AM_TASK_RESCHEDULE_HIGHER_PRIORITY = + TEZ_AM_PREFIX + "task.reschedule.higher.priority"; + public static final boolean TEZ_AM_TASK_RESCHEDULE_HIGHER_PRIORITY_DEFAULT=true; + + /** * Boolean value. Enabled blacklisting of nodes of nodes that are considered faulty. These nodes * will not be used to execute tasks. */ http://git-wip-us.apache.org/repos/asf/tez/blob/63177255/tez-dag/src/main/java/org/apache/tez/dag/app/dag/Vertex.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/Vertex.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/Vertex.java index 51847d4..0a6e9c5 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/Vertex.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/Vertex.java @@ -200,4 +200,11 @@ public interface Vertex extends Comparable<Vertex> { void reportTaskStartTime(long taskStartTime); public long getFirstTaskStartTime(); public long getLastTaskFinishTime(); + + VertexConfig getVertexConfig(); + + interface VertexConfig { + int getMaxFailedTaskAttempts(); + boolean getTaskRescheduleHigherPriority(); + } } http://git-wip-us.apache.org/repos/asf/tez/blob/63177255/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java index 07aed5e..a4c4652 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java @@ -91,7 +91,6 @@ 
import org.apache.tez.dag.app.dag.event.TaskAttemptEventTezEventUpdate; import org.apache.tez.dag.app.dag.event.TaskAttemptEventTerminationCauseEvent; import org.apache.tez.dag.app.dag.event.TaskAttemptEventOutputFailed; import org.apache.tez.dag.app.dag.event.TaskAttemptEventSchedule; -import org.apache.tez.dag.app.dag.event.TaskAttemptEventStartedRemotely; import org.apache.tez.dag.app.dag.event.TaskAttemptEventStatusUpdate; import org.apache.tez.dag.app.dag.event.TaskAttemptEventType; import org.apache.tez.dag.app.dag.event.VertexEventRouteEvent; @@ -165,6 +164,7 @@ public class TaskAttemptImpl implements TaskAttempt, static final TezCounters EMPTY_COUNTERS = new TezCounters(); + // Should not be used to access configuration. Use vertex.VertexConfig instead protected final Configuration conf; @SuppressWarnings("rawtypes") protected EventHandler eventHandler; @@ -542,6 +542,7 @@ public class TaskAttemptImpl implements TaskAttempt, Vertex vertex, TaskLocationHint locationHint, TaskSpec taskSpec, TezTaskAttemptID schedulingCausalTA) { + // TODO: Move these configs over to Vertex.VertexConfig MAX_ALLOWED_OUTPUT_FAILURES = conf.getInt(TezConfiguration .TEZ_TASK_MAX_ALLOWED_OUTPUT_FAILURES, TezConfiguration .TEZ_TASK_MAX_ALLOWED_OUTPUT_FAILURES_DEFAULT); @@ -1307,7 +1308,7 @@ public class TaskAttemptImpl implements TaskAttempt, ta.taskRacks = racks; // Ask for hosts / racks only if not a re-scheduled task. - if (ta.isRescheduled) { + if (ta.isRescheduled && ta.getVertex().getVertexConfig().getTaskRescheduleHigherPriority()) { locationHint = null; } @@ -1315,7 +1316,7 @@ public class TaskAttemptImpl implements TaskAttempt, // Send out a launch request to the scheduler. 
int priority; - if (ta.isRescheduled) { + if (ta.isRescheduled && ta.getVertex().getVertexConfig().getTaskRescheduleHigherPriority()) { // higher priority for rescheduled attempts priority = scheduleEvent.getPriorityHighLimit(); } else { http://git-wip-us.apache.org/repos/asf/tez/blob/63177255/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java index 04074af..f25e583 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java @@ -53,7 +53,6 @@ import org.apache.hadoop.yarn.util.Clock; import org.apache.tez.common.counters.TaskCounter; import org.apache.tez.common.counters.TezCounters; import org.apache.tez.dag.api.TaskLocationHint; -import org.apache.tez.dag.api.TezConfiguration; import org.apache.tez.dag.api.TezUncheckedException; import org.apache.tez.dag.api.oldrecords.TaskAttemptState; import org.apache.tez.dag.api.oldrecords.TaskReport; @@ -358,9 +357,7 @@ public class TaskImpl implements Task, EventHandler<TaskEvent> { readLock = readWriteLock.readLock(); writeLock = readWriteLock.writeLock(); this.attempts = Collections.emptyMap(); - // TODO Avoid reading this from configuration for each task. 
- maxFailedAttempts = this.conf.getInt(TezConfiguration.TEZ_AM_TASK_MAX_FAILED_ATTEMPTS, - TezConfiguration.TEZ_AM_TASK_MAX_FAILED_ATTEMPTS_DEFAULT); + maxFailedAttempts = vertex.getVertexConfig().getMaxFailedTaskAttempts(); taskId = TezTaskID.getInstance(vertexId, taskIndex); this.taskCommunicatorManagerInterface = taskCommunicatorManagerInterface; this.taskHeartbeatHandler = thh; http://git-wip-us.apache.org/repos/asf/tez/blob/63177255/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java index b6d66df..30d65c4 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java @@ -258,6 +258,7 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex, EventHandl private final float maxFailuresPercent; private boolean logSuccessDiagnostics = false; + private final VertexConfigImpl vertexContextConfig; //fields initialized in init @VisibleForTesting @@ -878,7 +879,7 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex, EventHandl vertexOnlyConf.set(keyValuePair.getKey(), keyValuePair.getValue()); } } - + this.vertexContextConfig = new VertexConfigImpl(vertexConf); this.clock = clock; this.appContext = appContext; @@ -1304,6 +1305,11 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex, EventHandl } } + @Override + public VertexConfig getVertexConfig() { + return vertexContextConfig; + } + boolean inTerminalState() { VertexState state = getInternalState(); if (state == VertexState.ERROR || state == VertexState.FAILED @@ -4635,4 +4641,29 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex, EventHandl writeLock.unlock(); } } + + @VisibleForTesting + static class 
VertexConfigImpl implements VertexConfig { + + private final int maxFailedTaskAttempts; + private final boolean taskRescheduleHigherPriority; + + public VertexConfigImpl(Configuration conf) { + this.maxFailedTaskAttempts = conf.getInt(TezConfiguration.TEZ_AM_TASK_MAX_FAILED_ATTEMPTS, + TezConfiguration.TEZ_AM_TASK_MAX_FAILED_ATTEMPTS_DEFAULT); + this.taskRescheduleHigherPriority = + conf.getBoolean(TezConfiguration.TEZ_AM_TASK_RESCHEDULE_HIGHER_PRIORITY, + TezConfiguration.TEZ_AM_TASK_RESCHEDULE_HIGHER_PRIORITY_DEFAULT); + } + + @Override + public int getMaxFailedTaskAttempts() { + return maxFailedTaskAttempts; + } + + @Override + public boolean getTaskRescheduleHigherPriority() { + return taskRescheduleHigherPriority; + } + } } http://git-wip-us.apache.org/repos/asf/tez/blob/63177255/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskAttempt.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskAttempt.java b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskAttempt.java index acf8f23..d5464c8 100644 --- a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskAttempt.java +++ b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskAttempt.java @@ -139,6 +139,7 @@ public class TestTaskAttempt { } AppContext appCtx; + TezConfiguration vertexConf = new TezConfiguration(); TaskLocationHint locationHint; Vertex mockVertex; ServicePluginInfo servicePluginInfo = new ServicePluginInfo() @@ -157,6 +158,7 @@ public class TestTaskAttempt { mockVertex = mock(Vertex.class); when(mockVertex.getServicePluginInfo()).thenReturn(servicePluginInfo); + when(mockVertex.getVertexConfig()).thenReturn(new VertexImpl.VertexConfigImpl(vertexConf)); HistoryEventHandler mockHistHandler = mock(HistoryEventHandler.class); doReturn(mockHistHandler).when(appCtx).getHistoryHandler(); @@ -202,7 +204,50 @@ public class TestTaskAttempt { assertEquals(host, true, 
taImpl.taskHosts.contains(host)); } } - + + @Test(timeout = 5000) + public void testRetriesAtSamePriorityConfig() { + + // Override the test defaults to setup the config change + TezConfiguration vertexConf = new TezConfiguration(); + vertexConf.setBoolean(TezConfiguration.TEZ_AM_TASK_RESCHEDULE_HIGHER_PRIORITY, false); + when(mockVertex.getVertexConfig()).thenReturn(new VertexImpl.VertexConfigImpl(vertexConf)); + + TaskAttemptImpl.ScheduleTaskattemptTransition sta = + new TaskAttemptImpl.ScheduleTaskattemptTransition(); + + EventHandler eventHandler = mock(EventHandler.class); + TezTaskID taskID = TezTaskID.getInstance( + TezVertexID.getInstance(TezDAGID.getInstance("1", 1, 1), 1), 1); + TaskAttemptImpl taImpl = new MockTaskAttemptImpl(taskID, 1, eventHandler, + mock(TaskCommunicatorManagerInterface.class), new Configuration(), new SystemClock(), + mock(TaskHeartbeatHandler.class), appCtx, + false, Resource.newInstance(1024, 1), createFakeContainerContext(), false); + + TaskAttemptImpl taImplReScheduled = new MockTaskAttemptImpl(taskID, 1, eventHandler, + mock(TaskCommunicatorManagerInterface.class), new Configuration(), new SystemClock(), + mock(TaskHeartbeatHandler.class), appCtx, + true, Resource.newInstance(1024, 1), createFakeContainerContext(), false); + + ArgumentCaptor<Event> arg = ArgumentCaptor.forClass(Event.class); + + TaskAttemptEventSchedule sEvent = mock(TaskAttemptEventSchedule.class); + when(sEvent.getPriorityLowLimit()).thenReturn(3); + when(sEvent.getPriorityHighLimit()).thenReturn(1); + + // Verify priority for a non-retried attempt + sta.transition(taImpl, sEvent); + verify(eventHandler, times(1)).handle(arg.capture()); + AMSchedulerEventTALaunchRequest launchEvent = (AMSchedulerEventTALaunchRequest) arg.getValue(); + Assert.assertEquals(2, launchEvent.getPriority()); + + // Verify priority for a retried attempt is the same + sta.transition(taImplReScheduled, sEvent); + verify(eventHandler, times(2)).handle(arg.capture()); + launchEvent = 
(AMSchedulerEventTALaunchRequest) arg.getValue(); + Assert.assertEquals(2, launchEvent.getPriority()); + } + @Test(timeout = 5000) public void testPriority() { TaskAttemptImpl.ScheduleTaskattemptTransition sta = http://git-wip-us.apache.org/repos/asf/tez/blob/63177255/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskImpl.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskImpl.java b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskImpl.java index 3375047..da25927 100644 --- a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskImpl.java +++ b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskImpl.java @@ -170,6 +170,7 @@ public class TestTaskImpl { containerContext = new ContainerContext(localResources, credentials, environment, javaOpts); Vertex vertex = mock(Vertex.class); + doReturn(new VertexImpl.VertexConfigImpl(conf)).when(vertex).getVertexConfig(); eventHandler = new TestEventHandler(); mockTask = new MockTaskImpl(vertexId, partition, @@ -784,10 +785,12 @@ public class TestTaskImpl { @Test(timeout = 20000) public void testFailedThenSpeculativeFailed() { conf.setInt(TezConfiguration.TEZ_AM_TASK_MAX_FAILED_ATTEMPTS, 1); + Vertex vertex = mock(Vertex.class); + doReturn(new VertexImpl.VertexConfigImpl(conf)).when(vertex).getVertexConfig(); mockTask = new MockTaskImpl(vertexId, partition, eventHandler, conf, taskCommunicatorManagerInterface, clock, taskHeartbeatHandler, appContext, leafVertex, - taskResource, containerContext, mock(Vertex.class)); + taskResource, containerContext, vertex); TezTaskID taskId = getNewTaskID(); scheduleTaskAttempt(taskId); MockTaskAttemptImpl firstAttempt = mockTask.getLastAttempt(); @@ -817,10 +820,12 @@ public class TestTaskImpl { @Test(timeout = 20000) public void testFailedThenSpeculativeSucceeded() { conf.setInt(TezConfiguration.TEZ_AM_TASK_MAX_FAILED_ATTEMPTS, 1); + Vertex vertex = 
mock(Vertex.class); + doReturn(new VertexImpl.VertexConfigImpl(conf)).when(vertex).getVertexConfig(); mockTask = new MockTaskImpl(vertexId, partition, eventHandler, conf, taskCommunicatorManagerInterface, clock, taskHeartbeatHandler, appContext, leafVertex, - taskResource, containerContext, mock(Vertex.class)); + taskResource, containerContext, vertex); TezTaskID taskId = getNewTaskID(); scheduleTaskAttempt(taskId); MockTaskAttemptImpl firstAttempt = mockTask.getLastAttempt();
