Repository: tez
Updated Branches:
  refs/heads/master 5ce07f89f -> 27a13fc97


http://git-wip-us.apache.org/repos/asf/tez/blob/27a13fc9/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TezTaskRunner2.java
----------------------------------------------------------------------
diff --git 
a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TezTaskRunner2.java
 
b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TezTaskRunner2.java
index 219cc2f..1d619a3 100644
--- 
a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TezTaskRunner2.java
+++ 
b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TezTaskRunner2.java
@@ -39,6 +39,7 @@ import org.apache.tez.hadoop.shim.HadoopShim;
 import org.apache.tez.runtime.LogicalIOProcessorRuntimeTask;
 import org.apache.tez.runtime.api.ExecutionContext;
 import org.apache.tez.runtime.api.ObjectRegistry;
+import org.apache.tez.runtime.api.TaskFailureType;
 import org.apache.tez.runtime.api.impl.EventMetaData;
 import org.apache.tez.runtime.api.impl.TaskSpec;
 import org.apache.tez.runtime.api.impl.TezEvent;
@@ -84,9 +85,10 @@ public class TezTaskRunner2 {
   // TaskRunnerCallable, a failure to heartbeat, or a signalFatalError on the 
context.
   private volatile Throwable firstException;
   private volatile EventMetaData exceptionSourceInfo;
+  private volatile TaskFailureType firstTaskFailureType;
   private final AtomicBoolean errorReporterToAm = new AtomicBoolean(false);
 
-  private boolean oobSignalErrorInProgress = false;
+  private volatile boolean oobSignalErrorInProgress = false;
   private final Lock oobSignalLock = new ReentrantLock();
   private final Condition oobSignalCondition = oobSignalLock.newCondition();
 
@@ -147,7 +149,7 @@ public class TezTaskRunner2 {
       }
 
       if (future == null) {
-        return logAndReturnEndResult(firstEndReason, firstException, 
stopContainerRequested.get());
+        return logAndReturnEndResult(firstEndReason, firstTaskFailureType, 
firstException, stopContainerRequested.get());
       }
 
       TaskRunner2CallableResult executionResult = null;
@@ -161,7 +163,7 @@ public class TezTaskRunner2 {
         synchronized (this) {
           if (isRunningState()) {
             trySettingEndReason(EndReason.TASK_ERROR);
-            registerFirstException(e, null);
+            registerFirstException(TaskFailureType.NON_FATAL, e, null);
             LOG.warn("Exception from RunnerCallable", e);
           }
         }
@@ -172,30 +174,32 @@ public class TezTaskRunner2 {
         case SUCCESS:
           try {
             taskReporter.taskSucceeded(task.getTaskAttemptID());
-            return logAndReturnEndResult(EndReason.SUCCESS, null, 
stopContainerRequested.get());
+            return logAndReturnEndResult(EndReason.SUCCESS, null, null, 
stopContainerRequested.get());
           } catch (IOException e) {
             // Comm failure. Task can't do much.
-            handleFinalStatusUpdateFailure(e, true);
-            return logAndReturnEndResult(EndReason.COMMUNICATION_FAILURE, e, 
stopContainerRequested.get());
+            handleFinalStatusUpdateFailure(e, "success");
+            return logAndReturnEndResult(EndReason.COMMUNICATION_FAILURE, 
firstTaskFailureType, e, stopContainerRequested.get());
           } catch (TezException e) {
             // Failure from AM. Task can't do much.
-            handleFinalStatusUpdateFailure(e, true);
-            return logAndReturnEndResult(EndReason.COMMUNICATION_FAILURE, e, 
stopContainerRequested.get());
+            handleFinalStatusUpdateFailure(e, "success");
+            return logAndReturnEndResult(EndReason.COMMUNICATION_FAILURE, 
firstTaskFailureType, e, stopContainerRequested.get());
           }
         case CONTAINER_STOP_REQUESTED:
           // Don't need to send any more communication updates to the AM.
-          return logAndReturnEndResult(firstEndReason, null, 
stopContainerRequested.get());
+          return logAndReturnEndResult(firstEndReason, firstTaskFailureType, 
null, stopContainerRequested.get());
         case KILL_REQUESTED:
-          // Kill is currently not reported to the AM via the TaskRunner. Fix 
this when the umbilical
-          // supports an indication of kill, if required.
-          return logAndReturnEndResult(firstEndReason, null, 
stopContainerRequested.get());
+          // This was an external kill called directly on the task runner
+          return logAndReturnEndResult(firstEndReason, firstTaskFailureType, 
null, stopContainerRequested.get());
+        case TASK_KILL_REQUEST:
+          // Task reported a self kill
+          return logAndReturnEndResult(firstEndReason, firstTaskFailureType, 
firstException, stopContainerRequested.get());
         case COMMUNICATION_FAILURE:
           // Already seen a communication failure. There's no point trying to 
report another one.
-          return logAndReturnEndResult(firstEndReason, firstException, 
stopContainerRequested.get());
+          return logAndReturnEndResult(firstEndReason, firstTaskFailureType, 
firstException, stopContainerRequested.get());
         case TASK_ERROR:
           // Don't report an error again if it was reported via 
signalFatalError
           if (errorReporterToAm.get()) {
-            return logAndReturnEndResult(firstEndReason, firstException, 
stopContainerRequested.get());
+            return logAndReturnEndResult(firstEndReason, firstTaskFailureType, 
firstException, stopContainerRequested.get());
           } else {
             String message;
             if (firstException instanceof FSError) {
@@ -203,24 +207,24 @@ public class TezTaskRunner2 {
             } else if (firstException instanceof Error) {
               message = "Encountered an Error while executing task: " + 
task.getTaskAttemptID();
             } else {
-              message = "Failure while running task: " + 
task.getTaskAttemptID();
+              message = "Error while running task ( failure ) : " + 
task.getTaskAttemptID();
             }
             try {
-              taskReporter.taskFailed(task.getTaskAttemptID(), firstException, 
message, exceptionSourceInfo);
-              return logAndReturnEndResult(firstEndReason, firstException, 
stopContainerRequested.get());
+              taskReporter.taskFailed(task.getTaskAttemptID(), 
firstTaskFailureType, firstException, message, exceptionSourceInfo);
+              return logAndReturnEndResult(firstEndReason, 
firstTaskFailureType, firstException, stopContainerRequested.get());
             } catch (IOException e) {
               // Comm failure. Task can't do much.
-              handleFinalStatusUpdateFailure(e, true);
-              return logAndReturnEndResult(firstEndReason, firstException, 
stopContainerRequested.get());
+              handleFinalStatusUpdateFailure(e, "failure");
+              return logAndReturnEndResult(firstEndReason, 
firstTaskFailureType, firstException, stopContainerRequested.get());
             } catch (TezException e) {
               // Failure from AM. Task can't do much.
-              handleFinalStatusUpdateFailure(e, true);
-              return logAndReturnEndResult(firstEndReason, firstException, 
stopContainerRequested.get());
+              handleFinalStatusUpdateFailure(e, "failure");
+              return logAndReturnEndResult(firstEndReason, 
firstTaskFailureType, firstException, stopContainerRequested.get());
             }
           }
         default:
           LOG.error("Unexpected EndReason. File a bug");
-          return logAndReturnEndResult(EndReason.TASK_ERROR, new 
RuntimeException("Unexpected EndReason"), stopContainerRequested.get());
+          return logAndReturnEndResult(EndReason.TASK_ERROR, 
firstTaskFailureType, new RuntimeException("Unexpected EndReason"), 
stopContainerRequested.get());
 
       }
     } finally {
@@ -257,7 +261,7 @@ public class TezTaskRunner2 {
         if (isRunningState()) {
           if (executionResult.error != null) {
             trySettingEndReason(EndReason.TASK_ERROR);
-            registerFirstException(executionResult.error, null);
+            registerFirstException(TaskFailureType.NON_FATAL, 
executionResult.error, null);
           } else {
             trySettingEndReason(EndReason.SUCCESS);
             taskComplete.set(true);
@@ -295,8 +299,19 @@ public class TezTaskRunner2 {
   }
 
   private void killTaskInternal() {
+    abortTaskInternal();
+    interruptTaskInternal();
+  }
+
+  private void abortTaskInternal() {
     if (taskRunnerCallable != null) {
       taskKillStartTime = System.currentTimeMillis();
+      taskRunnerCallable.abortTask();
+    }
+  }
+
+  private void interruptTaskInternal() {
+    if (taskRunnerCallable != null) {
       taskRunnerCallable.interruptTask();
     }
   }
@@ -320,57 +335,20 @@ public class TezTaskRunner2 {
     }
 
     @Override
-    public void signalFatalError(TezTaskAttemptID taskAttemptID, Throwable t, 
String message,
-                                 EventMetaData sourceInfo) {
+    public void signalFailure(TezTaskAttemptID taskAttemptID, TaskFailureType 
taskFailureType, Throwable t, String message,
+                              EventMetaData sourceInfo) {
       // Fatal error reported by the task.
-      boolean isFirstError = false;
-      synchronized (TezTaskRunner2.this) {
-        if (isRunningState()) {
-          if (trySettingEndReason(EndReason.TASK_ERROR)) {
-            if (t == null) {
-              t = new RuntimeException(
-                  message == null ? "FatalError: No user message or exception 
specified" : message);
-            }
-            registerFirstException(t, sourceInfo);
-            LOG.info("Received notification of a fatal error which will cause 
the task to die", t);
-            isFirstError = true;
-            errorReporterToAm.set(true);
-            oobSignalErrorInProgress = true;
-          } else {
-            logErrorIgnored("signalFatalError", message);
-          }
-        } else {
-          logErrorIgnored("signalFatalError", message);
-        }
-      }
+      signalTerminationInternal(taskAttemptID, EndReason.TASK_ERROR, 
taskFailureType, t, message, sourceInfo, false);
+    }
+
+    @Override
+    public void signalKillSelf(TezTaskAttemptID taskAttemptID, Throwable t, 
String message,
+                               EventMetaData sourceInfo) {
+      signalTerminationInternal(taskAttemptID, EndReason.TASK_KILL_REQUEST, 
null, t, message, sourceInfo, true);
 
-      // Informing the TaskReporter here because the running task may not be 
interruptable.
-      // Has to be outside the lock.
-      if (isFirstError) {
-        logAborting("signalFatalError");
-        killTaskInternal();
-        try {
-          taskReporter.taskFailed(taskAttemptID, t, 
getTaskDiagnosticsString(t, message), sourceInfo);
-        } catch (IOException e) {
-          // Comm failure. Task can't do much. The main exception is already 
registered.
-          handleFinalStatusUpdateFailure(e, true);
-        } catch (TezException e) {
-          // Failure from AM. Task can't do much. The main exception is 
already registered.
-          handleFinalStatusUpdateFailure(e, true);
-        } finally {
-          oobSignalLock.lock();
-          try {
-            // This message is being sent outside of the main thread, which 
may end up completing before
-            // this thread runs. Make sure the main run thread does not end 
till this completes.
-            oobSignalErrorInProgress = false;
-            oobSignalCondition.signal();
-          } finally {
-            oobSignalLock.unlock();
-          }
-        }
-      }
     }
 
+
     @Override
     public boolean canCommit(TezTaskAttemptID taskAttemptID) throws 
IOException {
       // Task checking whether it can commit.
@@ -397,7 +375,7 @@ public class TezTaskRunner2 {
         if (isRunningState()) {
           LOG.info("TaskReporter reporter error which will cause the task to 
fail", t);
           if (trySettingEndReason(EndReason.COMMUNICATION_FAILURE)) {
-            registerFirstException(t, null);
+            registerFirstException(TaskFailureType.NON_FATAL, t, null);
             isFirstError = true;
           } else {
             logErrorIgnored("umbilicalFatalError", null);
@@ -436,6 +414,72 @@ public class TezTaskRunner2 {
     }
   }
 
+
+  private void signalTerminationInternal(TezTaskAttemptID taskAttemptID, 
EndReason endReason,
+                                         TaskFailureType taskFailureType, 
Throwable t, String message,
+                                         EventMetaData sourceInfo, boolean 
isKill) {
+    boolean isFirstError = false;
+    String typeString = isKill ? " kill " : " failure ";
+    synchronized (TezTaskRunner2.this) {
+      if (isRunningState()) {
+        if (trySettingEndReason(endReason)) {
+          if (t == null) {
+            String errMessage = message;
+            if (errMessage == null) {
+              errMessage = typeString + " : No user message or exception 
specified";
+            }
+            t = new RuntimeException(errMessage);
+          }
+          registerFirstException(taskFailureType, t, sourceInfo);
+          LOG.info("Received notification of a " + typeString +
+              " which will cause the task to die", t);
+          isFirstError = true;
+          errorReporterToAm.set(true);
+          oobSignalErrorInProgress = true;
+        } else {
+          logErrorIgnored(typeString, message);
+        }
+      } else {
+        logErrorIgnored(typeString, message);
+      }
+    }
+
+    // Informing the TaskReporter here because the running task may not be 
interruptable.
+    // Has to be outside the lock.
+    if (isFirstError) {
+      logAborting(typeString);
+      abortTaskInternal();
+      try {
+        if (isKill) {
+          taskReporter
+              .taskKilled(taskAttemptID, t, getTaskDiagnosticsString(t, 
message, typeString), sourceInfo);
+        } else {
+          taskReporter.taskFailed(taskAttemptID, taskFailureType, t,
+              getTaskDiagnosticsString(t, message, typeString), sourceInfo);
+        }
+      } catch (IOException e) {
+        // Comm failure. Task can't do much. The main exception is already 
registered.
+        handleFinalStatusUpdateFailure(e, typeString);
+      } catch (TezException e) {
+        // Failure from AM. Task can't do much. The main exception is already 
registered.
+        handleFinalStatusUpdateFailure(e, typeString);
+      } catch (Exception e) {
+        handleFinalStatusUpdateFailure(e, typeString);
+      } finally {
+        interruptTaskInternal();
+        oobSignalLock.lock();
+        try {
+          // This message is being sent outside of the main thread, which may 
end up completing before
+          // this thread runs. Make sure the main run thread does not end till 
this completes.
+          oobSignalErrorInProgress = false;
+          oobSignalCondition.signal();
+        } finally {
+          oobSignalLock.unlock();
+        }
+      }
+    }
+  }
+
   private synchronized boolean trySettingEndReason(EndReason endReason) {
     if (isRunningState()) {
       firstEndReason = endReason;
@@ -445,39 +489,43 @@ public class TezTaskRunner2 {
   }
 
 
-  private void registerFirstException(Throwable t, EventMetaData sourceInfo) {
+  private void registerFirstException(TaskFailureType taskFailureType, 
Throwable t, EventMetaData sourceInfo) {
     Preconditions.checkState(isRunningState());
     errorSeen.set(true);
     firstException = t;
+    this.firstTaskFailureType = taskFailureType;
     this.exceptionSourceInfo = sourceInfo;
   }
 
 
-  private String getTaskDiagnosticsString(Throwable t, String message) {
+  private String getTaskDiagnosticsString(Throwable t, String message, String 
typeString) {
     String diagnostics;
     if (t != null && message != null) {
-      diagnostics = "Failure while running task: " + 
ExceptionUtils.getStackTrace(t) + ", errorMessage="
+      diagnostics = "Error while running task (" + typeString + ") : " + 
ExceptionUtils.getStackTrace(t) + ", errorMessage="
           + message;
     } else if (t == null && message == null) {
       diagnostics = "Unknown error";
     } else {
-      diagnostics = t != null ? "Failure while running task: " + 
ExceptionUtils.getStackTrace(t)
+      diagnostics = t != null ? "Error while running task (" + typeString + ") 
: " + ExceptionUtils.getStackTrace(t)
           : " errorMessage=" + message;
     }
     return diagnostics;
   }
 
-  private TaskRunner2Result logAndReturnEndResult(EndReason endReason, 
Throwable firstError,
+  private TaskRunner2Result logAndReturnEndResult(EndReason endReason,
+                                                  TaskFailureType 
taskFailureType,
+                                                  Throwable firstError,
                                                   boolean 
stopContainerRequested) {
-    TaskRunner2Result result = new TaskRunner2Result(endReason, firstError, 
stopContainerRequested);
+    TaskRunner2Result result =
+        new TaskRunner2Result(endReason, taskFailureType, firstError, 
stopContainerRequested);
     LOG.info("TaskRunnerResult for {} : {}  ", task.getTaskAttemptID(), 
result);
     return result;
   }
 
-  private void handleFinalStatusUpdateFailure(Throwable t, boolean 
successReportAttempted) {
+  private void handleFinalStatusUpdateFailure(Throwable t, String stateString) 
{
     // TODO Ideally differentiate between FAILED/KILLED
     LOG.warn("Failure while reporting state= {} to AM",
-        (successReportAttempted ? "success" : "failure/killed"), t);
+        stateString, t);
   }
 
   private void logErrorIgnored(String ignoredEndReason, String errorMessage) {

http://git-wip-us.apache.org/repos/asf/tez/blob/27a13fc9/tez-runtime-internals/src/main/proto/Events.proto
----------------------------------------------------------------------
diff --git a/tez-runtime-internals/src/main/proto/Events.proto 
b/tez-runtime-internals/src/main/proto/Events.proto
deleted file mode 100644
index 558a2b3..0000000
--- a/tez-runtime-internals/src/main/proto/Events.proto
+++ /dev/null
@@ -1,28 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-option java_package = "org.apache.tez.runtime.internals.api.events";
-option java_outer_classname = "SystemEventProtos";
-option java_generate_equals_and_hash = true;
-
-message TaskAttemptFailedEventProto {
-  optional string diagnostics = 1;
-}
-
-message TaskAttemptCompletedEventProto {
-}

http://git-wip-us.apache.org/repos/asf/tez/blob/27a13fc9/tez-runtime-internals/src/main/proto/RuntimeEvents.proto
----------------------------------------------------------------------
diff --git a/tez-runtime-internals/src/main/proto/RuntimeEvents.proto 
b/tez-runtime-internals/src/main/proto/RuntimeEvents.proto
new file mode 100644
index 0000000..660988c
--- /dev/null
+++ b/tez-runtime-internals/src/main/proto/RuntimeEvents.proto
@@ -0,0 +1,38 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+option java_package = "org.apache.tez.runtime.internals.api.events";
+option java_outer_classname = "SystemEventProtos";
+option java_generate_equals_and_hash = true;
+
+enum TaskFailureTypeProto {
+  FT_NON_FATAL = 0;
+  FT_FATAL = 1;
+}  
+
+message TaskAttemptFailedEventProto {
+  optional string diagnostics = 1;
+  optional TaskFailureTypeProto task_failure_type = 2;
+}
+
+message TaskAttemptKilledEventProto {
+  optional string diagnostics = 1;
+}
+
+message TaskAttemptCompletedEventProto {
+}

http://git-wip-us.apache.org/repos/asf/tez/blob/27a13fc9/tez-runtime-internals/src/test/java/org/apache/tez/runtime/task/TaskExecutionTestHelpers.java
----------------------------------------------------------------------
diff --git 
a/tez-runtime-internals/src/test/java/org/apache/tez/runtime/task/TaskExecutionTestHelpers.java
 
b/tez-runtime-internals/src/test/java/org/apache/tez/runtime/task/TaskExecutionTestHelpers.java
index 7502c41..626d178 100644
--- 
a/tez-runtime-internals/src/test/java/org/apache/tez/runtime/task/TaskExecutionTestHelpers.java
+++ 
b/tez-runtime-internals/src/test/java/org/apache/tez/runtime/task/TaskExecutionTestHelpers.java
@@ -14,6 +14,7 @@
 
 package org.apache.tez.runtime.task;
 
+import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.fail;
 
 import java.io.IOException;
@@ -35,11 +36,13 @@ import org.apache.tez.dag.api.TezException;
 import org.apache.tez.dag.records.TezTaskAttemptID;
 import org.apache.tez.runtime.api.AbstractLogicalIOProcessor;
 import org.apache.tez.runtime.api.Event;
+import org.apache.tez.runtime.api.TaskFailureType;
 import org.apache.tez.runtime.api.LogicalInput;
 import org.apache.tez.runtime.api.LogicalOutput;
 import org.apache.tez.runtime.api.ProcessorContext;
 import org.apache.tez.runtime.api.events.TaskAttemptCompletedEvent;
 import org.apache.tez.runtime.api.events.TaskAttemptFailedEvent;
+import org.apache.tez.runtime.api.events.TaskAttemptKilledEvent;
 import org.apache.tez.runtime.api.impl.TezEvent;
 import org.apache.tez.runtime.api.impl.TezHeartbeatRequest;
 import org.apache.tez.runtime.api.impl.TezHeartbeatResponse;
@@ -53,12 +56,29 @@ public class TaskExecutionTestHelpers {
   // Uses static fields for signaling. Ensure only used by one test at a time.
   public static class TestProcessor extends AbstractLogicalIOProcessor {
 
-    public static final byte[] CONF_EMPTY = new byte[] { 0 };
-    public static final byte[] CONF_THROW_IO_EXCEPTION = new byte[] { 1 };
-    public static final byte[] CONF_THROW_TEZ_EXCEPTION = new byte[] { 2 };
-    public static final byte[] CONF_SIGNAL_FATAL_AND_THROW = new byte[] { 4 };
-    public static final byte[] CONF_SIGNAL_FATAL_AND_LOOP = new byte[] { 8 };
-    public static final byte[] CONF_SIGNAL_FATAL_AND_COMPLETE = new byte[] { 
16 };
+    private static final int EMPTY = 0;
+    private static final int THROW_IO_EXCEPTION = 1;
+    private static final int THROW_TEZ_EXCEPTION = 2;
+    private static final int SIGNAL_DEPRECATEDFATAL_AND_THROW = 3;
+    private static final int SIGNAL_DEPRECATEDFATAL_AND_LOOP = 4;
+    private static final int SIGNAL_DEPRECATEDFATAL_AND_COMPLETE = 5;
+    private static final int SIGNAL_FATAL_AND_THROW = 6;
+    private static final int SIGNAL_NON_FATAL_AND_THROW = 7;
+    private static final int SELF_KILL_AND_COMPLETE = 8;
+
+    public static final byte[] CONF_EMPTY = new byte[]{EMPTY};
+    public static final byte[] CONF_THROW_IO_EXCEPTION = new 
byte[]{THROW_IO_EXCEPTION};
+    public static final byte[] CONF_THROW_TEZ_EXCEPTION = new 
byte[]{THROW_TEZ_EXCEPTION};
+    public static final byte[] CONF_SIGNAL_DEPRECATEDFATAL_AND_THROW =
+        new byte[]{SIGNAL_DEPRECATEDFATAL_AND_THROW};
+    public static final byte[] CONF_SIGNAL_DEPRECATEDFATAL_AND_LOOP =
+        new byte[]{SIGNAL_DEPRECATEDFATAL_AND_LOOP};
+    public static final byte[] CONF_SIGNAL_DEPRECATEDFATAL_AND_COMPLETE =
+        new byte[]{SIGNAL_DEPRECATEDFATAL_AND_COMPLETE};
+    public static final byte[] CONF_SIGNAL_FATAL_AND_THROW = new 
byte[]{SIGNAL_FATAL_AND_THROW};
+    public static final byte[] CONF_SIGNAL_NON_FATAL_AND_THROW =
+        new byte[]{SIGNAL_NON_FATAL_AND_THROW};
+    public static final byte[] CONF_SELF_KILL_AND_COMPLETE = new 
byte[]{SELF_KILL_AND_COMPLETE};
 
     private static final Logger LOG = 
LoggerFactory.getLogger(TestProcessor.class);
 
@@ -77,9 +97,12 @@ public class TaskExecutionTestHelpers {
 
     private boolean throwIOException = false;
     private boolean throwTezException = false;
+    private boolean signalDeprecatedFatalAndThrow = false;
+    private boolean signalDeprecatedFatalAndLoop = false;
+    private boolean signalDeprecatedFatalAndComplete = false;
     private boolean signalFatalAndThrow = false;
-    private boolean signalFatalAndLoop = false;
-    private boolean signalFatalAndComplete = false;
+    private boolean signalNonFatalAndThrow = false;
+    private boolean selfKillAndComplete = false;
 
     public TestProcessor(ProcessorContext context) {
       super(context);
@@ -102,11 +125,14 @@ public class TaskExecutionTestHelpers {
 
     private void parseConf(byte[] bytes) {
       byte b = bytes[0];
-      throwIOException = (b & 1) > 0;
-      throwTezException = (b & 2) > 0;
-      signalFatalAndThrow = (b & 4) > 0;
-      signalFatalAndLoop = (b & 8) > 0;
-      signalFatalAndComplete = (b & 16) > 0;
+      throwIOException = (b == THROW_IO_EXCEPTION);
+      throwTezException = (b == THROW_TEZ_EXCEPTION);
+      signalDeprecatedFatalAndThrow = (b == SIGNAL_DEPRECATEDFATAL_AND_THROW);
+      signalDeprecatedFatalAndLoop = (b == SIGNAL_DEPRECATEDFATAL_AND_LOOP);
+      signalDeprecatedFatalAndComplete = (b == 
SIGNAL_DEPRECATEDFATAL_AND_COMPLETE);
+      signalFatalAndThrow = (b == SIGNAL_FATAL_AND_THROW);
+      signalNonFatalAndThrow = (b == SIGNAL_NON_FATAL_AND_THROW);
+      selfKillAndComplete = (b == SELF_KILL_AND_COMPLETE);
     }
 
     public static void reset() {
@@ -191,6 +217,7 @@ public class TaskExecutionTestHelpers {
       wasAborted = true;
     }
 
+    @SuppressWarnings("deprecation")
     @Override
     public void run(Map<String, LogicalInput> inputs, Map<String, 
LogicalOutput> outputs) throws
         Exception {
@@ -212,23 +239,38 @@ public class TaskExecutionTestHelpers {
             throw createProcessorIOException();
           } else if (throwTezException) {
             throw createProcessorTezException();
-          } else if (signalFatalAndThrow) {
-            IOException io = new IOException("FATALERROR");
-            getContext().fatalError(io, "FATALERROR");
+          } else if (signalDeprecatedFatalAndThrow) {
+            IOException io = new 
IOException(IOException.class.getSimpleName());
+
+            getContext().fatalError(io, IOException.class.getSimpleName());
             throw io;
-          } else if (signalFatalAndComplete) {
-            IOException io = new IOException("FATALERROR");
-            getContext().fatalError(io, "FATALERROR");
+          } else if (signalDeprecatedFatalAndComplete) {
+            IOException io = new 
IOException(IOException.class.getSimpleName());
+            getContext().fatalError(io, IOException.class.getSimpleName());
             return;
-          } else if (signalFatalAndLoop) {
+          } else if (signalDeprecatedFatalAndLoop) {
             IOException io = createProcessorIOException();
-            getContext().fatalError(io, "FATALERROR");
+            getContext().fatalError(io, IOException.class.getSimpleName());
             LOG.info("looping");
             looping = true;
             loopCondition.signal();
             LOG.info("Waiting for Processor signal again");
             processorCondition.await();
             LOG.info("Received second processor signal");
+          } else if (signalFatalAndThrow) {
+            IOException io = new 
IOException(IOException.class.getSimpleName());
+            getContext().reportFailure(TaskFailureType.FATAL, io, 
IOException.class.getSimpleName());
+            LOG.info("throwing");
+            throw io;
+          } else if (signalNonFatalAndThrow) {
+            IOException io = new 
IOException(IOException.class.getSimpleName());
+            getContext().reportFailure(TaskFailureType.NON_FATAL, io, 
IOException.class.getSimpleName());
+            LOG.info("throwing");
+            throw io;
+          } else if (selfKillAndComplete) {
+            LOG.info("Reporting kill self");
+            getContext().killSelf(new 
IOException(IOException.class.getSimpleName()), "SELFKILL");
+            LOG.info("Returning");
           }
         } catch (InterruptedException e) {
           receivedInterrupt = true;
@@ -344,6 +386,10 @@ public class TaskExecutionTestHelpers {
     }
 
     public void verifyTaskFailedEvent(String diagStart, String diagContains) {
+      verifyTaskFailedEvent(diagStart, diagContains, 
TaskFailureType.NON_FATAL);
+    }
+
+    public void verifyTaskFailedEvent(String diagStart, String diagContains, 
TaskFailureType taskFailureType) {
       umbilicalLock.lock();
       try {
         for (TezEvent event : requestEvents) {
@@ -352,6 +398,7 @@ public class TaskExecutionTestHelpers {
             if (failedEvent.getDiagnostics().startsWith(diagStart)) {
               if (diagContains != null) {
                 if (failedEvent.getDiagnostics().contains(diagContains)) {
+                  assertEquals(taskFailureType, 
failedEvent.getTaskFailureType());
                   return;
                 } else {
                   fail("Diagnostic message does not contain expected message. 
Found [" +
@@ -370,6 +417,35 @@ public class TaskExecutionTestHelpers {
       }
     }
 
+    public void verifyTaskKilledEvent(String diagStart, String diagContains) {
+      umbilicalLock.lock();
+      try {
+        for (TezEvent event : requestEvents) {
+          if (event.getEvent() instanceof TaskAttemptKilledEvent) {
+            TaskAttemptKilledEvent killedEvent =
+                (TaskAttemptKilledEvent) event.getEvent();
+            if (killedEvent.getDiagnostics().startsWith(diagStart)) {
+              if (diagContains != null) {
+                if (killedEvent.getDiagnostics().contains(diagContains)) {
+                  return;
+                } else {
+                  fail("Diagnostic message does not contain expected message. 
Found [" +
+                      killedEvent.getDiagnostics() + "], Expected: [" + 
diagContains + "]");
+                }
+              }
+            } else {
+              fail("Diagnostic message does not start with expected message. 
Found [" +
+                  killedEvent.getDiagnostics() + "], Expected: [" + diagStart 
+ "]");
+            }
+          }
+        }
+        fail("No TaskAttemptKilledEvents sent over umbilical");
+      } finally {
+        umbilicalLock.unlock();
+      }
+
+    }
+
     public void verifyTaskSuccessEvent() {
       umbilicalLock.lock();
       try {

http://git-wip-us.apache.org/repos/asf/tez/blob/27a13fc9/tez-runtime-internals/src/test/java/org/apache/tez/runtime/task/TestTaskExecution2.java
----------------------------------------------------------------------
diff --git 
a/tez-runtime-internals/src/test/java/org/apache/tez/runtime/task/TestTaskExecution2.java
 
b/tez-runtime-internals/src/test/java/org/apache/tez/runtime/task/TestTaskExecution2.java
index 18660f6..c3b9abd 100644
--- 
a/tez-runtime-internals/src/test/java/org/apache/tez/runtime/task/TestTaskExecution2.java
+++ 
b/tez-runtime-internals/src/test/java/org/apache/tez/runtime/task/TestTaskExecution2.java
@@ -67,6 +67,7 @@ import org.apache.tez.dag.records.TezVertexID;
 import org.apache.tez.hadoop.shim.DefaultHadoopShim;
 import org.apache.tez.runtime.LogicalIOProcessorRuntimeTask;
 import org.apache.tez.runtime.api.ExecutionContext;
+import org.apache.tez.runtime.api.TaskFailureType;
 import org.apache.tez.runtime.api.ObjectRegistry;
 import org.apache.tez.runtime.api.impl.ExecutionContextImpl;
 import org.apache.tez.runtime.api.impl.InputSpec;
@@ -89,6 +90,9 @@ public class TestTaskExecution2 {
   private static final FileSystem localFs;
   private static final Path workDir;
 
+  private static final String FAILURE_START_STRING = "Error while running task 
( failure )";
+  private static final String KILL_START_STRING = "Error while running task ( 
kill )";
+
   private static final ExecutorService taskExecutor = 
Executors.newFixedThreadPool(1);
 
   static {
@@ -135,7 +139,7 @@ public class TestTaskExecution2 {
       // Signal the processor to go through
       TestProcessor.signal();
       TaskRunner2Result result = taskRunnerFuture.get();
-      verifyTaskRunnerResult(result, EndReason.SUCCESS, null, false);
+      verifyTaskRunnerResult(result, EndReason.SUCCESS, null, false, null);
       assertNull(taskReporter.currentCallable);
       umbilical.verifyTaskSuccessEvent();
       assertFalse(TestProcessor.wasAborted());
@@ -166,7 +170,7 @@ public class TestTaskExecution2 {
       // Signal the processor to go through
       TestProcessor.signal();
       TaskRunner2Result result = taskRunnerFuture.get();
-      verifyTaskRunnerResult(result, EndReason.SUCCESS, null, false);
+      verifyTaskRunnerResult(result, EndReason.SUCCESS, null, false, null);
       assertNull(taskReporter.currentCallable);
       umbilical.verifyTaskSuccessEvent();
       assertFalse(TestProcessor.wasAborted());
@@ -182,7 +186,7 @@ public class TestTaskExecution2 {
       // Signal the processor to go through
       TestProcessor.signal();
       result = taskRunnerFuture.get();
-      verifyTaskRunnerResult(result, EndReason.SUCCESS, null, false);
+      verifyTaskRunnerResult(result, EndReason.SUCCESS, null, false, null);
       assertNull(taskReporter.currentCallable);
       umbilical.verifyTaskSuccessEvent();
       assertFalse(TestProcessor.wasAborted());
@@ -217,11 +221,11 @@ public class TestTaskExecution2 {
       TestProcessor.awaitStart();
       TestProcessor.signal();
       TaskRunner2Result result = taskRunnerFuture.get();
-      verifyTaskRunnerResult(result, EndReason.TASK_ERROR, 
createProcessorTezException(), false);
+      verifyTaskRunnerResult(result, EndReason.TASK_ERROR, 
createProcessorTezException(), false, TaskFailureType.NON_FATAL);
 
       assertNull(taskReporter.currentCallable);
       umbilical.verifyTaskFailedEvent(
-          "Failure while running task",
+          FAILURE_START_STRING,
           TezException.class.getName() + ": " + 
TezException.class.getSimpleName());
       // Failure detected as a result of fall off from the run method. abort 
isn't required.
       assertFalse(TestProcessor.wasAborted());
@@ -254,10 +258,10 @@ public class TestTaskExecution2 {
 
       TaskRunner2Result result = taskRunnerFuture.get();
       verifyTaskRunnerResult(result, EndReason.TASK_ERROR,
-          new TezReflectionException("TezReflectionException"), false);
+          new TezReflectionException("TezReflectionException"), false, 
TaskFailureType.NON_FATAL);
 
       assertNull(taskReporter.currentCallable);
-      umbilical.verifyTaskFailedEvent("Failure while running task",
+      umbilical.verifyTaskFailedEvent(FAILURE_START_STRING,
           ":org.apache.tez.dag.api.TezReflectionException: "
               + "Unable to load class: NotExitedProcessor");
       // Failure detected as a result of fall off from the run method. abort 
isn't required.
@@ -291,12 +295,12 @@ public class TestTaskExecution2 {
       TestProcessor.awaitStart();
       TestProcessor.signal();
       TaskRunner2Result result = taskRunnerFuture.get();
-      verifyTaskRunnerResult(result, EndReason.TASK_ERROR, 
createProcessorIOException(), false);
+      verifyTaskRunnerResult(result, EndReason.TASK_ERROR, 
createProcessorIOException(), false, TaskFailureType.NON_FATAL);
 
 
       assertNull(taskReporter.currentCallable);
       umbilical.verifyTaskFailedEvent(
-          "Failure while running task",
+          FAILURE_START_STRING,
           IOException.class.getName() + ": " + 
IOException.class.getSimpleName());
       // Failure detected as a result of fall off from the run method. abort 
isn't required.
       assertFalse(TestProcessor.wasAborted());
@@ -333,7 +337,7 @@ public class TestTaskExecution2 {
       TaskRunner2Result result = taskRunnerFuture.get();
       verifyTaskRunnerResult(result, EndReason.COMMUNICATION_FAILURE,
           new IOException("IOException"),
-          TaskExecutionTestHelpers.HEARTBEAT_EXCEPTION_STRING, false);
+          TaskExecutionTestHelpers.HEARTBEAT_EXCEPTION_STRING, false, 
TaskFailureType.NON_FATAL);
 
       TestProcessor.awaitCompletion();
       assertTrue(TestProcessor.wasInterrupted());
@@ -371,7 +375,7 @@ public class TestTaskExecution2 {
       // Not signaling an actual start to verify task interruption
 
       TaskRunner2Result result = taskRunnerFuture.get();
-      verifyTaskRunnerResult(result, EndReason.CONTAINER_STOP_REQUESTED, null, 
true);
+      verifyTaskRunnerResult(result, EndReason.CONTAINER_STOP_REQUESTED, null, 
true, null);
 
 
       TestProcessor.awaitCompletion();
@@ -388,7 +392,7 @@ public class TestTaskExecution2 {
   }
 
   @Test(timeout = 5000)
-  public void testSignalFatalErrorAndLoop() throws IOException, 
InterruptedException, TezException,
+  public void testSignalDeprecatedFatalErrorAndLoop() throws IOException, 
InterruptedException, TezException,
       ExecutionException {
 
     ListeningExecutorService executor = null;
@@ -401,7 +405,7 @@ public class TestTaskExecution2 {
       TaskReporter taskReporter = createTaskReporter(appId, umbilical);
 
       TezTaskRunner2 taskRunner = createTaskRunner(appId, umbilical, 
taskReporter, executor,
-          TestProcessor.CONF_SIGNAL_FATAL_AND_LOOP);
+          TestProcessor.CONF_SIGNAL_DEPRECATEDFATAL_AND_LOOP);
       // Setup the executor
       Future<TaskRunner2Result> taskRunnerFuture =
           taskExecutor.submit(new TaskRunnerCallable2ForTest(taskRunner));
@@ -413,13 +417,13 @@ public class TestTaskExecution2 {
       // The fatal error should have caused an interrupt.
 
       TaskRunner2Result result = taskRunnerFuture.get();
-      verifyTaskRunnerResult(result, EndReason.TASK_ERROR, 
createProcessorIOException(), false);
+      verifyTaskRunnerResult(result, EndReason.TASK_ERROR, 
createProcessorIOException(), false, TaskFailureType.NON_FATAL);
 
       TestProcessor.awaitCompletion();
       assertTrue(TestProcessor.wasInterrupted());
       assertNull(taskReporter.currentCallable);
       umbilical.verifyTaskFailedEvent(
-          "Failure while running task",
+          FAILURE_START_STRING,
           IOException.class.getName() + ": " + 
IOException.class.getSimpleName());
       // Signal fatal error should cause the processor to fail.
       assertTrue(TestProcessor.wasAborted());
@@ -429,6 +433,115 @@ public class TestTaskExecution2 {
   }
 
   @Test(timeout = 5000)
+  public void testSignalFatalAndThrow() throws IOException, 
InterruptedException, TezException,
+      ExecutionException {
+    // Runs a task whose processor is configured with CONF_SIGNAL_FATAL_AND_THROW and checks
+    // that the TaskRunner2Result is TASK_ERROR with TaskFailureType.FATAL, and that the
+    // failure event verified on the umbilical is also tagged TaskFailureType.FATAL.
+
+    ListeningExecutorService executor = null;
+    try {
+      ExecutorService rawExecutor = Executors.newFixedThreadPool(1);
+      executor = MoreExecutors.listeningDecorator(rawExecutor);
+      ApplicationId appId = ApplicationId.newInstance(10000, 1);
+      TaskExecutionTestHelpers.TezTaskUmbilicalForTest
+          umbilical = new TaskExecutionTestHelpers.TezTaskUmbilicalForTest();
+      TaskReporter taskReporter = createTaskReporter(appId, umbilical);
+
+      TezTaskRunner2 taskRunner = createTaskRunner(appId, umbilical, 
taskReporter, executor,
+          TestProcessor.CONF_SIGNAL_FATAL_AND_THROW);
+      // Run the task runner on the shared static taskExecutor (the local executor above
+      // is only handed to createTaskRunner).
+      Future<TaskRunner2Result> taskRunnerFuture =
+          taskExecutor.submit(new TaskRunnerCallable2ForTest(taskRunner));
+      // Signal the processor to go through
+      TestProcessor.awaitStart();
+      TestProcessor.signal();
+
+      TaskRunner2Result result = taskRunnerFuture.get();
+      verifyTaskRunnerResult(result, EndReason.TASK_ERROR, 
createProcessorIOException(), false, TaskFailureType.FATAL);
+
+      TestProcessor.awaitCompletion();
+      assertNull(taskReporter.currentCallable);
+      umbilical.verifyTaskFailedEvent(
+          FAILURE_START_STRING,
+          IOException.class.getName() + ": " + 
IOException.class.getSimpleName(), TaskFailureType.FATAL);
+      assertTrue(TestProcessor.wasAborted());
+    } finally {
+      // NOTE(review): executor is still null if thread-pool creation failed; this call
+      // would then throw an NPE that masks the original exception.
+      executor.shutdownNow();
+    }
+  }
+
+  @Test(timeout = 5000)
+  public void testSignalNonFatalAndThrow() throws IOException, 
InterruptedException, TezException,
+      ExecutionException {
+    // Runs a task whose processor is configured with CONF_SIGNAL_NON_FATAL_AND_THROW and
+    // checks that the TaskRunner2Result is TASK_ERROR with TaskFailureType.NON_FATAL, and
+    // that the failure event verified on the umbilical is also tagged NON_FATAL.
+
+    ListeningExecutorService executor = null;
+    try {
+      ExecutorService rawExecutor = Executors.newFixedThreadPool(1);
+      executor = MoreExecutors.listeningDecorator(rawExecutor);
+      ApplicationId appId = ApplicationId.newInstance(10000, 1);
+      TaskExecutionTestHelpers.TezTaskUmbilicalForTest
+          umbilical = new TaskExecutionTestHelpers.TezTaskUmbilicalForTest();
+      TaskReporter taskReporter = createTaskReporter(appId, umbilical);
+
+      TezTaskRunner2 taskRunner = createTaskRunner(appId, umbilical, 
taskReporter, executor,
+          TestProcessor.CONF_SIGNAL_NON_FATAL_AND_THROW);
+      // Run the task runner on the shared static taskExecutor (the local executor above
+      // is only handed to createTaskRunner).
+      Future<TaskRunner2Result> taskRunnerFuture =
+          taskExecutor.submit(new TaskRunnerCallable2ForTest(taskRunner));
+      // Signal the processor to go through
+      TestProcessor.awaitStart();
+      TestProcessor.signal();
+
+      TaskRunner2Result result = taskRunnerFuture.get();
+      verifyTaskRunnerResult(result, EndReason.TASK_ERROR, 
createProcessorIOException(), false, TaskFailureType.NON_FATAL);
+
+      TestProcessor.awaitCompletion();
+      assertNull(taskReporter.currentCallable);
+      umbilical.verifyTaskFailedEvent(
+          FAILURE_START_STRING,
+          IOException.class.getName() + ": " + 
IOException.class.getSimpleName(), TaskFailureType.NON_FATAL);
+      assertTrue(TestProcessor.wasAborted());
+    } finally {
+      // NOTE(review): executor is still null if thread-pool creation failed; this call
+      // would then throw an NPE that masks the original exception.
+      executor.shutdownNow();
+    }
+  }
+
+  @Test(timeout = 5000)
+  public void testTaskSelfKill() throws IOException, InterruptedException, 
TezException,
+      ExecutionException {
+    // Runs a task whose processor is configured with CONF_SELF_KILL_AND_COMPLETE and checks
+    // that the runner ends with TASK_KILL_REQUEST (expected TaskFailureType passed as null)
+    // and that a TaskAttemptKilledEvent with the kill diagnostics reached the umbilical.
+
+    ListeningExecutorService executor = null;
+    try {
+      ExecutorService rawExecutor = Executors.newFixedThreadPool(1);
+      executor = MoreExecutors.listeningDecorator(rawExecutor);
+      ApplicationId appId = ApplicationId.newInstance(10000, 1);
+      TaskExecutionTestHelpers.TezTaskUmbilicalForTest
+          umbilical = new TaskExecutionTestHelpers.TezTaskUmbilicalForTest();
+      TaskReporter taskReporter = createTaskReporter(appId, umbilical);
+
+      TezTaskRunner2 taskRunner = createTaskRunner(appId, umbilical, 
taskReporter, executor,
+          TestProcessor.CONF_SELF_KILL_AND_COMPLETE);
+      // Run the task runner on the shared static taskExecutor (the local executor above
+      // is only handed to createTaskRunner).
+      Future<TaskRunner2Result> taskRunnerFuture =
+          taskExecutor.submit(new TaskRunnerCallable2ForTest(taskRunner));
+      // Signal the processor to go through
+      TestProcessor.awaitStart();
+      TestProcessor.signal();
+
+      TaskRunner2Result result = taskRunnerFuture.get();
+      verifyTaskRunnerResult(result, EndReason.TASK_KILL_REQUEST, 
createProcessorIOException(), false,
+          null);
+
+      TestProcessor.awaitCompletion();
+      assertNull(taskReporter.currentCallable);
+      umbilical.verifyTaskKilledEvent(
+          KILL_START_STRING,
+          IOException.class.getName() + ": " + 
IOException.class.getSimpleName());
+      assertTrue(TestProcessor.wasAborted());
+    } finally {
+      // NOTE(review): executor is still null if thread-pool creation failed; this call
+      // would then throw an NPE that masks the original exception.
+      executor.shutdownNow();
+    }
+  }
+
+  @Test(timeout = 5000)
   public void testTaskKilled() throws IOException, InterruptedException, 
TezException,
       ExecutionException {
 
@@ -452,7 +565,7 @@ public class TestTaskExecution2 {
       taskRunner.killTask();
 
       TaskRunner2Result result = taskRunnerFuture.get();
-      verifyTaskRunnerResult(result, EndReason.KILL_REQUESTED, null, false);
+      verifyTaskRunnerResult(result, EndReason.KILL_REQUESTED, null, false, 
null);
 
       TestProcessor.awaitCompletion();
       assertTrue(TestProcessor.wasInterrupted());
@@ -492,7 +605,7 @@ public class TestTaskExecution2 {
 
       taskRunner.killTask();
       TaskRunner2Result result = taskRunnerFuture.get();
-      verifyTaskRunnerResult(result, EndReason.SUCCESS, null, false);
+      verifyTaskRunnerResult(result, EndReason.SUCCESS, null, false, null);
 
       assertFalse(TestProcessor.wasInterrupted());
       assertNull(taskReporter.currentCallable);
@@ -534,15 +647,17 @@ public class TestTaskExecution2 {
 
   private void verifyTaskRunnerResult(TaskRunner2Result taskRunner2Result,
                                       EndReason expectedEndReason, Throwable 
expectedThrowable,
-                                      boolean wasShutdownRequested) {
+                                      boolean wasShutdownRequested,
+                                      TaskFailureType taskFailureType) {
+    // Convenience overload: delegates to the full overload with a null
+    // expectedExceptionMessage; taskFailureType is the expected failure type (may be null).
     verifyTaskRunnerResult(taskRunner2Result, expectedEndReason, 
expectedThrowable, null,
-        wasShutdownRequested);
+        wasShutdownRequested, taskFailureType);
   }
 
   private void verifyTaskRunnerResult(TaskRunner2Result taskRunner2Result,
                                       EndReason expectedEndReason, Throwable 
expectedThrowable,
                                       String expectedExceptionMessage,
-                                      boolean wasShutdownRequested) {
+                                      boolean wasShutdownRequested,
+                                      TaskFailureType taskFailureType) {
     assertEquals(expectedEndReason, taskRunner2Result.getEndReason());
     if (expectedThrowable == null) {
       assertNull(taskRunner2Result.getError());
@@ -557,6 +672,7 @@ public class TestTaskExecution2 {
       }
 
     }
+    assertEquals(taskFailureType, taskRunner2Result.getTaskFailureType());
     assertEquals(wasShutdownRequested, 
taskRunner2Result.isContainerShutdownRequested());
   }
 

http://git-wip-us.apache.org/repos/asf/tez/blob/27a13fc9/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleManager.java
----------------------------------------------------------------------
diff --git 
a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleManager.java
 
b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleManager.java
index 7f2054b..b82098e 100644
--- 
a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleManager.java
+++ 
b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleManager.java
@@ -46,6 +46,7 @@ import javax.crypto.SecretKey;
 
 import com.google.common.annotations.VisibleForTesting;
 import org.apache.tez.http.HttpConnectionParams;
+import org.apache.tez.runtime.api.TaskFailureType;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.apache.hadoop.conf.Configuration;
@@ -748,7 +749,7 @@ public class ShuffleManager implements FetcherCallback {
 
   private void reportFatalError(Throwable exception, String message) {
+    // Logs the message and reports the failure to the input context.
+    // NOTE(review): despite the method name, the failure is now reported as
+    // TaskFailureType.NON_FATAL; confirm that classification is intended. Also note that
+    // LOG.error(message) does not log the exception's stack trace.
     LOG.error(message);
-    inputContext.fatalError(exception, message);
+    inputContext.reportFailure(TaskFailureType.NON_FATAL, exception, message);
   }
 
   @Override
@@ -931,7 +932,7 @@ public class ShuffleManager implements FetcherCallback {
         }
       } else {
         LOG.error(srcNameTrimmed + ": " + "Scheduler failed with error: ", t);
-        inputContext.fatalError(t, "Shuffle Scheduler Failed");
+        inputContext.reportFailure(TaskFailureType.NON_FATAL, t, "Shuffle 
Scheduler Failed");
       }
     }
     
@@ -988,7 +989,7 @@ public class ShuffleManager implements FetcherCallback {
       } else {
         LOG.error(srcNameTrimmed + ": " + "Fetcher failed with error: ", t);
         shuffleError = t;
-        inputContext.fatalError(t, "Fetch failed");
+        inputContext.reportFailure(TaskFailureType.NON_FATAL, t, "Fetch 
failed");
         doBookKeepingForFetcherComplete();
       }
     }

http://git-wip-us.apache.org/repos/asf/tez/blob/27a13fc9/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/Shuffle.java
----------------------------------------------------------------------
diff --git 
a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/Shuffle.java
 
b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/Shuffle.java
index f40c49a..37269ad 100644
--- 
a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/Shuffle.java
+++ 
b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/Shuffle.java
@@ -28,6 +28,7 @@ import java.util.concurrent.atomic.AtomicReference;
 
 
 import com.google.common.annotations.VisibleForTesting;
+import org.apache.tez.runtime.api.TaskFailureType;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.apache.hadoop.classification.InterfaceAudience;
@@ -423,7 +424,7 @@ public class Shuffle implements ExceptionReporter {
       } else {
         LOG.error(srcNameTrimmed + ": " + "ShuffleRunner failed with error", 
t);
         // In case of an abort / Interrupt - the runtime makes sure that this 
is ignored.
-        inputContext.fatalError(t, "Shuffle Runner Failed");
+        inputContext.reportFailure(TaskFailureType.NON_FATAL, t, "Shuffle 
Runner Failed");
         cleanupIgnoreErrors();
       }
     }

http://git-wip-us.apache.org/repos/asf/tez/blob/27a13fc9/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/writers/UnorderedPartitionedKVWriter.java
----------------------------------------------------------------------
diff --git 
a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/writers/UnorderedPartitionedKVWriter.java
 
b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/writers/UnorderedPartitionedKVWriter.java
index ce410be..c7e3059 100644
--- 
a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/writers/UnorderedPartitionedKVWriter.java
+++ 
b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/writers/UnorderedPartitionedKVWriter.java
@@ -52,6 +52,7 @@ import org.apache.tez.common.TezUtilsInternal;
 import org.apache.tez.common.counters.TaskCounter;
 import org.apache.tez.common.counters.TezCounter;
 import org.apache.tez.runtime.api.Event;
+import org.apache.tez.runtime.api.TaskFailureType;
 import org.apache.tez.runtime.api.OutputContext;
 import org.apache.tez.runtime.api.events.CompositeDataMovementEvent;
 import org.apache.tez.runtime.library.api.IOInterruptedException;
@@ -921,7 +922,7 @@ public class UnorderedPartitionedKVWriter extends 
BaseUnorderedPartitionedKVWrit
       outputContext.sendEvents(Collections.singletonList(compEvent));
     } catch (IOException e) {
       LOG.error(destNameTrimmed + ": " + "Error in sending pipelined events", 
e);
-      outputContext.fatalError(e, "Error in sending pipelined events");
+      outputContext.reportFailure(TaskFailureType.NON_FATAL, e, "Error in 
sending pipelined events");
     }
   }
 
@@ -950,7 +951,7 @@ public class UnorderedPartitionedKVWriter extends 
BaseUnorderedPartitionedKVWrit
 
       } catch (Throwable e) {
         LOG.error(destNameTrimmed + ": " + "Failure while attempting to reset 
buffer after spill", e);
-        outputContext.fatalError(e, "Failure while attempting to reset buffer 
after spill");
+        outputContext.reportFailure(TaskFailureType.NON_FATAL, e, "Failure 
while attempting to reset buffer after spill");
       }
 
       if (!pipelinedShuffle) {
@@ -976,7 +977,7 @@ public class UnorderedPartitionedKVWriter extends 
BaseUnorderedPartitionedKVWrit
       // Consider removing it in favor of having Tez kill the task
       LOG.error(destNameTrimmed + ": " + "Failure while spilling to disk", t);
       spillException = t;
-      outputContext.fatalError(t, "Failure while spilling to disk");
+      outputContext.reportFailure(TaskFailureType.NON_FATAL, t, "Failure while 
spilling to disk");
       spillLock.lock();
       try {
         spillInProgress.signal();

http://git-wip-us.apache.org/repos/asf/tez/blob/27a13fc9/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/impl/TestShuffleInputEventHandlerImpl.java
----------------------------------------------------------------------
diff --git 
a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/impl/TestShuffleInputEventHandlerImpl.java
 
b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/impl/TestShuffleInputEventHandlerImpl.java
index 5bbf0fb..0294bd3 100644
--- 
a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/impl/TestShuffleInputEventHandlerImpl.java
+++ 
b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/impl/TestShuffleInputEventHandlerImpl.java
@@ -20,7 +20,6 @@ package org.apache.tez.runtime.library.common.shuffle.impl;
 
 import static org.junit.Assert.fail;
 import static org.mockito.Matchers.any;
-import static org.mockito.Matchers.anyObject;
 import static org.mockito.Matchers.anyString;
 import static org.mockito.Matchers.eq;
 import static org.mockito.Mockito.doReturn;
@@ -29,7 +28,6 @@ import static org.mockito.Mockito.spy;
 import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
 
-import java.io.DataOutput;
 import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.util.BitSet;
@@ -50,6 +48,7 @@ import org.apache.tez.common.security.JobTokenSecretManager;
 import org.apache.tez.dag.api.TezConstants;
 import org.apache.tez.runtime.api.Event;
 import org.apache.tez.runtime.api.ExecutionContext;
+import org.apache.tez.runtime.api.TaskFailureType;
 import org.apache.tez.runtime.api.InputContext;
 import org.apache.tez.runtime.api.events.DataMovementEvent;
 import org.apache.tez.runtime.library.common.InputAttemptIdentifier;
@@ -229,7 +228,7 @@ public class TestShuffleInputEventHandlerImpl {
     //0--> 1 with spill id 1 (attemptNum 1).  This should report exception
     dme = createDataMovementEvent(true, 0, 1, 1, false, new BitSet(), 4, 1);
     handler.handleEvents(Collections.singletonList(dme));
-    verify(inputContext).fatalError(any(Throwable.class), anyString());
+    verify(inputContext).reportFailure(any(TaskFailureType.class), 
any(Throwable.class), anyString());
   }
 
   /**
@@ -258,7 +257,7 @@ public class TestShuffleInputEventHandlerImpl {
     //Now send attemptNum 0.  This should throw exception, because attempt #1 
is already added
     dme = createDataMovementEvent(true, 0, 1, 0, false, new BitSet(), 4, 0);
     handler.handleEvents(Collections.singletonList(dme));
-    verify(inputContext).fatalError(any(Throwable.class), anyString());
+    verify(inputContext).reportFailure(any(TaskFailureType.class), 
any(Throwable.class), anyString());
   }
 
   /**
@@ -297,7 +296,7 @@ public class TestShuffleInputEventHandlerImpl {
     //Now send attemptNum 1.  This should throw exception, because attempt #1 
is already added
     dme = createDataMovementEvent(true, 0, 1, 0, false, new BitSet(), 4, 1);
     handler.handleEvents(Collections.singletonList(dme));
-    verify(inputContext).fatalError(any(Throwable.class), anyString());
+    verify(inputContext).reportFailure(any(TaskFailureType.class), 
any(Throwable.class), anyString());
   }
 
   private Event createDataMovementEvent(boolean addSpillDetails, int srcIdx, 
int targetIdx,

http://git-wip-us.apache.org/repos/asf/tez/blob/27a13fc9/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/TestShuffle.java
----------------------------------------------------------------------
diff --git 
a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/TestShuffle.java
 
b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/TestShuffle.java
index 28f813c..855aedf 100644
--- 
a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/TestShuffle.java
+++ 
b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/TestShuffle.java
@@ -17,6 +17,7 @@ package 
org.apache.tez.runtime.library.common.shuffle.orderedgrouped;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
 import static org.mockito.Matchers.anyString;
+import static org.mockito.Matchers.eq;
 import static org.mockito.Mockito.doReturn;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.times;
@@ -35,6 +36,7 @@ import org.apache.tez.common.security.JobTokenIdentifier;
 import org.apache.tez.common.security.JobTokenSecretManager;
 import org.apache.tez.dag.api.TezConfiguration;
 import org.apache.tez.runtime.api.ExecutionContext;
+import org.apache.tez.runtime.api.TaskFailureType;
 import org.apache.tez.runtime.api.InputContext;
 import org.apache.tez.runtime.api.impl.ExecutionContextImpl;
 import org.apache.tez.runtime.library.common.Constants;
@@ -76,7 +78,7 @@ public class TestShuffle {
 
       ArgumentCaptor<Throwable> throwableArgumentCaptor = 
ArgumentCaptor.forClass(Throwable.class);
       ArgumentCaptor<String> stringArgumentCaptor = 
ArgumentCaptor.forClass(String.class);
-      verify(inputContext, 
times(1)).fatalError(throwableArgumentCaptor.capture(),
+      verify(inputContext, 
times(1)).reportFailure(eq(TaskFailureType.NON_FATAL), 
throwableArgumentCaptor.capture(),
           stringArgumentCaptor.capture());
 
       Throwable t = throwableArgumentCaptor.getValue();

http://git-wip-us.apache.org/repos/asf/tez/blob/27a13fc9/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/writers/TestUnorderedPartitionedKVWriter.java
----------------------------------------------------------------------
diff --git 
a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/writers/TestUnorderedPartitionedKVWriter.java
 
b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/writers/TestUnorderedPartitionedKVWriter.java
index e7a2125..8c935eb 100644
--- 
a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/writers/TestUnorderedPartitionedKVWriter.java
+++ 
b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/writers/TestUnorderedPartitionedKVWriter.java
@@ -45,6 +45,7 @@ import java.util.Set;
 import java.util.UUID;
 
 import com.google.protobuf.ByteString;
+import org.apache.tez.runtime.api.TaskFailureType;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.apache.hadoop.conf.Configurable;
@@ -325,7 +326,7 @@ public class TestUnorderedPartitionedKVWriter {
     }
 
     List<Event> events = kvWriter.close();
-    verify(outputContext, never()).fatalError(any(Throwable.class), 
any(String.class));
+    verify(outputContext, never()).reportFailure(any(TaskFailureType.class), 
any(Throwable.class), any(String.class));
 
     TezCounter outputLargeRecordsCounter = 
counters.findCounter(TaskCounter.OUTPUT_LARGE_RECORDS);
     assertEquals(numLargeKeys + numLargevalues + numLargeKvPairs,
@@ -493,7 +494,7 @@ public class TestUnorderedPartitionedKVWriter {
 
     assertTrue(events.size() == 1); //the last event which was sent out
 
-    verify(outputContext, never()).fatalError(any(Throwable.class), 
any(String.class));
+    verify(outputContext, never()).reportFailure(any(TaskFailureType.class), 
any(Throwable.class), any(String.class));
 
     // Verify the status of the buffers
     if (numExpectedSpills == 0) {
@@ -651,7 +652,7 @@ public class TestUnorderedPartitionedKVWriter {
     int recordsPerBuffer = sizePerBuffer / sizePerRecordWithOverhead;
     int numExpectedSpills = numRecordsWritten / recordsPerBuffer;
 
-    verify(outputContext, never()).fatalError(any(Throwable.class), 
any(String.class));
+    verify(outputContext, never()).reportFailure(any(TaskFailureType.class), 
any(Throwable.class), any(String.class));
 
     // Verify the status of the buffers
     if (numExpectedSpills == 0) {

http://git-wip-us.apache.org/repos/asf/tez/blob/27a13fc9/tez-tests/src/main/java/org/apache/tez/mapreduce/examples/processor/FilterByWordInputProcessor.java
----------------------------------------------------------------------
diff --git 
a/tez-tests/src/main/java/org/apache/tez/mapreduce/examples/processor/FilterByWordInputProcessor.java
 
b/tez-tests/src/main/java/org/apache/tez/mapreduce/examples/processor/FilterByWordInputProcessor.java
index 884808e..6103047 100644
--- 
a/tez-tests/src/main/java/org/apache/tez/mapreduce/examples/processor/FilterByWordInputProcessor.java
+++ 
b/tez-tests/src/main/java/org/apache/tez/mapreduce/examples/processor/FilterByWordInputProcessor.java
@@ -21,6 +21,7 @@ package org.apache.tez.mapreduce.examples.processor;
 import java.util.List;
 import java.util.Map;
 
+import org.apache.tez.runtime.api.TaskFailureType;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.apache.hadoop.conf.Configuration;
@@ -57,7 +58,7 @@ public class FilterByWordInputProcessor extends 
AbstractLogicalIOProcessor {
     Configuration conf = 
TezUtils.createConfFromUserPayload(getContext().getUserPayload());
     filterWord = conf.get(FilterLinesByWord.FILTER_PARAM_NAME);
     if (filterWord == null) {
-      getContext().fatalError(null, "No filter word specified");
+      getContext().reportFailure(TaskFailureType.NON_FATAL, null, "No filter 
word specified");
     }
   }
 

http://git-wip-us.apache.org/repos/asf/tez/blob/27a13fc9/tez-tests/src/test/java/org/apache/tez/test/TestInput.java
----------------------------------------------------------------------
diff --git a/tez-tests/src/test/java/org/apache/tez/test/TestInput.java 
b/tez-tests/src/test/java/org/apache/tez/test/TestInput.java
index a4f3c27..811ca3c 100644
--- a/tez-tests/src/test/java/org/apache/tez/test/TestInput.java
+++ b/tez-tests/src/test/java/org/apache/tez/test/TestInput.java
@@ -23,6 +23,7 @@ import java.util.List;
 import java.util.Set;
 import java.util.concurrent.atomic.AtomicInteger;
 
+import org.apache.tez.runtime.api.TaskFailureType;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.apache.hadoop.conf.Configuration;
@@ -284,7 +285,7 @@ public class TestInput extends AbstractLogicalInput {
   
   void throwException(String msg) {
+    // Reports the failure to the context as TaskFailureType.NON_FATAL, then throws the
+    // same RuntimeException.
     RuntimeException e = new RuntimeException(msg);
-    getContext().fatalError(e , msg);
+    getContext().reportFailure(TaskFailureType.NON_FATAL, e , msg);
     throw e;
   }
   

http://git-wip-us.apache.org/repos/asf/tez/blob/27a13fc9/tez-tests/src/test/java/org/apache/tez/test/TestProcessor.java
----------------------------------------------------------------------
diff --git a/tez-tests/src/test/java/org/apache/tez/test/TestProcessor.java 
b/tez-tests/src/test/java/org/apache/tez/test/TestProcessor.java
index 43777bc..b53dad9 100644
--- a/tez-tests/src/test/java/org/apache/tez/test/TestProcessor.java
+++ b/tez-tests/src/test/java/org/apache/tez/test/TestProcessor.java
@@ -22,6 +22,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.Set;
 
+import org.apache.tez.runtime.api.TaskFailureType;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.apache.hadoop.conf.Configuration;
@@ -119,7 +120,7 @@ public class TestProcessor extends AbstractLogicalIOProcessor {
 
   void throwException(String msg) {
     RuntimeException e = new RuntimeException(msg);
-    getContext().fatalError(e , msg);
+    getContext().reportFailure(TaskFailureType.NON_FATAL, e , msg);
     throw e;
   }
 

http://git-wip-us.apache.org/repos/asf/tez/blob/27a13fc9/tez-tests/src/test/java/org/apache/tez/test/TestTaskErrorsUsingLocalMode.java
----------------------------------------------------------------------
diff --git a/tez-tests/src/test/java/org/apache/tez/test/TestTaskErrorsUsingLocalMode.java b/tez-tests/src/test/java/org/apache/tez/test/TestTaskErrorsUsingLocalMode.java
new file mode 100644
index 0000000..f413bdd
--- /dev/null
+++ b/tez-tests/src/test/java/org/apache/tez/test/TestTaskErrorsUsingLocalMode.java
@@ -0,0 +1,253 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ *  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.test;
+
+import static org.junit.Assert.assertEquals;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.tez.client.TezClient;
+import org.apache.tez.dag.api.DAG;
+import org.apache.tez.dag.api.ProcessorDescriptor;
+import org.apache.tez.dag.api.TezConfiguration;
+import org.apache.tez.dag.api.TezException;
+import org.apache.tez.dag.api.Vertex;
+import org.apache.tez.dag.api.client.DAGClient;
+import org.apache.tez.dag.api.client.DAGStatus;
+import org.apache.tez.runtime.api.AbstractLogicalIOProcessor;
+import org.apache.tez.runtime.api.Event;
+import org.apache.tez.runtime.api.TaskFailureType;
+import org.apache.tez.runtime.api.LogicalInput;
+import org.apache.tez.runtime.api.LogicalOutput;
+import org.apache.tez.runtime.api.ProcessorContext;
+import org.apache.tez.runtime.library.api.TezRuntimeConfiguration;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class TestTaskErrorsUsingLocalMode {
+
+  private static final Logger LOG = LoggerFactory.getLogger(TestTaskErrorsUsingLocalMode.class);
+
+  private static final String VERTEX_NAME = "vertex1";
+
+
+  /**
+   * Verifies that a {@link TaskFailureType#FATAL} failure reported by the processor fails the
+   * DAG after a single failed task attempt, i.e. the task is not retried.
+   */
+  @Test(timeout = 20000)
+  public void testFatalErrorReported() throws IOException, TezException, InterruptedException {
+
+    TezClient tezClient = getTezClient("testFatalErrorReported");
+    DAGClient dagClient = null;
+
+    try {
+      FailingProcessor.configureForFatalFail();
+      DAG dag = DAG.create("testFatalErrorReportedDag").addVertex(
+          Vertex
+              .create(VERTEX_NAME, ProcessorDescriptor.create(FailingProcessor.class.getName()), 1));
+
+      dagClient = tezClient.submitDAG(dag);
+      dagClient.waitForCompletion();
+      assertEquals(DAGStatus.State.FAILED, dagClient.getDAGStatus(null).getState());
+      assertEquals(1, dagClient.getVertexStatus(VERTEX_NAME, null).getProgress().getFailedTaskAttemptCount());
+    } finally {
+      if (dagClient != null) {
+        dagClient.close();
+      }
+      tezClient.stop();
+    }
+  }
+
+  /**
+   * Verifies that a {@link TaskFailureType#NON_FATAL} failure is retried: the DAG fails only
+   * after 4 failed attempts (presumably the default maximum number of task attempts - confirm
+   * if that default changes).
+   */
+  @Test(timeout = 20000)
+  public void testNonFatalErrorReported() throws IOException, TezException, InterruptedException {
+
+    TezClient tezClient = getTezClient("testNonFatalErrorReported");
+    DAGClient dagClient = null;
+
+    try {
+      FailingProcessor.configureForNonFatalFail();
+      DAG dag = DAG.create("testNonFatalErrorReported").addVertex(
+          Vertex
+              .create(VERTEX_NAME, ProcessorDescriptor.create(FailingProcessor.class.getName()), 1));
+
+      dagClient = tezClient.submitDAG(dag);
+      dagClient.waitForCompletion();
+      assertEquals(DAGStatus.State.FAILED, dagClient.getDAGStatus(null).getState());
+      assertEquals(4, dagClient.getVertexStatus(VERTEX_NAME, null).getProgress().getFailedTaskAttemptCount());
+    } finally {
+      if (dagClient != null) {
+        dagClient.close();
+      }
+      tezClient.stop();
+    }
+  }
+
+  /**
+   * Verifies that self-kills are not treated as failures: every attempt except attempt 10 calls
+   * {@code killSelf}, attempt 10 completes, and the DAG succeeds with 10 killed attempts.
+   */
+  @Test(timeout = 20000)
+  public void testSelfKillReported() throws IOException, TezException, InterruptedException {
+
+    TezClient tezClient = getTezClient("testSelfKillReported");
+    DAGClient dagClient = null;
+
+    try {
+      FailingProcessor.configureForKilled(10);
+      DAG dag = DAG.create("testSelfKillReported").addVertex(
+          Vertex
+              .create(VERTEX_NAME, ProcessorDescriptor.create(FailingProcessor.class.getName()), 1));
+
+      dagClient = tezClient.submitDAG(dag);
+      dagClient.waitForCompletion();
+      assertEquals(DAGStatus.State.SUCCEEDED, dagClient.getDAGStatus(null).getState());
+      assertEquals(10, dagClient.getVertexStatus(VERTEX_NAME, null).getProgress().getKilledTaskAttemptCount());
+    } finally {
+      if (dagClient != null) {
+        dagClient.close();
+      }
+      tezClient.stop();
+    }
+  }
+
+
+  /**
+   * Creates and starts a TezClient configured for local mode against the local filesystem.
+   *
+   * @param name name passed to {@link TezClient#create}; each test supplies its own
+   * @return the started client; the caller is responsible for stopping it
+   */
+  private TezClient getTezClient(String name) throws IOException, TezException {
+    TezConfiguration tezConf = new TezConfiguration();
+    tezConf.setBoolean(TezConfiguration.TEZ_LOCAL_MODE, true);
+    tezConf.set("fs.defaultFS", "file:///");
+    tezConf.setBoolean(TezRuntimeConfiguration.TEZ_RUNTIME_OPTIMIZE_LOCAL_FETCH, true);
+    // Honour the caller's name instead of a hard-coded one so each test's client is distinct.
+    TezClient tezClient = TezClient.create(name, tezConf, true);
+    tezClient.start();
+    return tezClient;
+  }
+
+
+  /**
+   * Processor whose behaviour is chosen by static flags the test sets before submitting the DAG.
+   * This relies on the task running inside the test's JVM, which is assumed to hold in local
+   * mode.
+   */
+  public static class FailingProcessor extends AbstractLogicalIOProcessor {
+
+    private static final String FAIL_STRING_NON_FATAL = "non-fatal-fail";
+    private static final String FAIL_STRING_FATAL = "fatal-fail";
+    private static final String KILL_STRING = "kill-self";
+
+    // volatile: written by the configure*() calls from the test, read in run(), presumably on a
+    // different thread.
+    private static volatile boolean shouldFail;
+    private static volatile boolean fatalError;
+
+    private static volatile boolean shouldKill;
+    private static volatile int killModeAttemptNumberToSucceed;
+
+
+    static {
+      reset();
+    }
+
+    /** Clears all fail/kill configuration, so run() only logs and returns. */
+    static void reset() {
+      shouldFail = false;
+      fatalError = false;
+
+      shouldKill = false;
+      killModeAttemptNumberToSucceed = -1;
+    }
+
+    /** Makes every attempt report a {@link TaskFailureType#NON_FATAL} failure. */
+    static void configureForNonFatalFail() {
+      reset();
+      shouldFail = true;
+    }
+
+    /** Makes every attempt report a {@link TaskFailureType#FATAL} failure. */
+    static void configureForFatalFail() {
+      reset();
+      shouldFail = true;
+      fatalError = true;
+    }
+
+    /**
+     * Makes every attempt call {@code killSelf} except the one whose attempt number equals
+     * {@code attemptNumber}.
+     */
+    static void configureForKilled(int attemptNumber) {
+      reset();
+      shouldKill = true;
+      killModeAttemptNumberToSucceed = attemptNumber;
+    }
+
+    public FailingProcessor(ProcessorContext context) {
+      super(context);
+    }
+
+    @Override
+    public void initialize() throws Exception {
+
+    }
+
+    @Override
+    public void handleEvents(List<Event> processorEvents) {
+
+    }
+
+    @Override
+    public void close() throws Exception {
+
+    }
+
+    /**
+     * Reports a fatal or non-fatal failure, or kills this attempt, as selected by the static
+     * configuration; with no configuration set it only logs.
+     */
+    @Override
+    public void run(Map<String, LogicalInput> inputs, Map<String, LogicalOutput> outputs) throws
+        Exception {
+      LOG.info("Running Failing processor");
+      if (shouldFail) {
+        if (fatalError) {
+          LOG.info("Reporting fatal error");
+          getContext().reportFailure(TaskFailureType.FATAL, null, FAIL_STRING_FATAL);
+        } else {
+          LOG.info("Reporting non-fatal error");
+          getContext().reportFailure(TaskFailureType.NON_FATAL, null, FAIL_STRING_NON_FATAL);
+        }
+      } else if (shouldKill) {
+        if (getContext().getTaskAttemptNumber() != killModeAttemptNumberToSucceed) {
+          LOG.info("Reporting self-kill for attempt={}", getContext().getTaskAttemptNumber());
+          getContext().killSelf(null, KILL_STRING);
+        }
+      }
+    }
+  }
+
+}

Reply via email to