TEZ-3183. Change the taskFailed method on plugin contexts to specify the type of failure. (sseth)
Project: http://git-wip-us.apache.org/repos/asf/tez/repo Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/b0e8fd1b Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/b0e8fd1b Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/b0e8fd1b Branch: refs/heads/master Commit: b0e8fd1bc31dd6573b3f9b2bbf7535d33b9f97eb Parents: 43e9c02 Author: Siddharth Seth <[email protected]> Authored: Thu Apr 7 10:45:04 2016 -0700 Committer: Siddharth Seth <[email protected]> Committed: Thu Apr 7 10:45:04 2016 -0700 ---------------------------------------------------------------------- CHANGES.txt | 1 + .../dag/app/TaskCommunicatorContextImpl.java | 8 +-- .../tez/dag/app/TaskCommunicatorManager.java | 5 +- .../api/TaskCommunicatorContext.java | 5 +- .../dag/app/TestTaskCommunicatorManager2.java | 52 +++++++++++++++++++- .../TezTestServiceTaskCommunicatorImpl.java | 9 ++-- 6 files changed, 68 insertions(+), 12 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/tez/blob/b0e8fd1b/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index 1b419ac..5dbe3d1 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -6,6 +6,7 @@ Release 0.8.3: Unreleased INCOMPATIBLE CHANGES TEZ-3029. Add an onError method to service plugin contexts. TEZ-3120. Remove TaskCommContext.getCurrentDagName, Identifier. + TEZ-3183. Change the taskFailed method on plugin contexts to specify the type of failure. ALL CHANGES: TEZ-3161. Allow task to report different kinds of errors - fatal / kill. http://git-wip-us.apache.org/repos/asf/tez/blob/b0e8fd1b/tez-dag/src/main/java/org/apache/tez/dag/app/TaskCommunicatorContextImpl.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/TaskCommunicatorContextImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/TaskCommunicatorContextImpl.java index c31567b..4c43fdd 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/app/TaskCommunicatorContextImpl.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/TaskCommunicatorContextImpl.java @@ -29,6 +29,7 @@ import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.tez.dag.api.UserPayload; import org.apache.tez.dag.app.rm.container.AMContainer; +import org.apache.tez.runtime.api.TaskFailureType; import org.apache.tez.serviceplugins.api.DagInfo; import org.apache.tez.serviceplugins.api.ServicePluginError; import org.apache.tez.serviceplugins.api.TaskAttemptEndReason; @@ -132,10 +133,11 @@ public class TaskCommunicatorContextImpl implements TaskCommunicatorContext, Ver } @Override - public void taskFailed(TezTaskAttemptID taskAttemptId, TaskAttemptEndReason taskAttemptEndReason, + public void taskFailed(TezTaskAttemptID taskAttemptId, TaskFailureType taskFailureType, + TaskAttemptEndReason taskAttemptEndReason, @Nullable String diagnostics) { - taskCommunicatorManager.taskFailed(taskAttemptId, taskAttemptEndReason, diagnostics); - + taskCommunicatorManager + .taskFailed(taskAttemptId, taskFailureType, taskAttemptEndReason, diagnostics); } @Override http://git-wip-us.apache.org/repos/asf/tez/blob/b0e8fd1b/tez-dag/src/main/java/org/apache/tez/dag/app/TaskCommunicatorManager.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/TaskCommunicatorManager.java b/tez-dag/src/main/java/org/apache/tez/dag/app/TaskCommunicatorManager.java index 36b74de..cfb177b 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/app/TaskCommunicatorManager.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/TaskCommunicatorManager.java @@ -391,7 +391,8 @@ public class TaskCommunicatorManager extends AbstractService implements taskAttemptEndReason))); } - public void taskFailed(TezTaskAttemptID taskAttemptId, TaskAttemptEndReason taskAttemptEndReason, + public void taskFailed(TezTaskAttemptID taskAttemptId, TaskFailureType taskFailureType, + TaskAttemptEndReason taskAttemptEndReason, String diagnostics) { // Regular flow via TaskAttempt will take care of un-registering from the heartbeat handler, // and messages from the scheduler will release the container. @@ -400,7 +401,7 @@ public class TaskCommunicatorManager extends AbstractService implements // Fix along the same lines as TEZ-2124 by introducing an explict context. //TODO-3183. Allow the FailureType to be specified sendEvent(new TaskAttemptEventAttemptFailed(taskAttemptId, - TaskAttemptEventType.TA_FAILED, TaskFailureType.NON_FATAL, diagnostics, TezUtilsInternal.fromTaskAttemptEndReason( + TaskAttemptEventType.TA_FAILED, taskFailureType, diagnostics, TezUtilsInternal.fromTaskAttemptEndReason( taskAttemptEndReason))); } http://git-wip-us.apache.org/repos/asf/tez/blob/b0e8fd1b/tez-dag/src/main/java/org/apache/tez/serviceplugins/api/TaskCommunicatorContext.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/serviceplugins/api/TaskCommunicatorContext.java b/tez-dag/src/main/java/org/apache/tez/serviceplugins/api/TaskCommunicatorContext.java index 2de2f45..a17a70d 100644 --- a/tez-dag/src/main/java/org/apache/tez/serviceplugins/api/TaskCommunicatorContext.java +++ b/tez-dag/src/main/java/org/apache/tez/serviceplugins/api/TaskCommunicatorContext.java @@ -38,6 +38,7 @@ import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.tez.dag.api.TezException; import org.apache.tez.dag.api.event.VertexState; import org.apache.tez.dag.records.TezTaskAttemptID; +import org.apache.tez.runtime.api.TaskFailureType; // Do not make calls into this from within a held lock. @@ -146,11 +147,13 @@ public interface TaskCommunicatorContext extends ServicePluginContextBase { * attempts left. * * @param taskAttemptId the relevant task attempt id + * @param taskFailureType the type of the error * @param taskAttemptEndReason the reason for the task failure * @param diagnostics any diagnostics messages which are relevant to the task attempt * failure */ - void taskFailed(TezTaskAttemptID taskAttemptId, TaskAttemptEndReason taskAttemptEndReason, + void taskFailed(TezTaskAttemptID taskAttemptId, TaskFailureType taskFailureType, + TaskAttemptEndReason taskAttemptEndReason, @Nullable String diagnostics); /** http://git-wip-us.apache.org/repos/asf/tez/blob/b0e8fd1b/tez-dag/src/test/java/org/apache/tez/dag/app/TestTaskCommunicatorManager2.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/TestTaskCommunicatorManager2.java b/tez-dag/src/test/java/org/apache/tez/dag/app/TestTaskCommunicatorManager2.java index 4950e09..bb7e94b 100644 --- a/tez-dag/src/test/java/org/apache/tez/dag/app/TestTaskCommunicatorManager2.java +++ b/tez-dag/src/test/java/org/apache/tez/dag/app/TestTaskCommunicatorManager2.java @@ -96,7 +96,7 @@ public class TestTaskCommunicatorManager2 { wrapper.registerTaskAttempt(containerId2, amContainerTask2); wrapper.getTaskCommunicatorManager().taskFailed(amContainerTask1.getTask().getTaskAttemptID(), - TaskAttemptEndReason.COMMUNICATION_ERROR, "Diagnostics1"); + TaskFailureType.NON_FATAL, TaskAttemptEndReason.COMMUNICATION_ERROR, "Diagnostics1"); wrapper.getTaskCommunicatorManager().taskKilled(amContainerTask2.getTask().getTaskAttemptID(), TaskAttemptEndReason.EXECUTOR_BUSY, "Diagnostics2"); @@ -118,9 +118,10 @@ public class TestTaskCommunicatorManager2 { // TODO TEZ-2003. Verify unregistration from the registered list } + // Tests fatal and non fatal @SuppressWarnings("unchecked") @Test(timeout = 5000) - public void testTaskAttemptFailureViaHeartbeatNonFatal() throws IOException, TezException { + public void testTaskAttemptFailureViaHeartbeat() throws IOException, TezException { TaskCommunicatorManagerWrapperForTest wrapper = new TaskCommunicatorManagerWrapperForTest(); @@ -183,6 +184,53 @@ public class TestTaskCommunicatorManager2 { assertTrue(failedEvent.getDiagnosticInfo().contains("-fatal-")); } + // Tests fatal and non fatal + @SuppressWarnings("unchecked") + @Test(timeout = 5000) + public void testTaskAttemptFailureViaContext() throws IOException, TezException { + TaskCommunicatorManagerWrapperForTest wrapper = new TaskCommunicatorManagerWrapperForTest(); + + TaskSpec taskSpec1 = wrapper.createTaskSpec(); + AMContainerTask amContainerTask1 = new AMContainerTask(taskSpec1, null, null, false, 10); + + TaskSpec taskSpec2 = wrapper.createTaskSpec(); + AMContainerTask amContainerTask2 = new AMContainerTask(taskSpec2, null, null, false, 10); + + ContainerId containerId1 = wrapper.createContainerId(1); + wrapper.registerRunningContainer(containerId1); + wrapper.registerTaskAttempt(containerId1, amContainerTask1); + + ContainerId containerId2 = wrapper.createContainerId(2); + wrapper.registerRunningContainer(containerId2); + wrapper.registerTaskAttempt(containerId2, amContainerTask2); + + + // non-fatal + wrapper.getTaskCommunicatorManager() + .taskFailed(taskSpec1.getTaskAttemptID(), TaskFailureType.NON_FATAL, + TaskAttemptEndReason.CONTAINER_EXITED, "--non-fatal--"); + ArgumentCaptor<Event> argumentCaptor = ArgumentCaptor.forClass(Event.class); + verify(wrapper.getEventHandler(), times(1)).handle(argumentCaptor.capture()); + assertTrue(argumentCaptor.getAllValues().get(0) instanceof TaskAttemptEventAttemptFailed); + TaskAttemptEventAttemptFailed failedEvent = + (TaskAttemptEventAttemptFailed) argumentCaptor.getAllValues().get(0); + assertEquals(TaskFailureType.NON_FATAL, failedEvent.getTaskFailureType()); + assertTrue(failedEvent.getDiagnosticInfo().contains("--non-fatal--")); + + reset(wrapper.getEventHandler()); + + // fatal + wrapper.getTaskCommunicatorManager() + .taskFailed(taskSpec2.getTaskAttemptID(), TaskFailureType.FATAL, TaskAttemptEndReason.OTHER, + "--fatal--"); + argumentCaptor = ArgumentCaptor.forClass(Event.class); + verify(wrapper.getEventHandler(), times(1)).handle(argumentCaptor.capture()); + assertTrue(argumentCaptor.getAllValues().get(0) instanceof TaskAttemptEventAttemptFailed); + failedEvent = (TaskAttemptEventAttemptFailed) argumentCaptor.getAllValues().get(0); + assertEquals(TaskFailureType.FATAL, failedEvent.getTaskFailureType()); + assertTrue(failedEvent.getDiagnosticInfo().contains("--fatal--")); + } + @SuppressWarnings("unchecked") private static class TaskCommunicatorManagerWrapperForTest { ApplicationId appId = ApplicationId.newInstance(1000, 1); http://git-wip-us.apache.org/repos/asf/tez/blob/b0e8fd1b/tez-ext-service-tests/src/test/java/org/apache/tez/dag/app/taskcomm/TezTestServiceTaskCommunicatorImpl.java ---------------------------------------------------------------------- diff --git a/tez-ext-service-tests/src/test/java/org/apache/tez/dag/app/taskcomm/TezTestServiceTaskCommunicatorImpl.java b/tez-ext-service-tests/src/test/java/org/apache/tez/dag/app/taskcomm/TezTestServiceTaskCommunicatorImpl.java index f199dcf..ac36f7c 100644 --- a/tez-ext-service-tests/src/test/java/org/apache/tez/dag/app/taskcomm/TezTestServiceTaskCommunicatorImpl.java +++ b/tez-ext-service-tests/src/test/java/org/apache/tez/dag/app/taskcomm/TezTestServiceTaskCommunicatorImpl.java @@ -29,6 +29,7 @@ import org.apache.hadoop.security.Credentials; import org.apache.hadoop.yarn.api.ApplicationConstants; import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.LocalResource; +import org.apache.tez.runtime.api.TaskFailureType; import org.apache.tez.serviceplugins.api.ContainerEndReason; import org.apache.tez.serviceplugins.api.TaskAttemptEndReason; import org.apache.tez.serviceplugins.api.TaskCommunicatorContext; @@ -157,8 +158,8 @@ public class TezTestServiceTaskCommunicatorImpl extends TezTaskCommunicatorImpl TaskAttemptEndReason.EXECUTOR_BUSY, "Service Busy"); } else { getContext() - .taskFailed(taskSpec.getTaskAttemptID(), TaskAttemptEndReason.OTHER, - t.toString()); + .taskFailed(taskSpec.getTaskAttemptID(), TaskFailureType.NON_FATAL, + TaskAttemptEndReason.OTHER, t.toString()); } } else { if (t instanceof IOException) { @@ -166,8 +167,8 @@ public class TezTestServiceTaskCommunicatorImpl extends TezTaskCommunicatorImpl TaskAttemptEndReason.COMMUNICATION_ERROR, "Communication Error"); } else { getContext() - .taskFailed(taskSpec.getTaskAttemptID(), TaskAttemptEndReason.OTHER, - t.getMessage()); + .taskFailed(taskSpec.getTaskAttemptID(), TaskFailureType.NON_FATAL, + TaskAttemptEndReason.OTHER, t.getMessage()); } } }
