TEZ-3183. Change the taskFailed method on plugin contexts to specify the
type of failure. (sseth)


Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/b0e8fd1b
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/b0e8fd1b
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/b0e8fd1b

Branch: refs/heads/master
Commit: b0e8fd1bc31dd6573b3f9b2bbf7535d33b9f97eb
Parents: 43e9c02
Author: Siddharth Seth <[email protected]>
Authored: Thu Apr 7 10:45:04 2016 -0700
Committer: Siddharth Seth <[email protected]>
Committed: Thu Apr 7 10:45:04 2016 -0700

----------------------------------------------------------------------
 CHANGES.txt                                     |  1 +
 .../dag/app/TaskCommunicatorContextImpl.java    |  8 +--
 .../tez/dag/app/TaskCommunicatorManager.java    |  5 +-
 .../api/TaskCommunicatorContext.java            |  5 +-
 .../dag/app/TestTaskCommunicatorManager2.java   | 52 +++++++++++++++++++-
 .../TezTestServiceTaskCommunicatorImpl.java     |  9 ++--
 6 files changed, 68 insertions(+), 12 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tez/blob/b0e8fd1b/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 1b419ac..5dbe3d1 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -6,6 +6,7 @@ Release 0.8.3: Unreleased
 INCOMPATIBLE CHANGES
   TEZ-3029. Add an onError method to service plugin contexts.
   TEZ-3120. Remove TaskCommContext.getCurrentDagName, Identifier.
+  TEZ-3183. Change the taskFailed method on plugin contexts to specify the type of failure.
 
 ALL CHANGES:
   TEZ-3161. Allow task to report different kinds of errors - fatal / kill.

http://git-wip-us.apache.org/repos/asf/tez/blob/b0e8fd1b/tez-dag/src/main/java/org/apache/tez/dag/app/TaskCommunicatorContextImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/TaskCommunicatorContextImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/TaskCommunicatorContextImpl.java
index c31567b..4c43fdd 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/TaskCommunicatorContextImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/TaskCommunicatorContextImpl.java
@@ -29,6 +29,7 @@ import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
 import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.tez.dag.api.UserPayload;
 import org.apache.tez.dag.app.rm.container.AMContainer;
+import org.apache.tez.runtime.api.TaskFailureType;
 import org.apache.tez.serviceplugins.api.DagInfo;
 import org.apache.tez.serviceplugins.api.ServicePluginError;
 import org.apache.tez.serviceplugins.api.TaskAttemptEndReason;
@@ -132,10 +133,11 @@ public class TaskCommunicatorContextImpl implements TaskCommunicatorContext, Ver
   }
 
   @Override
-  public void taskFailed(TezTaskAttemptID taskAttemptId, TaskAttemptEndReason taskAttemptEndReason,
+  public void taskFailed(TezTaskAttemptID taskAttemptId, TaskFailureType taskFailureType,
+                         TaskAttemptEndReason taskAttemptEndReason,
                          @Nullable String diagnostics) {
-    taskCommunicatorManager.taskFailed(taskAttemptId, taskAttemptEndReason, diagnostics);
-
+    taskCommunicatorManager
+        .taskFailed(taskAttemptId, taskFailureType, taskAttemptEndReason, diagnostics);
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/tez/blob/b0e8fd1b/tez-dag/src/main/java/org/apache/tez/dag/app/TaskCommunicatorManager.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/TaskCommunicatorManager.java b/tez-dag/src/main/java/org/apache/tez/dag/app/TaskCommunicatorManager.java
index 36b74de..cfb177b 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/TaskCommunicatorManager.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/TaskCommunicatorManager.java
@@ -391,7 +391,8 @@ public class TaskCommunicatorManager extends AbstractService implements
         taskAttemptEndReason)));
   }
 
-  public void taskFailed(TezTaskAttemptID taskAttemptId, TaskAttemptEndReason taskAttemptEndReason,
+  public void taskFailed(TezTaskAttemptID taskAttemptId, TaskFailureType taskFailureType,
+                         TaskAttemptEndReason taskAttemptEndReason,
                          String diagnostics) {
     // Regular flow via TaskAttempt will take care of un-registering from the heartbeat handler,
     // and messages from the scheduler will release the container.
@@ -400,7 +401,7 @@ public class TaskCommunicatorManager extends AbstractService implements
     // Fix along the same lines as TEZ-2124 by introducing an explict context.
     //TODO-3183. Allow the FailureType to be specified
     sendEvent(new TaskAttemptEventAttemptFailed(taskAttemptId,
-        TaskAttemptEventType.TA_FAILED, TaskFailureType.NON_FATAL, diagnostics, TezUtilsInternal.fromTaskAttemptEndReason(
+        TaskAttemptEventType.TA_FAILED, taskFailureType, diagnostics, TezUtilsInternal.fromTaskAttemptEndReason(
         taskAttemptEndReason)));
   }
 

http://git-wip-us.apache.org/repos/asf/tez/blob/b0e8fd1b/tez-dag/src/main/java/org/apache/tez/serviceplugins/api/TaskCommunicatorContext.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/serviceplugins/api/TaskCommunicatorContext.java b/tez-dag/src/main/java/org/apache/tez/serviceplugins/api/TaskCommunicatorContext.java
index 2de2f45..a17a70d 100644
--- a/tez-dag/src/main/java/org/apache/tez/serviceplugins/api/TaskCommunicatorContext.java
+++ b/tez-dag/src/main/java/org/apache/tez/serviceplugins/api/TaskCommunicatorContext.java
@@ -38,6 +38,7 @@ import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.tez.dag.api.TezException;
 import org.apache.tez.dag.api.event.VertexState;
 import org.apache.tez.dag.records.TezTaskAttemptID;
+import org.apache.tez.runtime.api.TaskFailureType;
 
 
 // Do not make calls into this from within a held lock.
@@ -146,11 +147,13 @@ public interface TaskCommunicatorContext extends ServicePluginContextBase {
    * attempts left.
    *
    * @param taskAttemptId        the relevant task attempt id
+   * @param taskFailureType      the type of the error
    * @param taskAttemptEndReason the reason for the task failure
    * @param diagnostics          any diagnostics messages which are relevant to the task attempt
    *                             failure
    */
-  void taskFailed(TezTaskAttemptID taskAttemptId, TaskAttemptEndReason taskAttemptEndReason,
+  void taskFailed(TezTaskAttemptID taskAttemptId, TaskFailureType taskFailureType,
+                  TaskAttemptEndReason taskAttemptEndReason,
                   @Nullable String diagnostics);
 
   /**

http://git-wip-us.apache.org/repos/asf/tez/blob/b0e8fd1b/tez-dag/src/test/java/org/apache/tez/dag/app/TestTaskCommunicatorManager2.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/TestTaskCommunicatorManager2.java b/tez-dag/src/test/java/org/apache/tez/dag/app/TestTaskCommunicatorManager2.java
index 4950e09..bb7e94b 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/TestTaskCommunicatorManager2.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/TestTaskCommunicatorManager2.java
@@ -96,7 +96,7 @@ public class TestTaskCommunicatorManager2 {
     wrapper.registerTaskAttempt(containerId2, amContainerTask2);
 
     wrapper.getTaskCommunicatorManager().taskFailed(amContainerTask1.getTask().getTaskAttemptID(),
-        TaskAttemptEndReason.COMMUNICATION_ERROR, "Diagnostics1");
+        TaskFailureType.NON_FATAL, TaskAttemptEndReason.COMMUNICATION_ERROR, "Diagnostics1");
     wrapper.getTaskCommunicatorManager().taskKilled(amContainerTask2.getTask().getTaskAttemptID(),
         TaskAttemptEndReason.EXECUTOR_BUSY, "Diagnostics2");
 
@@ -118,9 +118,10 @@ public class TestTaskCommunicatorManager2 {
 //   TODO TEZ-2003. Verify unregistration from the registered list
   }
 
+  // Tests fatal and non fatal
   @SuppressWarnings("unchecked")
   @Test(timeout = 5000)
-  public void testTaskAttemptFailureViaHeartbeatNonFatal() throws IOException, TezException {
+  public void testTaskAttemptFailureViaHeartbeat() throws IOException, TezException {
 
     TaskCommunicatorManagerWrapperForTest wrapper = new TaskCommunicatorManagerWrapperForTest();
 
@@ -183,6 +184,53 @@ public class TestTaskCommunicatorManager2 {
     assertTrue(failedEvent.getDiagnosticInfo().contains("-fatal-"));
   }
 
+  // Tests fatal and non fatal
+  @SuppressWarnings("unchecked")
+  @Test(timeout = 5000)
+  public void testTaskAttemptFailureViaContext() throws IOException, TezException {
+    TaskCommunicatorManagerWrapperForTest wrapper = new TaskCommunicatorManagerWrapperForTest();
+
+    TaskSpec taskSpec1 = wrapper.createTaskSpec();
+    AMContainerTask amContainerTask1 = new AMContainerTask(taskSpec1, null, null, false, 10);
+
+    TaskSpec taskSpec2 = wrapper.createTaskSpec();
+    AMContainerTask amContainerTask2 = new AMContainerTask(taskSpec2, null, null, false, 10);
+
+    ContainerId containerId1 = wrapper.createContainerId(1);
+    wrapper.registerRunningContainer(containerId1);
+    wrapper.registerTaskAttempt(containerId1, amContainerTask1);
+
+    ContainerId containerId2 = wrapper.createContainerId(2);
+    wrapper.registerRunningContainer(containerId2);
+    wrapper.registerTaskAttempt(containerId2, amContainerTask2);
+
+
+    // non-fatal
+    wrapper.getTaskCommunicatorManager()
+        .taskFailed(taskSpec1.getTaskAttemptID(), TaskFailureType.NON_FATAL,
+            TaskAttemptEndReason.CONTAINER_EXITED, "--non-fatal--");
+    ArgumentCaptor<Event> argumentCaptor = ArgumentCaptor.forClass(Event.class);
+    verify(wrapper.getEventHandler(), times(1)).handle(argumentCaptor.capture());
+    assertTrue(argumentCaptor.getAllValues().get(0) instanceof TaskAttemptEventAttemptFailed);
+    TaskAttemptEventAttemptFailed failedEvent =
+        (TaskAttemptEventAttemptFailed) argumentCaptor.getAllValues().get(0);
+    assertEquals(TaskFailureType.NON_FATAL, failedEvent.getTaskFailureType());
+    assertTrue(failedEvent.getDiagnosticInfo().contains("--non-fatal--"));
+
+    reset(wrapper.getEventHandler());
+
+    // fatal
+    wrapper.getTaskCommunicatorManager()
+        .taskFailed(taskSpec2.getTaskAttemptID(), TaskFailureType.FATAL, TaskAttemptEndReason.OTHER,
+            "--fatal--");
+    argumentCaptor = ArgumentCaptor.forClass(Event.class);
+    verify(wrapper.getEventHandler(), times(1)).handle(argumentCaptor.capture());
+    assertTrue(argumentCaptor.getAllValues().get(0) instanceof TaskAttemptEventAttemptFailed);
+    failedEvent = (TaskAttemptEventAttemptFailed) argumentCaptor.getAllValues().get(0);
+    assertEquals(TaskFailureType.FATAL, failedEvent.getTaskFailureType());
+    assertTrue(failedEvent.getDiagnosticInfo().contains("--fatal--"));
+  }
+
   @SuppressWarnings("unchecked")
   private static class TaskCommunicatorManagerWrapperForTest {
     ApplicationId appId = ApplicationId.newInstance(1000, 1);

http://git-wip-us.apache.org/repos/asf/tez/blob/b0e8fd1b/tez-ext-service-tests/src/test/java/org/apache/tez/dag/app/taskcomm/TezTestServiceTaskCommunicatorImpl.java
----------------------------------------------------------------------
diff --git a/tez-ext-service-tests/src/test/java/org/apache/tez/dag/app/taskcomm/TezTestServiceTaskCommunicatorImpl.java b/tez-ext-service-tests/src/test/java/org/apache/tez/dag/app/taskcomm/TezTestServiceTaskCommunicatorImpl.java
index f199dcf..ac36f7c 100644
--- a/tez-ext-service-tests/src/test/java/org/apache/tez/dag/app/taskcomm/TezTestServiceTaskCommunicatorImpl.java
+++ b/tez-ext-service-tests/src/test/java/org/apache/tez/dag/app/taskcomm/TezTestServiceTaskCommunicatorImpl.java
@@ -29,6 +29,7 @@ import org.apache.hadoop.security.Credentials;
 import org.apache.hadoop.yarn.api.ApplicationConstants;
 import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.hadoop.yarn.api.records.LocalResource;
+import org.apache.tez.runtime.api.TaskFailureType;
 import org.apache.tez.serviceplugins.api.ContainerEndReason;
 import org.apache.tez.serviceplugins.api.TaskAttemptEndReason;
 import org.apache.tez.serviceplugins.api.TaskCommunicatorContext;
@@ -157,8 +158,8 @@ public class TezTestServiceTaskCommunicatorImpl extends TezTaskCommunicatorImpl
                     TaskAttemptEndReason.EXECUTOR_BUSY, "Service Busy");
               } else {
                 getContext()
-                    .taskFailed(taskSpec.getTaskAttemptID(), TaskAttemptEndReason.OTHER,
-                        t.toString());
+                    .taskFailed(taskSpec.getTaskAttemptID(), TaskFailureType.NON_FATAL,
+                        TaskAttemptEndReason.OTHER, t.toString());
               }
             } else {
               if (t instanceof IOException) {
@@ -166,8 +167,8 @@ public class TezTestServiceTaskCommunicatorImpl extends TezTaskCommunicatorImpl
                     TaskAttemptEndReason.COMMUNICATION_ERROR, "Communication Error");
               } else {
                 getContext()
-                    .taskFailed(taskSpec.getTaskAttemptID(), TaskAttemptEndReason.OTHER,
-                        t.getMessage());
+                    .taskFailed(taskSpec.getTaskAttemptID(), TaskFailureType.NON_FATAL,
+                        TaskAttemptEndReason.OTHER, t.getMessage());
               }
             }
           }

Reply via email to