TEZ-1773. Add attempt failure cause enum to the attempt failed/killed history 
record (bikaS)
(cherry picked from commit 81eef37d9e1e9222ef09eed319c45cdcd9034cd8)

Conflicts:
        CHANGES.txt
        
tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/TaskAttemptEventKillRequest.java
        
tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/TaskAttemptEventType.java
        
tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java
        tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java
        tez-dag/src/test/java/org/apache/tez/dag/app/TestSpeculation.java
        
tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskAttempt.java
        tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskImpl.java
        
tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java


Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/84a7ef05
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/84a7ef05
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/84a7ef05

Branch: refs/heads/branch-0.5
Commit: 84a7ef0516cdcc0cd5a4d480ea6f235de4ad9db9
Parents: 3e5991a
Author: Bikas Saha <[email protected]>
Authored: Mon Nov 24 11:15:44 2014 -0800
Committer: Bikas Saha <[email protected]>
Committed: Tue Sep 22 14:37:17 2015 -0700

----------------------------------------------------------------------
 CHANGES.txt                                     |   2 +
 .../org/apache/tez/common/ATSConstants.java     |   1 +
 .../records/TaskAttemptTerminationCause.java    |  45 ++++
 .../tez/dag/app/TaskHeartbeatHandler.java       |   3 +-
 .../org/apache/tez/dag/app/dag/TaskAttempt.java |   2 +
 .../event/TaskAttemptEventAttemptFailed.java    |  13 +-
 .../TaskAttemptEventContainerTerminated.java    |  13 +-
 ...AttemptEventContainerTerminatedBySystem.java |  13 +-
 .../TaskAttemptEventContainerTerminating.java   |  12 +-
 .../dag/event/TaskAttemptEventKillRequest.java  |  13 +-
 .../dag/event/TaskAttemptEventNodeFailed.java   |  12 +-
 .../dag/event/TaskAttemptEventOutputFailed.java |   9 +-
 .../TaskAttemptEventTerminationCauseEvent.java  |  26 +++
 .../dag/app/dag/event/TaskAttemptEventType.java |   3 +-
 .../dag/app/dag/event/TaskEventTermination.java |  28 ++-
 .../tez/dag/app/dag/impl/TaskAttemptImpl.java   |  39 +++-
 .../apache/tez/dag/app/dag/impl/TaskImpl.java   |  33 ++-
 .../apache/tez/dag/app/dag/impl/VertexImpl.java |  36 ++-
 .../app/launcher/LocalContainerLauncher.java    |   9 +-
 .../dag/app/rm/TaskSchedulerEventHandler.java   |  14 +-
 .../rm/container/AMContainerEventCompleted.java |  13 +-
 .../dag/app/rm/container/AMContainerImpl.java   |  96 ++++----
 .../events/TaskAttemptFinishedEvent.java        |  20 +-
 .../impl/HistoryEventJsonConversion.java        |   5 +-
 tez-dag/src/main/proto/HistoryEvents.proto      |   1 +
 .../org/apache/tez/dag/app/TestPreemption.java  |   2 +
 .../tez/dag/app/dag/impl/TestTaskAttempt.java   |  43 +++-
 .../app/dag/impl/TestTaskAttemptRecovery.java   |  15 +-
 .../tez/dag/app/dag/impl/TestTaskImpl.java      |  15 +-
 .../tez/dag/app/dag/impl/TestTaskRecovery.java  |  59 ++---
 .../tez/dag/app/dag/impl/TestVertexImpl.java    | 133 +++++++++--
 .../app/rm/TestTaskSchedulerEventHandler.java   |  30 +++
 .../dag/app/rm/container/TestAMContainer.java   | 230 ++++++++++++++-----
 .../TestHistoryEventsProtoConversion.java       |   8 +-
 .../impl/TestHistoryEventJsonConversion.java    |   3 +-
 .../ats/HistoryEventTimelineConversion.java     |   3 +
 .../ats/TestHistoryEventTimelineConversion.java |   8 +-
 37 files changed, 777 insertions(+), 233 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tez/blob/84a7ef05/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 0646dfc..cbc6751 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -7,6 +7,8 @@ INCOMPATIBLE CHANGES
   TEZ-2552. CRC errors can cause job to run for very long time in large jobs.
 
 ALL CHANGES:
+  TEZ-1773. Add attempt failure cause enum to the attempt failed/killed
+  history record
   TEZ-2203. Intern strings in tez counters
   TEZ-2834. Make Tez preemption resilient to incorrect free resource reported
   by YARN

http://git-wip-us.apache.org/repos/asf/tez/blob/84a7ef05/tez-api/src/main/java/org/apache/tez/common/ATSConstants.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/common/ATSConstants.java 
b/tez-api/src/main/java/org/apache/tez/common/ATSConstants.java
index aec6ca5..944ae87 100644
--- a/tez-api/src/main/java/org/apache/tez/common/ATSConstants.java
+++ b/tez-api/src/main/java/org/apache/tez/common/ATSConstants.java
@@ -63,6 +63,7 @@ public class ATSConstants {
   public static final String FINISH_TIME = "endTime";
   public static final String TIME_TAKEN = "timeTaken";
   public static final String STATUS = "status";
+  public static final String TASK_ATTEMPT_ERROR_ENUM = "taskAttemptErrorEnum";
   public static final String DIAGNOSTICS = "diagnostics";
   public static final String SUCCESSFUL_ATTEMPT_ID = "successfulAttemptId";
   public static final String COUNTERS = "counters";

http://git-wip-us.apache.org/repos/asf/tez/blob/84a7ef05/tez-common/src/main/java/org/apache/tez/dag/records/TaskAttemptTerminationCause.java
----------------------------------------------------------------------
diff --git 
a/tez-common/src/main/java/org/apache/tez/dag/records/TaskAttemptTerminationCause.java
 
b/tez-common/src/main/java/org/apache/tez/dag/records/TaskAttemptTerminationCause.java
new file mode 100644
index 0000000..ef0bb33
--- /dev/null
+++ 
b/tez-common/src/main/java/org/apache/tez/dag/records/TaskAttemptTerminationCause.java
@@ -0,0 +1,45 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.dag.records;
+
+public enum TaskAttemptTerminationCause {
+  UNKNOWN_ERROR, // The error cause is unknown. Usually means a gap in error 
propagation
+  
+  TERMINATED_BY_CLIENT, // Killed by client command
+  TERMINATED_AT_SHUTDOWN, // Killed due execution shutdown
+  INTERNAL_PREEMPTION, // Killed by Tez to makes space for higher pri work
+  EXTERNAL_PREEMPTION, // Killed by the cluster to make space for other work
+  TERMINATED_INEFFECTIVE_SPECULATION, // Killed speculative attempt because 
original succeeded
+  TERMINATED_EFFECTIVE_SPECULATION, // Killed original attempt because 
speculation succeeded
+  TERMINATED_ORPHANED, // Attempt is no longer needed by the task
+  
+  APPLICATION_ERROR, // Failed due to application code error
+  FRAMEWORK_ERROR, // Failed due to code error in Tez code
+  INPUT_READ_ERROR, // Failed due to error in reading inputs
+  OUTPUT_WRITE_ERROR, // Failed due to error in writing outputs
+  OUTPUT_LOST, // Failed because attempts output were reported lost
+  TASK_HEARTBEAT_ERROR, // Failed because AM lost connection to the task
+  
+  CONTAINER_LAUNCH_FAILED, // Failed to launch container
+  CONTAINER_EXITED, // Container exited. Indicates gap in specific error 
propagation from the cluster
+  CONTAINER_STOPPED, // Container stopped or released by Tez
+  NODE_FAILED, // Node for the container failed
+  NODE_DISK_ERROR, // Disk failed on the node runnign the task
+  
+}

http://git-wip-us.apache.org/repos/asf/tez/blob/84a7ef05/tez-dag/src/main/java/org/apache/tez/dag/app/TaskHeartbeatHandler.java
----------------------------------------------------------------------
diff --git 
a/tez-dag/src/main/java/org/apache/tez/dag/app/TaskHeartbeatHandler.java 
b/tez-dag/src/main/java/org/apache/tez/dag/app/TaskHeartbeatHandler.java
index 6b698aa..d115b14 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/TaskHeartbeatHandler.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/TaskHeartbeatHandler.java
@@ -23,6 +23,7 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.tez.dag.api.TezConfiguration;
 import org.apache.tez.dag.app.dag.event.TaskAttemptEventAttemptFailed;
 import org.apache.tez.dag.app.dag.event.TaskAttemptEventType;
+import org.apache.tez.dag.records.TaskAttemptTerminationCause;
 import org.apache.tez.dag.records.TezTaskAttemptID;
 
 
@@ -60,6 +61,6 @@ public class TaskHeartbeatHandler extends 
HeartbeatHandlerBase<TezTaskAttemptID>
   protected void handleTimeOut(TezTaskAttemptID attemptId) {
     eventHandler.handle(new TaskAttemptEventAttemptFailed(attemptId,
         TaskAttemptEventType.TA_TIMED_OUT, "AttemptID:" + attemptId.toString()
-        + " Timed out after " + timeOut / 1000 + " secs"));
+        + " Timed out after " + timeOut / 1000 + " secs", 
TaskAttemptTerminationCause.TASK_HEARTBEAT_ERROR));
   }
 }

http://git-wip-us.apache.org/repos/asf/tez/blob/84a7ef05/tez-dag/src/main/java/org/apache/tez/dag/app/dag/TaskAttempt.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/TaskAttempt.java 
b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/TaskAttempt.java
index f30fc5c..246324d 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/TaskAttempt.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/TaskAttempt.java
@@ -29,6 +29,7 @@ import org.apache.tez.common.counters.TezCounters;
 import org.apache.tez.dag.api.oldrecords.TaskAttemptReport;
 import org.apache.tez.dag.api.oldrecords.TaskAttemptState;
 import org.apache.tez.dag.history.HistoryEvent;
+import org.apache.tez.dag.records.TaskAttemptTerminationCause;
 import org.apache.tez.dag.records.TezDAGID;
 import org.apache.tez.dag.records.TezTaskAttemptID;
 import org.apache.tez.dag.records.TezTaskID;
@@ -69,6 +70,7 @@ public interface TaskAttempt {
   
   TaskAttemptReport getReport();
   List<String> getDiagnostics();
+  TaskAttemptTerminationCause getTerminationCause();
   TezCounters getCounters();
   float getProgress();
   TaskAttemptState getState();

http://git-wip-us.apache.org/repos/asf/tez/blob/84a7ef05/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/TaskAttemptEventAttemptFailed.java
----------------------------------------------------------------------
diff --git 
a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/TaskAttemptEventAttemptFailed.java
 
b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/TaskAttemptEventAttemptFailed.java
index 5c7b956..b9c1d09 100644
--- 
a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/TaskAttemptEventAttemptFailed.java
+++ 
b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/TaskAttemptEventAttemptFailed.java
@@ -18,16 +18,19 @@
 
 package org.apache.tez.dag.app.dag.event;
 
+import org.apache.tez.dag.records.TaskAttemptTerminationCause;
 import org.apache.tez.dag.records.TezTaskAttemptID;
 
 public class TaskAttemptEventAttemptFailed extends TaskAttemptEvent 
-  implements DiagnosableEvent {
+  implements DiagnosableEvent, TaskAttemptEventTerminationCauseEvent {
 
   private final String diagnostics;
+  private final TaskAttemptTerminationCause errorCause;
   public TaskAttemptEventAttemptFailed(TezTaskAttemptID id,
-      TaskAttemptEventType type, String diagnostics) {
+      TaskAttemptEventType type, String diagnostics, 
TaskAttemptTerminationCause errorCause) {
     super(id, type);
     this.diagnostics = diagnostics;
+    this.errorCause = errorCause;
   }
 
   @Override
@@ -35,5 +38,9 @@ public class TaskAttemptEventAttemptFailed extends 
TaskAttemptEvent
     return diagnostics;
   }
   
-  
+  @Override
+  public TaskAttemptTerminationCause getTerminationCause() {
+    return errorCause;
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/tez/blob/84a7ef05/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/TaskAttemptEventContainerTerminated.java
----------------------------------------------------------------------
diff --git 
a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/TaskAttemptEventContainerTerminated.java
 
b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/TaskAttemptEventContainerTerminated.java
index 87aa313..5dd0141 100644
--- 
a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/TaskAttemptEventContainerTerminated.java
+++ 
b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/TaskAttemptEventContainerTerminated.java
@@ -17,20 +17,29 @@
 
 package org.apache.tez.dag.app.dag.event;
 
+import org.apache.tez.dag.records.TaskAttemptTerminationCause;
 import org.apache.tez.dag.records.TezTaskAttemptID;
 
 public class TaskAttemptEventContainerTerminated extends TaskAttemptEvent
-    implements DiagnosableEvent {
+    implements DiagnosableEvent, TaskAttemptEventTerminationCauseEvent {
 
   private final String message;
+  private final TaskAttemptTerminationCause errorCause;
 
-  public TaskAttemptEventContainerTerminated(TezTaskAttemptID id, String 
message) {
+  public TaskAttemptEventContainerTerminated(TezTaskAttemptID id, String 
message, 
+      TaskAttemptTerminationCause errCause) {
     super(id, TaskAttemptEventType.TA_CONTAINER_TERMINATED);
     this.message = message;
+    this.errorCause = errCause;
   }
 
   @Override
   public String getDiagnosticInfo() {
     return message;
   }
+
+  @Override
+  public TaskAttemptTerminationCause getTerminationCause() {
+    return errorCause;
+  }
 }

http://git-wip-us.apache.org/repos/asf/tez/blob/84a7ef05/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/TaskAttemptEventContainerTerminatedBySystem.java
----------------------------------------------------------------------
diff --git 
a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/TaskAttemptEventContainerTerminatedBySystem.java
 
b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/TaskAttemptEventContainerTerminatedBySystem.java
index a92aafd..a3c57e4 100644
--- 
a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/TaskAttemptEventContainerTerminatedBySystem.java
+++ 
b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/TaskAttemptEventContainerTerminatedBySystem.java
@@ -18,19 +18,28 @@
 
 package org.apache.tez.dag.app.dag.event;
 
+import org.apache.tez.dag.records.TaskAttemptTerminationCause;
 import org.apache.tez.dag.records.TezTaskAttemptID;
 
 public class TaskAttemptEventContainerTerminatedBySystem extends 
TaskAttemptEvent 
-  implements DiagnosableEvent {
+  implements DiagnosableEvent, TaskAttemptEventTerminationCauseEvent {
 
   private final String diagnostics;
-  public TaskAttemptEventContainerTerminatedBySystem(TezTaskAttemptID id, 
String diagnostics) {
+  private final TaskAttemptTerminationCause errorCause;
+  public TaskAttemptEventContainerTerminatedBySystem(TezTaskAttemptID id, 
String diagnostics,
+      TaskAttemptTerminationCause errorCause) {
     super(id, TaskAttemptEventType.TA_CONTAINER_TERMINATED_BY_SYSTEM);
     this.diagnostics = diagnostics;
+    this.errorCause = errorCause;
   }
 
   @Override
   public String getDiagnosticInfo() {
     return diagnostics;
   }
+
+  @Override
+  public TaskAttemptTerminationCause getTerminationCause() {
+    return errorCause;
+  }
 }

http://git-wip-us.apache.org/repos/asf/tez/blob/84a7ef05/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/TaskAttemptEventContainerTerminating.java
----------------------------------------------------------------------
diff --git 
a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/TaskAttemptEventContainerTerminating.java
 
b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/TaskAttemptEventContainerTerminating.java
index 7da6e14..02d04a5 100644
--- 
a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/TaskAttemptEventContainerTerminating.java
+++ 
b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/TaskAttemptEventContainerTerminating.java
@@ -17,17 +17,20 @@
 
 package org.apache.tez.dag.app.dag.event;
 
+import org.apache.tez.dag.records.TaskAttemptTerminationCause;
 import org.apache.tez.dag.records.TezTaskAttemptID;
 
 public class TaskAttemptEventContainerTerminating extends TaskAttemptEvent
-    implements DiagnosableEvent {
+    implements DiagnosableEvent, TaskAttemptEventTerminationCauseEvent {
 
   private final String message;
+  private final TaskAttemptTerminationCause errorCause;
 
   public TaskAttemptEventContainerTerminating(TezTaskAttemptID id,
-      String diagMessage) {
+      String diagMessage, TaskAttemptTerminationCause errCause) {
     super(id, TaskAttemptEventType.TA_CONTAINER_TERMINATING);
     this.message = diagMessage;
+    this.errorCause = errCause;
   }
 
   @Override
@@ -35,4 +38,9 @@ public class TaskAttemptEventContainerTerminating extends 
TaskAttemptEvent
     return this.message;
   }
 
+  @Override
+  public TaskAttemptTerminationCause getTerminationCause() {
+    return errorCause;
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/tez/blob/84a7ef05/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/TaskAttemptEventKillRequest.java
----------------------------------------------------------------------
diff --git 
a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/TaskAttemptEventKillRequest.java
 
b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/TaskAttemptEventKillRequest.java
index 9bceb1d..985becd 100644
--- 
a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/TaskAttemptEventKillRequest.java
+++ 
b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/TaskAttemptEventKillRequest.java
@@ -17,19 +17,28 @@
 */
 package org.apache.tez.dag.app.dag.event;
 
+import org.apache.tez.dag.records.TaskAttemptTerminationCause;
 import org.apache.tez.dag.records.TezTaskAttemptID;
 
-public class TaskAttemptEventKillRequest extends TaskAttemptEvent {
+public class TaskAttemptEventKillRequest extends TaskAttemptEvent 
+  implements TaskAttemptEventTerminationCauseEvent {
 
   private final String message;
+  private final TaskAttemptTerminationCause errorCause;
 
-  public TaskAttemptEventKillRequest(TezTaskAttemptID id, String message) {
+  public TaskAttemptEventKillRequest(TezTaskAttemptID id, String message, 
TaskAttemptTerminationCause err) {
     super(id, TaskAttemptEventType.TA_KILL_REQUEST);
     this.message = message;
+    this.errorCause = err;
   }
 
   public String getMessage() {
     return this.message;
   }
 
+  @Override
+  public TaskAttemptTerminationCause getTerminationCause() {
+    return errorCause;
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/tez/blob/84a7ef05/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/TaskAttemptEventNodeFailed.java
----------------------------------------------------------------------
diff --git 
a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/TaskAttemptEventNodeFailed.java
 
b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/TaskAttemptEventNodeFailed.java
index 6d97466..541ef00 100644
--- 
a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/TaskAttemptEventNodeFailed.java
+++ 
b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/TaskAttemptEventNodeFailed.java
@@ -17,17 +17,20 @@
 
 package org.apache.tez.dag.app.dag.event;
 
+import org.apache.tez.dag.records.TaskAttemptTerminationCause;
 import org.apache.tez.dag.records.TezTaskAttemptID;
 
 public class TaskAttemptEventNodeFailed extends TaskAttemptEvent 
-  implements DiagnosableEvent{
+  implements DiagnosableEvent, TaskAttemptEventTerminationCauseEvent {
 
   private final String message;
+  private final TaskAttemptTerminationCause errorCause;
 
   public TaskAttemptEventNodeFailed(TezTaskAttemptID id,
-      String diagMessage) {
+      String diagMessage, TaskAttemptTerminationCause errorCause) {
     super(id, TaskAttemptEventType.TA_NODE_FAILED);
     this.message = diagMessage;
+    this.errorCause = errorCause;
   }
 
   @Override
@@ -35,4 +38,9 @@ public class TaskAttemptEventNodeFailed extends 
TaskAttemptEvent
     return this.message;
   }
 
+  @Override
+  public TaskAttemptTerminationCause getTerminationCause() {
+    return errorCause;
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/tez/blob/84a7ef05/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/TaskAttemptEventOutputFailed.java
----------------------------------------------------------------------
diff --git 
a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/TaskAttemptEventOutputFailed.java
 
b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/TaskAttemptEventOutputFailed.java
index 678e1e7..6bc110a 100644
--- 
a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/TaskAttemptEventOutputFailed.java
+++ 
b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/TaskAttemptEventOutputFailed.java
@@ -18,10 +18,12 @@
 
 package org.apache.tez.dag.app.dag.event;
 
+import org.apache.tez.dag.records.TaskAttemptTerminationCause;
 import org.apache.tez.dag.records.TezTaskAttemptID;
 import org.apache.tez.runtime.api.impl.TezEvent;
 
-public class TaskAttemptEventOutputFailed extends TaskAttemptEvent {
+public class TaskAttemptEventOutputFailed extends TaskAttemptEvent 
+  implements TaskAttemptEventTerminationCauseEvent {
   
   private TezEvent inputFailedEvent;
   private int consumerTaskNumber;
@@ -40,5 +42,10 @@ public class TaskAttemptEventOutputFailed extends 
TaskAttemptEvent {
   public int getConsumerTaskNumber() {
     return consumerTaskNumber;
   }
+
+  @Override
+  public TaskAttemptTerminationCause getTerminationCause() {
+    return TaskAttemptTerminationCause.OUTPUT_LOST;
+  }
   
 }

http://git-wip-us.apache.org/repos/asf/tez/blob/84a7ef05/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/TaskAttemptEventTerminationCauseEvent.java
----------------------------------------------------------------------
diff --git 
a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/TaskAttemptEventTerminationCauseEvent.java
 
b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/TaskAttemptEventTerminationCauseEvent.java
new file mode 100644
index 0000000..70c20e3
--- /dev/null
+++ 
b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/TaskAttemptEventTerminationCauseEvent.java
@@ -0,0 +1,26 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.tez.dag.app.dag.event;
+
+import org.apache.tez.dag.records.TaskAttemptTerminationCause;
+
+public interface TaskAttemptEventTerminationCauseEvent {
+
+  public TaskAttemptTerminationCause getTerminationCause();
+}

http://git-wip-us.apache.org/repos/asf/tez/blob/84a7ef05/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/TaskAttemptEventType.java
----------------------------------------------------------------------
diff --git 
a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/TaskAttemptEventType.java
 
b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/TaskAttemptEventType.java
index e7db8d1..cae0b7d 100644
--- 
a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/TaskAttemptEventType.java
+++ 
b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/TaskAttemptEventType.java
@@ -29,9 +29,8 @@ public enum TaskAttemptEventType {
 //Producer: TaskAttemptListener
   TA_STARTED_REMOTELY,
   TA_STATUS_UPDATE,
-  TA_DIAGNOSTICS_UPDATE,
   TA_OUTPUT_CONSUMABLE,  // TODO History event to indicate this ?
-  TA_COMMIT_PENDING,
+  TA_DIAGNOSTICS_UPDATE, // REMOVE THIS - UNUSED
   TA_DONE,
   TA_FAILED,
   TA_TIMED_OUT,

http://git-wip-us.apache.org/repos/asf/tez/blob/84a7ef05/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/TaskEventTermination.java
----------------------------------------------------------------------
diff --git 
a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/TaskEventTermination.java
 
b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/TaskEventTermination.java
index 73d5744..d48a0bf 100644
--- 
a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/TaskEventTermination.java
+++ 
b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/TaskEventTermination.java
@@ -18,22 +18,23 @@
 
 package org.apache.tez.dag.app.dag.event;
 
-import org.apache.tez.dag.app.dag.TaskTerminationCause;
+import org.apache.tez.dag.records.TaskAttemptTerminationCause;
 import org.apache.tez.dag.records.TezTaskID;
 
-public class TaskEventTermination extends TaskEvent implements 
DiagnosableEvent{
+public class TaskEventTermination extends TaskEvent implements 
DiagnosableEvent,
+    TaskAttemptEventTerminationCauseEvent {
 
-  private TaskTerminationCause terminationCause;
-  private String diagnostics;
+  private final String diagnostics;
+  private final TaskAttemptTerminationCause errorCause;
   
-  public TaskEventTermination(TezTaskID taskID, TaskTerminationCause 
terminationCause) {
+  public TaskEventTermination(TezTaskID taskID, TaskAttemptTerminationCause 
errorCause, String diagnostics) {
     super(taskID, TaskEventType.T_TERMINATE);
-    this.terminationCause = terminationCause;
-    this.diagnostics = "Task is terminated due to:" + terminationCause.name();
-  }
-
-  public TaskTerminationCause getTerminationCause() {
-    return terminationCause;
+    this.errorCause = errorCause;
+    if (diagnostics != null) {
+      this.diagnostics = diagnostics;
+    } else {
+      this.diagnostics = "Task is terminated due to: " + errorCause.name();
+    }
   }
 
   @Override
@@ -41,4 +42,9 @@ public class TaskEventTermination extends TaskEvent 
implements DiagnosableEvent{
     return diagnostics;
   }
 
+  @Override
+  public TaskAttemptTerminationCause getTerminationCause() {
+    return errorCause;
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/tez/blob/84a7ef05/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java
----------------------------------------------------------------------
diff --git 
a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java 
b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java
index 1df07f8..f1bfefe 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java
@@ -76,6 +76,7 @@ import org.apache.tez.dag.app.dag.event.TaskAttemptEvent;
 import org.apache.tez.dag.app.dag.event.TaskAttemptEventAttemptFailed;
 import org.apache.tez.dag.app.dag.event.TaskAttemptEventContainerTerminated;
 import org.apache.tez.dag.app.dag.event.TaskAttemptEventDiagnosticsUpdate;
+import org.apache.tez.dag.app.dag.event.TaskAttemptEventTerminationCauseEvent;
 import org.apache.tez.dag.app.dag.event.TaskAttemptEventOutputFailed;
 import org.apache.tez.dag.app.dag.event.TaskAttemptEventSchedule;
 import org.apache.tez.dag.app.dag.event.TaskAttemptEventStartedRemotely;
@@ -90,6 +91,7 @@ import org.apache.tez.dag.history.DAGHistoryEvent;
 import org.apache.tez.dag.history.HistoryEvent;
 import org.apache.tez.dag.history.events.TaskAttemptFinishedEvent;
 import org.apache.tez.dag.history.events.TaskAttemptStartedEvent;
+import org.apache.tez.dag.records.TaskAttemptTerminationCause;
 import org.apache.tez.dag.records.TezDAGID;
 import org.apache.tez.dag.records.TezTaskAttemptID;
 import org.apache.tez.dag.records.TezTaskID;
@@ -124,6 +126,7 @@ public class TaskAttemptImpl implements TaskAttempt,
   protected EventHandler eventHandler;
   private final TezTaskAttemptID attemptId;
   private final Clock clock;
+  private TaskAttemptTerminationCause terminationCause = 
TaskAttemptTerminationCause.UNKNOWN_ERROR;
   private final List<String> diagnostics = new ArrayList<String>();
   private final Lock readLock;
   private final Lock writeLock;
@@ -348,7 +351,6 @@ public class TaskAttemptImpl implements TaskAttempt,
               TaskAttemptEventType.TA_CONTAINER_TERMINATED_BY_SYSTEM,
               TaskAttemptEventType.TA_STATUS_UPDATE,
               TaskAttemptEventType.TA_OUTPUT_CONSUMABLE,
-              TaskAttemptEventType.TA_COMMIT_PENDING,
               TaskAttemptEventType.TA_DONE, TaskAttemptEventType.TA_FAILED,
               TaskAttemptEventType.TA_TIMED_OUT,
               TaskAttemptEventType.TA_KILL_REQUEST,
@@ -371,7 +373,6 @@ public class TaskAttemptImpl implements TaskAttempt,
               TaskAttemptEventType.TA_CONTAINER_TERMINATED_BY_SYSTEM,
               TaskAttemptEventType.TA_STATUS_UPDATE,
               TaskAttemptEventType.TA_OUTPUT_CONSUMABLE,
-              TaskAttemptEventType.TA_COMMIT_PENDING,
               TaskAttemptEventType.TA_DONE, TaskAttemptEventType.TA_FAILED,
               TaskAttemptEventType.TA_TIMED_OUT,
               TaskAttemptEventType.TA_KILL_REQUEST,
@@ -391,7 +392,6 @@ public class TaskAttemptImpl implements TaskAttempt,
               TaskAttemptEventType.TA_CONTAINER_TERMINATED_BY_SYSTEM,
               TaskAttemptEventType.TA_STATUS_UPDATE,
               TaskAttemptEventType.TA_OUTPUT_CONSUMABLE,
-              TaskAttemptEventType.TA_COMMIT_PENDING,
               TaskAttemptEventType.TA_DONE, TaskAttemptEventType.TA_FAILED,
               TaskAttemptEventType.TA_TIMED_OUT,
               TaskAttemptEventType.TA_KILL_REQUEST,
@@ -412,7 +412,6 @@ public class TaskAttemptImpl implements TaskAttempt,
               TaskAttemptEventType.TA_CONTAINER_TERMINATED_BY_SYSTEM,
               TaskAttemptEventType.TA_STATUS_UPDATE,
               TaskAttemptEventType.TA_OUTPUT_CONSUMABLE,
-              TaskAttemptEventType.TA_COMMIT_PENDING,
               TaskAttemptEventType.TA_DONE, TaskAttemptEventType.TA_FAILED,
               TaskAttemptEventType.TA_TIMED_OUT,
               TaskAttemptEventType.TA_KILL_REQUEST,
@@ -569,6 +568,11 @@ public class TaskAttemptImpl implements TaskAttempt,
       readLock.unlock();
     }
   }
+  
+  @Override
+  public TaskAttemptTerminationCause getTerminationCause() {
+    return terminationCause;
+  }
 
   @Override
   public TezCounters getCounters() {
@@ -815,6 +819,8 @@ public class TaskAttemptImpl implements TaskAttempt,
           this.reportedStatus.counters = tEvent.getCounters();
           this.reportedStatus.progress = 1f;
           this.reportedStatus.state = tEvent.getState();
+          this.terminationCause = tEvent.getTaskAttemptError() != null ? 
tEvent.getTaskAttemptError()
+              : TaskAttemptTerminationCause.UNKNOWN_ERROR;
           this.diagnostics.add(tEvent.getDiagnostics());
           this.recoveredState = tEvent.getState();
           sendEvent(createDAGCounterUpdateEventTAFinished(this, 
tEvent.getState()));
@@ -1032,8 +1038,8 @@ public class TaskAttemptImpl implements TaskAttempt,
 
     TaskAttemptFinishedEvent finishEvt = new TaskAttemptFinishedEvent(
         attemptId, getTask().getVertex().getName(), getLaunchTime(),
-        getFinishTime(), TaskAttemptState.SUCCEEDED, "",
-        getCounters());
+        getFinishTime(), TaskAttemptState.SUCCEEDED, null,
+        "", getCounters());
     // FIXME how do we store information regd completion events
     this.appContext.getHistoryHandler().handle(
         new DAGHistoryEvent(getDAGID(), finishEvt));
@@ -1048,9 +1054,9 @@ public class TaskAttemptImpl implements TaskAttempt,
     TaskAttemptFinishedEvent finishEvt = new TaskAttemptFinishedEvent(
         attemptId, getTask().getVertex().getName(), getLaunchTime(),
         finishTime, state,
+        terminationCause,
         StringUtils.join(
-            getDiagnostics(), LINE_SEPARATOR),
-        getCounters());
+            getDiagnostics(), LINE_SEPARATOR), getCounters());
     // FIXME how do we store information regd completion events
     this.appContext.getHistoryHandler().handle(
         new DAGHistoryEvent(getDAGID(), finishEvt));
@@ -1080,7 +1086,8 @@ public class TaskAttemptImpl implements TaskAttempt,
         LOG.error(msg, e);
         String diag = msg + ", " + e.getMessage() + ", " + 
ExceptionUtils.getStackTrace(e.getCause());
         new TerminatedBeforeRunningTransition(FAILED_HELPER).transition(ta,
-            new TaskAttemptEventAttemptFailed(ta.getID(), 
TaskAttemptEventType.TA_FAILED, diag));
+            new TaskAttemptEventAttemptFailed(ta.getID(), 
TaskAttemptEventType.TA_FAILED, diag,
+                TaskAttemptTerminationCause.APPLICATION_ERROR));
         return TaskAttemptStateInternal.FAILED;
       }
       // Create startTaskRequest
@@ -1162,6 +1169,13 @@ public class TaskAttemptImpl implements TaskAttempt,
       if (event instanceof DiagnosableEvent) {
         ta.addDiagnosticInfo(((DiagnosableEvent) event).getDiagnosticInfo());
       }
+      
+      // this should catch at test time if any new events are missing the 
error cause
+      assert event instanceof TaskAttemptEventTerminationCauseEvent;
+      
+      if (event instanceof TaskAttemptEventTerminationCauseEvent) {
+        ta.trySetTerminationCause(((TaskAttemptEventTerminationCauseEvent) 
event).getTerminationCause());
+      }
 
       ta.sendEvent(createDAGCounterUpdateEventTAFinished(ta,
           helper.getTaskAttemptState()));
@@ -1558,6 +1572,13 @@ public class TaskAttemptImpl implements TaskAttempt,
       sendEvent(new VertexEventRouteEvent(vertex.getVertexId(), tezIfEvents));
     }
   }
+  
+  private void trySetTerminationCause(TaskAttemptTerminationCause err) {
+    // keep only the first error cause
+    if (terminationCause == TaskAttemptTerminationCause.UNKNOWN_ERROR) {
+      terminationCause = err;
+    }
+  }
 
   private void initTaskAttemptStatus(TaskAttemptStatus result) {
     result.progress = 0.0f;

http://git-wip-us.apache.org/repos/asf/tez/blob/84a7ef05/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java
----------------------------------------------------------------------
diff --git 
a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java 
b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java
index b6c7eb9..87ee55a 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java
@@ -33,6 +33,7 @@ import java.util.concurrent.locks.ReentrantReadWriteLock;
 import com.google.common.base.Preconditions;
 import com.google.common.base.Predicate;
 import com.google.common.collect.Maps;
+
 import org.apache.commons.lang.StringUtils;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -85,6 +86,7 @@ import 
org.apache.tez.dag.history.events.TaskAttemptFinishedEvent;
 import org.apache.tez.dag.history.events.TaskAttemptStartedEvent;
 import org.apache.tez.dag.history.events.TaskFinishedEvent;
 import org.apache.tez.dag.history.events.TaskStartedEvent;
+import org.apache.tez.dag.records.TaskAttemptTerminationCause;
 import org.apache.tez.dag.records.TezTaskAttemptID;
 import org.apache.tez.dag.records.TezTaskID;
 import org.apache.tez.dag.records.TezVertexID;
@@ -92,6 +94,7 @@ import org.apache.tez.runtime.api.OutputCommitter;
 import org.apache.tez.runtime.api.impl.TezEvent;
 
 import com.google.common.annotations.VisibleForTesting;
+
 import org.apache.tez.state.OnStateChangedCallback;
 import org.apache.tez.state.StateMachineTez;
 
@@ -742,7 +745,7 @@ public class TaskImpl implements Task, 
EventHandler<TaskEvent> {
       if (state != TaskState.RUNNING) {
         LOG.info("Task not running. Issuing kill to bad commit attempt " + 
taskAttemptID);
         eventHandler.handle(new TaskAttemptEventKillRequest(taskAttemptID
-            , "Task not running. Bad attempt."));
+            , "Task not running. Bad attempt.", 
TaskAttemptTerminationCause.TERMINATED_ORPHANED));
         return false;
       }
       if (commitAttempt == null) {
@@ -1098,7 +1101,7 @@ public class TaskImpl implements Task, 
EventHandler<TaskEvent> {
               + task.outputConsumableAttempt + " already has output ready");
         }
         task.eventHandler.handle(new TaskAttemptEventKillRequest(attemptId,
-            "Alternate attemptId already serving output"));
+            "Alternate attemptId already serving output", 
TaskAttemptTerminationCause.UNKNOWN_ERROR));
       }
 
     }
@@ -1126,6 +1129,7 @@ public class TaskImpl implements Task, 
EventHandler<TaskEvent> {
           task.taskId, TaskState.SUCCEEDED));
       LOG.info("Task succeeded with attempt " + task.successfulAttempt);
       task.logJobHistoryTaskFinishedEvent();
+      TaskAttempt successfulAttempt = task.attempts.get(successTaId);
 
       // issue kill to all other attempts
       for (TaskAttempt attempt : task.attempts.values()) {
@@ -1134,9 +1138,21 @@ public class TaskImpl implements Task, 
EventHandler<TaskEvent> {
             //  TA_KILL message to an attempt that doesn't need one for
             //  other reasons.
             !attempt.isFinished()) {
-          LOG.info("Issuing kill to other attempt " + attempt.getID());
+          LOG.info("Issuing kill to other attempt " + attempt.getID() + " as 
attempt: " +
+            task.successfulAttempt + " has succeeded");
+          String diagnostics = null;
+          TaskAttemptTerminationCause errCause = null;
+          if (attempt.getLaunchTime() < successfulAttempt.getLaunchTime()) {
+            diagnostics = "Killed this attempt as other speculative attempt : 
" + successTaId
+                + " succeeded";
+            errCause = 
TaskAttemptTerminationCause.TERMINATED_EFFECTIVE_SPECULATION;
+          } else {
+            diagnostics = "Killed this speculative attempt as original 
attempt: " + successTaId
+                + " succeeded";
+            errCause = 
TaskAttemptTerminationCause.TERMINATED_INEFFECTIVE_SPECULATION;
+          }
           task.eventHandler.handle(new TaskAttemptEventKillRequest(attempt
-              .getID(), "Alternate attempt succeeded"));
+              .getID(), diagnostics, errCause));
         }
       }
       // send notification to DAG scheduler
@@ -1509,14 +1525,13 @@ public class TaskImpl implements Task, 
EventHandler<TaskEvent> {
     }
   }
 
-  private void killUnfinishedAttempt(TaskAttempt attempt, String logMsg) {
+  private void killUnfinishedAttempt(TaskAttempt attempt, String logMsg, 
TaskAttemptTerminationCause errorCause) {
     if (commitAttempt != null && commitAttempt.equals(attempt)) {
       LOG.info("Removing commit attempt: " + commitAttempt);
       commitAttempt = null;
     }
     if (attempt != null && !attempt.isFinished()) {
-      eventHandler.handle(new TaskAttemptEventKillRequest(attempt.getID(),
-          logMsg));
+      eventHandler.handle(new TaskAttemptEventKillRequest(attempt.getID(), 
logMsg, errorCause));
     }
   }
 
@@ -1538,8 +1553,8 @@ public class TaskImpl implements Task, 
EventHandler<TaskEvent> {
       task.addDiagnosticInfo(terminateEvent.getDiagnosticInfo());
       // issue kill to all non finished attempts
       for (TaskAttempt attempt : task.attempts.values()) {
-        task.killUnfinishedAttempt
-            (attempt, "Task KILL is received. Killing attempt!");
+        task.killUnfinishedAttempt(attempt, "Task KILL is received. Killing 
attempt. Diagnostics: "
+            + terminateEvent.getDiagnosticInfo(), 
terminateEvent.getTerminationCause());
       }
     }
   }

http://git-wip-us.apache.org/repos/asf/tez/blob/84a7ef05/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
----------------------------------------------------------------------
diff --git 
a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java 
b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
index c27d869..e1f7a94 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
@@ -139,6 +139,7 @@ import 
org.apache.tez.dag.history.events.VertexParallelismUpdatedEvent;
 import org.apache.tez.dag.history.events.VertexStartedEvent;
 import org.apache.tez.dag.library.vertexmanager.InputReadyVertexManager;
 import org.apache.tez.dag.library.vertexmanager.ShuffleVertexManager;
+import org.apache.tez.dag.records.TaskAttemptTerminationCause;
 import org.apache.tez.dag.records.TezDAGID;
 import org.apache.tez.dag.records.TezTaskAttemptID;
 import org.apache.tez.dag.records.TezTaskID;
@@ -1802,12 +1803,17 @@ public class VertexImpl implements 
org.apache.tez.dag.app.dag.Vertex,
    */
   void tryEnactKill(VertexTerminationCause trigger,
       TaskTerminationCause taskterminationCause) {
+    // In most cases the dag is shutting down due to some error
+    TaskAttemptTerminationCause errCause = 
TaskAttemptTerminationCause.TERMINATED_AT_SHUTDOWN;
+    if (taskterminationCause == TaskTerminationCause.DAG_KILL) {
+      errCause = TaskAttemptTerminationCause.TERMINATED_BY_CLIENT;
+    }
     if(trySetTerminationCause(trigger)){
-      LOG.info("Killing tasks in vertex: " + logIdentifier + " due to trigger: 
"
-          + trigger);
+      String msg = "Killing tasks in vertex: " + logIdentifier + " due to 
trigger: " + trigger; 
+      LOG.info(msg);
       for (Task task : tasks.values()) {
-        eventHandler.handle(
-            new TaskEventTermination(task.getTaskId(), taskterminationCause));
+        eventHandler.handle( // attempt was terminated because the vertex is 
shutting down
+            new TaskEventTermination(task.getTaskId(), errCause, msg));
       }
     }
   }
@@ -3840,12 +3846,32 @@ public class VertexImpl implements 
org.apache.tez.dag.app.dag.Vertex,
       case TASK_ATTEMPT_FAILED_EVENT:
         {
           checkEventSourceMetadata(vertex, sourceMeta);
+          TaskAttemptTerminationCause errCause = null;
+          switch (sourceMeta.getEventGenerator()) {
+          case INPUT:
+            errCause = TaskAttemptTerminationCause.INPUT_READ_ERROR;
+            break;
+          case PROCESSOR:
+            errCause = TaskAttemptTerminationCause.APPLICATION_ERROR;
+            break;
+          case OUTPUT:
+            errCause = TaskAttemptTerminationCause.OUTPUT_WRITE_ERROR;
+            break;
+          case SYSTEM:
+            errCause = TaskAttemptTerminationCause.FRAMEWORK_ERROR;
+            break;
+          default:
+            throw new TezUncheckedException("Unknown 
EventProducerConsumerType: " +
+                sourceMeta.getEventGenerator());
+          }
           TaskAttemptFailedEvent taskFailedEvent =
               (TaskAttemptFailedEvent) tezEvent.getEvent();
           vertex.getEventHandler().handle(
               new TaskAttemptEventAttemptFailed(sourceMeta.getTaskAttemptID(),
                   TaskAttemptEventType.TA_FAILED,
-                  "Error: " + taskFailedEvent.getDiagnostics()));
+                  "Error: " + taskFailedEvent.getDiagnostics(), 
+                  errCause)
+              );
         }
         break;
       default:

http://git-wip-us.apache.org/repos/asf/tez/blob/84a7ef05/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/LocalContainerLauncher.java
----------------------------------------------------------------------
diff --git 
a/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/LocalContainerLauncher.java
 
b/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/LocalContainerLauncher.java
index fae5d48..c56c93b 100644
--- 
a/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/LocalContainerLauncher.java
+++ 
b/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/LocalContainerLauncher.java
@@ -67,6 +67,7 @@ import 
org.apache.tez.dag.app.rm.container.AMContainerEventLaunched;
 import org.apache.tez.dag.app.rm.container.AMContainerEventType;
 import org.apache.tez.dag.history.DAGHistoryEvent;
 import org.apache.tez.dag.history.events.ContainerLaunchedEvent;
+import org.apache.tez.dag.records.TaskAttemptTerminationCause;
 import org.apache.tez.runtime.task.TezChild;
 
 
@@ -273,14 +274,14 @@ public class LocalContainerLauncher extends 
AbstractService implements
         LOG.info("Container: " + containerId + " completed successfully");
         appContext.getEventHandler().handle(
             new AMContainerEventCompleted(containerId, 
result.getExitStatus().getExitCode(),
-                null));
+                null, TaskAttemptTerminationCause.CONTAINER_EXITED));
       } else {
         LOG.info("Container: " + containerId + " completed but with errors");
         appContext.getEventHandler().handle(
             new AMContainerEventCompleted(containerId, 
result.getExitStatus().getExitCode(),
                 result.getErrorMessage() == null ?
                     (result.getThrowable() == null ? null : 
result.getThrowable().getMessage()) :
-                    result.getErrorMessage()));
+                    result.getErrorMessage(), 
TaskAttemptTerminationCause.APPLICATION_ERROR));
       }
     }
 
@@ -295,13 +296,13 @@ public class LocalContainerLauncher extends 
AbstractService implements
         appContext.getEventHandler()
             .handle(new AMContainerEventCompleted(containerId,
                 
TezChild.ContainerExecutionResult.ExitStatus.EXECUTION_FAILURE.getExitCode(),
-                t.getMessage()));
+                t.getMessage(), 
TaskAttemptTerminationCause.APPLICATION_ERROR));
       } else {
         LOG.info("Ignoring CancellationException - triggered by 
LocalContainerLauncher");
         appContext.getEventHandler()
             .handle(new AMContainerEventCompleted(containerId,
                 
TezChild.ContainerExecutionResult.ExitStatus.SUCCESS.getExitCode(),
-                "CancellationException"));
+                "CancellationException", 
TaskAttemptTerminationCause.CONTAINER_EXITED));
       }
     }
   }

http://git-wip-us.apache.org/repos/asf/tez/blob/84a7ef05/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerEventHandler.java
----------------------------------------------------------------------
diff --git 
a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerEventHandler.java
 
b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerEventHandler.java
index 0b202ab..9b02578 100644
--- 
a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerEventHandler.java
+++ 
b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerEventHandler.java
@@ -69,6 +69,7 @@ import 
org.apache.tez.dag.app.rm.node.AMNodeEventNodeCountUpdated;
 import org.apache.tez.dag.app.rm.node.AMNodeEventStateChanged;
 import org.apache.tez.dag.app.rm.node.AMNodeEventTaskAttemptEnded;
 import org.apache.tez.dag.app.rm.node.AMNodeEventTaskAttemptSucceeded;
+import org.apache.tez.dag.records.TaskAttemptTerminationCause;
 
 import com.google.common.base.Preconditions;
 
@@ -426,19 +427,22 @@ public class TaskSchedulerEventHandler extends 
AbstractService
     // Inform the Containers about completion.
     AMContainer amContainer = 
appContext.getAllContainers().get(containerStatus.getContainerId());
     if (amContainer != null) {
-      String message = null;
+      String message = "Container completed. ";
+      TaskAttemptTerminationCause errCause = 
TaskAttemptTerminationCause.CONTAINER_EXITED;
       int exitStatus = containerStatus.getExitStatus();
       if (exitStatus == ContainerExitStatus.PREEMPTED) {
         message = "Container preempted externally. ";
+        errCause = TaskAttemptTerminationCause.EXTERNAL_PREEMPTION;
       } else if (exitStatus == ContainerExitStatus.DISKS_FAILED) {
         message = "Container disk failed. ";
-      } else {
+        errCause = TaskAttemptTerminationCause.NODE_DISK_ERROR;
+      } else if (exitStatus != ContainerExitStatus.SUCCESS){
         message = "Container failed. ";
       }
       if (containerStatus.getDiagnostics() != null) {
         message += containerStatus.getDiagnostics();
       }
-      sendEvent(new AMContainerEventCompleted(amContainer.getContainerId(), 
exitStatus, message));
+      sendEvent(new AMContainerEventCompleted(amContainer.getContainerId(), 
exitStatus, message, errCause));
     }
   }
 
@@ -554,8 +558,8 @@ public class TaskSchedulerEventHandler extends 
AbstractService
   public void preemptContainer(ContainerId containerId) {
     taskScheduler.deallocateContainer(containerId);
     // Inform the Containers about completion.
-    sendEvent(new AMContainerEventCompleted(containerId,
-        ContainerExitStatus.PREEMPTED, "Container preempted internally"));
+    sendEvent(new AMContainerEventCompleted(containerId, 
ContainerExitStatus.INVALID,
+        "Container preempted internally", 
TaskAttemptTerminationCause.INTERNAL_PREEMPTION));
   }
 
   public void setShouldUnregisterFlag() {

http://git-wip-us.apache.org/repos/asf/tez/blob/84a7ef05/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerEventCompleted.java
----------------------------------------------------------------------
diff --git 
a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerEventCompleted.java
 
b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerEventCompleted.java
index e9649f3..a455f1e 100644
--- 
a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerEventCompleted.java
+++ 
b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerEventCompleted.java
@@ -20,17 +20,20 @@ package org.apache.tez.dag.app.rm.container;
 
 import org.apache.hadoop.yarn.api.records.ContainerExitStatus;
 import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.tez.dag.records.TaskAttemptTerminationCause;
 
 public class AMContainerEventCompleted extends AMContainerEvent {
 
   private final int exitStatus;
   private final String diagnostics;
+  private final TaskAttemptTerminationCause errCause;
 
   public AMContainerEventCompleted(ContainerId containerId, 
-      int exitStatus, String diagnostics) {
+      int exitStatus, String diagnostics, TaskAttemptTerminationCause 
errCause) {
     super(containerId, AMContainerEventType.C_COMPLETED);
     this.exitStatus = exitStatus;
     this.diagnostics = diagnostics;
+    this.errCause = errCause;
   }
 
   public boolean isPreempted() {
@@ -41,6 +44,10 @@ public class AMContainerEventCompleted extends 
AMContainerEvent {
     return (exitStatus == ContainerExitStatus.DISKS_FAILED);
   }
   
+  public boolean isClusterAction() {
+    return isPreempted() || isDiskFailed();
+  }
+  
   public String getDiagnostics() {
     return diagnostics;
   }
@@ -48,5 +55,9 @@ public class AMContainerEventCompleted extends 
AMContainerEvent {
   public int getContainerExitStatus() {
     return exitStatus;
   }
+  
+  public TaskAttemptTerminationCause getTerminationCause() {
+    return errCause;
+  }
 
 }

http://git-wip-us.apache.org/repos/asf/tez/blob/84a7ef05/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerImpl.java
----------------------------------------------------------------------
diff --git 
a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerImpl.java
 
b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerImpl.java
index a0f9cb7..9d4f46b 100644
--- 
a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerImpl.java
+++ 
b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerImpl.java
@@ -59,6 +59,7 @@ import 
org.apache.tez.dag.app.rm.NMCommunicatorStopRequestEvent;
 import org.apache.tez.dag.history.DAGHistoryEvent;
 import org.apache.tez.dag.history.HistoryEventHandler;
 import org.apache.tez.dag.history.events.ContainerStoppedEvent;
+import org.apache.tez.dag.records.TaskAttemptTerminationCause;
 import org.apache.tez.dag.records.TezDAGID;
 import org.apache.tez.dag.records.TezTaskAttemptID;
 //import org.apache.tez.dag.app.dag.event.TaskAttemptEventDiagnosticsUpdate;
@@ -533,7 +534,7 @@ public class AMContainerImpl implements AMContainer {
           .getTaskAttemptId());
       container.sendTerminatedToTaskAttempt(event.getTaskAttemptId(),
           "AMScheduler Error: TaskAttempt allocated to unlaunched container: " 
+
-              container.getContainerId());
+              container.getContainerId(), 
TaskAttemptTerminationCause.FRAMEWORK_ERROR);
       container.deAllocate();
       LOG.warn("Unexpected TA Assignment: TAId: " + event.getTaskAttemptId() +
           "  for ContainerId: " + container.getContainerId() +
@@ -644,8 +645,10 @@ public class AMContainerImpl implements AMContainer {
     public void transition(AMContainerImpl container, AMContainerEvent cEvent) 
{
       if (container.pendingAttempt != null) {
         AMContainerEventLaunchFailed event = (AMContainerEventLaunchFailed) 
cEvent;
+        // for a properly setup cluster this should almost always be an app 
error
+        // need to differentiate between launch failed due to 
framework/cluster or app
         container.sendTerminatingToTaskAttempt(container.pendingAttempt,
-            event.getMessage());
+            event.getMessage(), 
TaskAttemptTerminationCause.CONTAINER_LAUNCH_FAILED);
       }
       container.unregisterFromTAListener();
       container.deAllocate();
@@ -659,12 +662,17 @@ public class AMContainerImpl implements AMContainer {
       AMContainerEventCompleted event = (AMContainerEventCompleted) cEvent;
       if (container.pendingAttempt != null) {
         String errorMessage = getMessage(container, event);
-        if (event.isPreempted() || event.isDiskFailed()) {
+        if (event.isClusterAction()) {
           
container.sendContainerTerminatedBySystemToTaskAttempt(container.pendingAttempt,
-              errorMessage);
+              errorMessage, event.getTerminationCause());
         } else {
-          container.sendTerminatedToTaskAttempt(container.pendingAttempt,
-              errorMessage);
+          container
+              .sendTerminatedToTaskAttempt(
+                  container.pendingAttempt,
+                  errorMessage,
+                  // if termination cause is generic exited then replace with 
specific
+                  (event.getTerminationCause() == 
TaskAttemptTerminationCause.CONTAINER_EXITED ? 
+                      TaskAttemptTerminationCause.CONTAINER_LAUNCH_FAILED : 
event.getTerminationCause()));
         }
         container.registerFailedAttempt(container.pendingAttempt);
         container.pendingAttempt = null;
@@ -696,7 +704,7 @@ public class AMContainerImpl implements AMContainer {
     public void transition(AMContainerImpl container, AMContainerEvent cEvent) 
{
       if (container.pendingAttempt != null) {
         container.sendTerminatingToTaskAttempt(container.pendingAttempt,
-            getMessage(container, cEvent));
+            getMessage(container, cEvent), 
TaskAttemptTerminationCause.CONTAINER_STOPPED);
       }
       container.unregisterFromTAListener();
       container.logStopped(container.pendingAttempt == null ? 
@@ -722,27 +730,31 @@ public class AMContainerImpl implements AMContainer {
         return;
       }
       container.nodeFailed = true;
-      String errorMessage = null;
+      String errorMessage = "Node " + container.getContainer().getNodeId() + " 
failed. ";
       if (cEvent instanceof DiagnosableEvent) {
-        errorMessage = ((DiagnosableEvent) cEvent).getDiagnosticInfo();
+        errorMessage += ((DiagnosableEvent) cEvent).getDiagnosticInfo();
       }
 
       for (TezTaskAttemptID taId : container.failedAssignments) {
-        container.sendNodeFailureToTA(taId, errorMessage);
+        container.sendNodeFailureToTA(taId, errorMessage, 
TaskAttemptTerminationCause.NODE_FAILED);
       }
       for (TezTaskAttemptID taId : container.completedAttempts) {
-        container.sendNodeFailureToTA(taId, errorMessage);
+        container.sendNodeFailureToTA(taId, errorMessage, 
TaskAttemptTerminationCause.NODE_FAILED);
       }
 
       if (container.pendingAttempt != null) {
         // Will be null in COMPLETED state.
-        container.sendNodeFailureToTA(container.pendingAttempt, errorMessage);
-        container.sendTerminatingToTaskAttempt(container.pendingAttempt, "Node 
failure");
+        container.sendNodeFailureToTA(container.pendingAttempt, errorMessage, 
+            TaskAttemptTerminationCause.NODE_FAILED);
+        container.sendTerminatingToTaskAttempt(container.pendingAttempt, 
errorMessage,
+            TaskAttemptTerminationCause.NODE_FAILED);
       }
       if (container.runningAttempt != null) {
         // Will be null in COMPLETED state.
-        container.sendNodeFailureToTA(container.runningAttempt, errorMessage);
-        container.sendTerminatingToTaskAttempt(container.runningAttempt, "Node 
failure");
+        container.sendNodeFailureToTA(container.runningAttempt, errorMessage, 
+            TaskAttemptTerminationCause.NODE_FAILED);
+        container.sendTerminatingToTaskAttempt(container.runningAttempt, 
errorMessage,
+            TaskAttemptTerminationCause.NODE_FAILED);
       }
       container.logStopped(ContainerExitStatus.ABORTED);
     }
@@ -767,7 +779,7 @@ public class AMContainerImpl implements AMContainer {
         container.sendTerminatingToTaskAttempt(container.pendingAttempt,
             "Container " + container.getContainerId() +
                 " hit an invalid transition - " + cEvent.getType() + " at " +
-                container.getState());
+                container.getState(), 
TaskAttemptTerminationCause.FRAMEWORK_ERROR);
       }
       container.logStopped(ContainerExitStatus.ABORTED);
       container.sendStopRequestToNM();
@@ -909,12 +921,12 @@ public class AMContainerImpl implements AMContainer {
     @Override
     public void transition(AMContainerImpl container, AMContainerEvent cEvent) 
{
       AMContainerEventCompleted event = (AMContainerEventCompleted) cEvent;
-      if (event.isPreempted() || event.isDiskFailed()) {
+      if (event.isClusterAction()) {
         
container.sendContainerTerminatedBySystemToTaskAttempt(container.runningAttempt,
-            getMessage(container, event));
+            getMessage(container, event), event.getTerminationCause());
       } else {
         container.sendTerminatedToTaskAttempt(container.runningAttempt,
-            getMessage(container, event));
+            getMessage(container, event), event.getTerminationCause());
       }
       container.unregisterAttemptFromListener(container.runningAttempt);
       container.registerFailedAttempt(container.runningAttempt);
@@ -929,8 +941,8 @@ public class AMContainerImpl implements AMContainer {
 
       container.unregisterAttemptFromListener(container.runningAttempt);
       container.sendTerminatingToTaskAttempt(container.runningAttempt,
-          " Container" + container.getContainerId() +
-              " received a STOP_REQUEST");
+          " Container" + container.getContainerId() + " received a 
STOP_REQUEST",
+          TaskAttemptTerminationCause.CONTAINER_STOPPED);
       super.transition(container, cEvent);
     }
   }
@@ -964,7 +976,7 @@ public class AMContainerImpl implements AMContainer {
       container.sendTerminatingToTaskAttempt(container.runningAttempt,
           "Container " + container.getContainerId() +
               " hit an invalid transition - " + cEvent.getType() + " at " +
-              container.getState());
+              container.getState(), 
TaskAttemptTerminationCause.FRAMEWORK_ERROR);
     }
   }
 
@@ -978,7 +990,8 @@ public class AMContainerImpl implements AMContainer {
           " cannot be allocated to container: " + container.getContainerId() +
           " in " + container.getState() + " state";
       
container.maybeSendNodeFailureForFailedAssignment(event.getTaskAttemptId());
-      container.sendTerminatingToTaskAttempt(event.getTaskAttemptId(), 
errorMessage);
+      container.sendTerminatingToTaskAttempt(event.getTaskAttemptId(), 
errorMessage,
+          TaskAttemptTerminationCause.CONTAINER_EXITED);
       container.registerFailedAttempt(event.getTaskAttemptId());
     }
   }
@@ -1001,15 +1014,18 @@ public class AMContainerImpl implements AMContainer {
       AMContainerEventCompleted event = (AMContainerEventCompleted) cEvent;
       String diag = event.getDiagnostics();
       for (TezTaskAttemptID taId : container.failedAssignments) {
-        container.sendTerminatedToTaskAttempt(taId, diag);
+        container.sendTerminatedToTaskAttempt(taId, diag, 
+            TaskAttemptTerminationCause.CONTAINER_EXITED);
       }
       if (container.pendingAttempt != null) {
-        container.sendTerminatedToTaskAttempt(container.pendingAttempt, diag);
+        container.sendTerminatedToTaskAttempt(container.pendingAttempt, diag, 
+            TaskAttemptTerminationCause.CONTAINER_EXITED);
         container.registerFailedAttempt(container.pendingAttempt);
         container.pendingAttempt = null;
       }
       if (container.runningAttempt != null) {
-        container.sendTerminatedToTaskAttempt(container.runningAttempt, diag);
+        container.sendTerminatedToTaskAttempt(container.runningAttempt, diag, 
+            TaskAttemptTerminationCause.CONTAINER_EXITED);
         container.registerFailedAttempt(container.runningAttempt);
         container.runningAttempt = null;
       }
@@ -1078,12 +1094,11 @@ public class AMContainerImpl implements AMContainer {
           + " in COMPLETED state";
       
container.maybeSendNodeFailureForFailedAssignment(event.getTaskAttemptId());
       container.sendTerminatedToTaskAttempt(event.getTaskAttemptId(),
-          errorMessage);
+          errorMessage, TaskAttemptTerminationCause.FRAMEWORK_ERROR);
       container.registerFailedAttempt(event.getTaskAttemptId());
     }
   }
 
-
   private void handleExtraTAAssign(
       AMContainerEventAssignTA event, TezTaskAttemptID currentTaId) {
     this.inError = true;
@@ -1092,8 +1107,10 @@ public class AMContainerImpl implements AMContainer {
         ". Attempts: " + currentTaId + ", " + event.getTaskAttemptId() +
         ". Current state: " + this.getState();
     this.maybeSendNodeFailureForFailedAssignment(event.getTaskAttemptId());
-    this.sendTerminatingToTaskAttempt(event.getTaskAttemptId(), errorMessage);
-    this.sendTerminatingToTaskAttempt(currentTaId, errorMessage);
+    this.sendTerminatingToTaskAttempt(event.getTaskAttemptId(), errorMessage,
+        TaskAttemptTerminationCause.FRAMEWORK_ERROR);
+    this.sendTerminatingToTaskAttempt(currentTaId, errorMessage,
+        TaskAttemptTerminationCause.FRAMEWORK_ERROR);
     this.registerFailedAttempt(event.getTaskAttemptId());
     LOG.warn(errorMessage);
     this.logStopped(ContainerExitStatus.INVALID);
@@ -1122,28 +1139,29 @@ public class AMContainerImpl implements AMContainer {
   }
 
   protected void sendTerminatedToTaskAttempt(
-      TezTaskAttemptID taId, String message) {
-    sendEvent(new TaskAttemptEventContainerTerminated(taId, message));
+      TezTaskAttemptID taId, String message, TaskAttemptTerminationCause 
errCause) {
+    sendEvent(new TaskAttemptEventContainerTerminated(taId, message, 
errCause));
   }
   
   protected void sendContainerTerminatedBySystemToTaskAttempt(
-    TezTaskAttemptID taId, String message) {
-      sendEvent(new TaskAttemptEventContainerTerminatedBySystem(taId, 
message));
+    TezTaskAttemptID taId, String message, TaskAttemptTerminationCause 
errorCause) {
+      sendEvent(new TaskAttemptEventContainerTerminatedBySystem(taId, message, 
errorCause));
   }
 
   protected void sendTerminatingToTaskAttempt(TezTaskAttemptID taId,
-      String message) {
-    sendEvent(new TaskAttemptEventContainerTerminating(taId, message));
+      String message, TaskAttemptTerminationCause errorCause) {
+    sendEvent(new TaskAttemptEventContainerTerminating(taId, message, 
errorCause));
   }
 
   protected void maybeSendNodeFailureForFailedAssignment(TezTaskAttemptID 
taId) {
     if (this.nodeFailed) {
-      this.sendNodeFailureToTA(taId, "Node Failed");
+      this.sendNodeFailureToTA(taId, "Node Failed", 
TaskAttemptTerminationCause.NODE_FAILED);
     }
   }
 
-  protected void sendNodeFailureToTA(TezTaskAttemptID taId, String message) {
-    sendEvent(new TaskAttemptEventNodeFailed(taId, message));
+  protected void sendNodeFailureToTA(TezTaskAttemptID taId, String message, 
+      TaskAttemptTerminationCause errorCause) {
+    sendEvent(new TaskAttemptEventNodeFailed(taId, message, errorCause));
   }
 
   protected void sendStartRequestToNM(ContainerLaunchContext clc) {

http://git-wip-us.apache.org/repos/asf/tez/blob/84a7ef05/tez-dag/src/main/java/org/apache/tez/dag/history/events/TaskAttemptFinishedEvent.java
----------------------------------------------------------------------
diff --git 
a/tez-dag/src/main/java/org/apache/tez/dag/history/events/TaskAttemptFinishedEvent.java
 
b/tez-dag/src/main/java/org/apache/tez/dag/history/events/TaskAttemptFinishedEvent.java
index 0ae8061..2b21c89 100644
--- 
a/tez-dag/src/main/java/org/apache/tez/dag/history/events/TaskAttemptFinishedEvent.java
+++ 
b/tez-dag/src/main/java/org/apache/tez/dag/history/events/TaskAttemptFinishedEvent.java
@@ -29,6 +29,7 @@ import org.apache.tez.dag.api.DagTypeConverters;
 import org.apache.tez.dag.api.oldrecords.TaskAttemptState;
 import org.apache.tez.dag.history.HistoryEvent;
 import org.apache.tez.dag.history.HistoryEventType;
+import org.apache.tez.dag.records.TaskAttemptTerminationCause;
 import org.apache.tez.dag.records.TezTaskAttemptID;
 import 
org.apache.tez.dag.recovery.records.RecoveryProtos.TaskAttemptFinishedProto;
 
@@ -43,21 +44,23 @@ public class TaskAttemptFinishedEvent implements 
HistoryEvent {
   private TaskAttemptState state;
   private String diagnostics;
   private TezCounters tezCounters;
+  private TaskAttemptTerminationCause error;
 
   public TaskAttemptFinishedEvent(TezTaskAttemptID taId,
       String vertexName,
       long startTime,
       long finishTime,
       TaskAttemptState state,
-      String diagnostics,
-      TezCounters counters) {
+      TaskAttemptTerminationCause error,
+      String diagnostics, TezCounters counters) {
     this.taskAttemptId = taId;
     this.vertexName = vertexName;
     this.startTime = startTime;
     this.finishTime = finishTime;
     this.state = state;
     this.diagnostics = diagnostics;
-    tezCounters = counters;
+    this.tezCounters = counters;
+    this.error = error;
   }
 
   public TaskAttemptFinishedEvent() {
@@ -87,6 +90,9 @@ public class TaskAttemptFinishedEvent implements HistoryEvent 
{
     if (diagnostics != null) {
       builder.setDiagnostics(diagnostics);
     }
+    if (error != null) {
+      builder.setErrorEnum(error.name());
+    }
     if (tezCounters != null) {
       
builder.setCounters(DagTypeConverters.convertTezCountersToProto(tezCounters));
     }
@@ -100,6 +106,9 @@ public class TaskAttemptFinishedEvent implements 
HistoryEvent {
     if (proto.hasDiagnostics()) {
       this.diagnostics = proto.getDiagnostics();
     }
+    if (proto.hasErrorEnum()) {
+      this.error = TaskAttemptTerminationCause.valueOf(proto.getErrorEnum());
+    }
     if (proto.hasCounters()) {
       this.tezCounters = DagTypeConverters.convertTezCountersFromProto(
         proto.getCounters());
@@ -129,6 +138,7 @@ public class TaskAttemptFinishedEvent implements 
HistoryEvent {
         + ", finishTime=" + finishTime
         + ", timeTaken=" + (finishTime - startTime)
         + ", status=" + state.name()
+        + ", errorEnum=" + (error != null ? error.name() : "")
         + ", diagnostics=" + diagnostics
         + ", counters=" + (tezCounters == null ? "null" :
           tezCounters.toString()
@@ -146,6 +156,10 @@ public class TaskAttemptFinishedEvent implements 
HistoryEvent {
   public String getDiagnostics() {
     return diagnostics;
   }
+  
+  public TaskAttemptTerminationCause getTaskAttemptError() {
+    return error;
+  }
 
   public long getFinishTime() {
     return finishTime;

http://git-wip-us.apache.org/repos/asf/tez/blob/84a7ef05/tez-dag/src/main/java/org/apache/tez/dag/history/logging/impl/HistoryEventJsonConversion.java
----------------------------------------------------------------------
diff --git 
a/tez-dag/src/main/java/org/apache/tez/dag/history/logging/impl/HistoryEventJsonConversion.java
 
b/tez-dag/src/main/java/org/apache/tez/dag/history/logging/impl/HistoryEventJsonConversion.java
index 8560359..79a0c34 100644
--- 
a/tez-dag/src/main/java/org/apache/tez/dag/history/logging/impl/HistoryEventJsonConversion.java
+++ 
b/tez-dag/src/main/java/org/apache/tez/dag/history/logging/impl/HistoryEventJsonConversion.java
@@ -18,11 +18,9 @@
 
 package org.apache.tez.dag.history.logging.impl;
 
-import java.util.HashMap;
 import java.util.Map;
 import java.util.Map.Entry;
 
-import java.util.Map.Entry;
 import java.util.TreeMap;
 
 import org.apache.tez.common.ATSConstants;
@@ -485,6 +483,9 @@ public class HistoryEventJsonConversion {
     otherInfo.put(ATSConstants.FINISH_TIME, event.getFinishTime());
     otherInfo.put(ATSConstants.TIME_TAKEN, (event.getFinishTime() - 
event.getStartTime()));
     otherInfo.put(ATSConstants.STATUS, event.getState().name());
+    if (event.getTaskAttemptError() != null) {
+      otherInfo.put(ATSConstants.TASK_ATTEMPT_ERROR_ENUM, 
event.getTaskAttemptError().name());
+    }
     otherInfo.put(ATSConstants.DIAGNOSTICS, event.getDiagnostics());
     otherInfo.put(ATSConstants.COUNTERS,
         DAGUtils.convertCountersToJSON(event.getCounters()));

http://git-wip-us.apache.org/repos/asf/tez/blob/84a7ef05/tez-dag/src/main/proto/HistoryEvents.proto
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/proto/HistoryEvents.proto 
b/tez-dag/src/main/proto/HistoryEvents.proto
index 93f217f..a02268c 100644
--- a/tez-dag/src/main/proto/HistoryEvents.proto
+++ b/tez-dag/src/main/proto/HistoryEvents.proto
@@ -165,6 +165,7 @@ message TaskAttemptFinishedProto {
   optional int32 state = 3;
   optional string diagnostics = 4;
   optional TezCountersProto counters = 5;
+  optional string error_enum = 6;
 }
 
 message EventMetaDataProto {

http://git-wip-us.apache.org/repos/asf/tez/blob/84a7ef05/tez-dag/src/test/java/org/apache/tez/dag/app/TestPreemption.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/TestPreemption.java 
b/tez-dag/src/test/java/org/apache/tez/dag/app/TestPreemption.java
index 0958c48..d1bef18 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/TestPreemption.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/TestPreemption.java
@@ -41,6 +41,7 @@ import 
org.apache.tez.dag.app.MockDAGAppMaster.MockContainerLauncher;
 import org.apache.tez.dag.app.dag.TaskAttemptStateInternal;
 import org.apache.tez.dag.app.dag.impl.DAGImpl;
 import org.apache.tez.dag.app.dag.impl.TaskAttemptImpl;
+import org.apache.tez.dag.records.TaskAttemptTerminationCause;
 import org.apache.tez.dag.records.TezTaskAttemptID;
 import org.apache.tez.dag.records.TezTaskID;
 import org.apache.tez.dag.records.TezVertexID;
@@ -200,6 +201,7 @@ public class TestPreemption {
       TezTaskAttemptID testTaId = 
TezTaskAttemptID.getInstance(TezTaskID.getInstance(vertexId, 0), i);      
       TaskAttemptImpl taImpl = dagImpl.getTaskAttempt(testTaId);
       Assert.assertEquals(TaskAttemptStateInternal.KILLED, 
taImpl.getInternalState());
+      Assert.assertEquals(TaskAttemptTerminationCause.EXTERNAL_PREEMPTION, 
taImpl.getTerminationCause());
     }
     
     System.out.println("TestPreemption - Done running - " + info);

http://git-wip-us.apache.org/repos/asf/tez/blob/84a7ef05/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskAttempt.java
----------------------------------------------------------------------
diff --git 
a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskAttempt.java 
b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskAttempt.java
index bba4edb..788b59b 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskAttempt.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskAttempt.java
@@ -87,6 +87,7 @@ import 
org.apache.tez.dag.app.rm.container.ContainerContextMatcher;
 import org.apache.tez.dag.history.DAGHistoryEvent;
 import org.apache.tez.dag.history.HistoryEventHandler;
 import org.apache.tez.dag.history.events.TaskAttemptFinishedEvent;
+import org.apache.tez.dag.records.TaskAttemptTerminationCause;
 import org.apache.tez.dag.records.TezDAGID;
 import org.apache.tez.dag.records.TezTaskAttemptID;
 import org.apache.tez.dag.records.TezTaskID;
@@ -300,9 +301,11 @@ public class TestTaskAttempt {
 
     taImpl.handle(new TaskAttemptEventSchedule(taskAttemptID, 0, 0));
     // At state STARTING.
-    taImpl.handle(new TaskAttemptEventKillRequest(taskAttemptID, null));
+    taImpl.handle(new TaskAttemptEventKillRequest(taskAttemptID, null,
+        TaskAttemptTerminationCause.TERMINATED_BY_CLIENT));
     // At some KILLING state.
-    taImpl.handle(new TaskAttemptEventKillRequest(taskAttemptID, null));
+    taImpl.handle(new TaskAttemptEventKillRequest(taskAttemptID, null,
+        TaskAttemptTerminationCause.TERMINATED_BY_CLIENT));
     // taImpl.handle(new TaskAttemptEventContainerTerminating(taskAttemptID,
     // null));
     assertFalse(eventHandler.internalError);
@@ -366,7 +369,7 @@ public class TestTaskAttempt {
     verify(eventHandler, times(expectedEventsAtRunning)).handle(arg.capture());
 
     taImpl.handle(new TaskAttemptEventContainerTerminating(taskAttemptID,
-        "Terminating"));
+        "Terminating", TaskAttemptTerminationCause.APPLICATION_ERROR));
     assertFalse(
         "InternalError occurred trying to handle TA_CONTAINER_TERMINATING",
         eventHandler.internalError);
@@ -376,6 +379,7 @@ public class TestTaskAttempt {
 
     assertEquals(1, taImpl.getDiagnostics().size());
     assertEquals("Terminating", taImpl.getDiagnostics().get(0));
+    assertEquals(TaskAttemptTerminationCause.APPLICATION_ERROR, 
taImpl.getTerminationCause());
 
     int expectedEvenstAfterTerminating = expectedEventsAtRunning + 3;
     arg = ArgumentCaptor.forClass(Event.class);
@@ -392,13 +396,16 @@ public class TestTaskAttempt {
             expectedEvenstAfterTerminating), DAGEventCounterUpdate.class, 1);
 
     taImpl.handle(new TaskAttemptEventContainerTerminated(taskAttemptID,
-        "Terminated"));
+        "Terminated", TaskAttemptTerminationCause.CONTAINER_EXITED));
     int expectedEventAfterTerminated = expectedEvenstAfterTerminating + 0;
     arg = ArgumentCaptor.forClass(Event.class);
     verify(eventHandler, 
times(expectedEventAfterTerminated)).handle(arg.capture());
 
     assertEquals(2, taImpl.getDiagnostics().size());
     assertEquals("Terminated", taImpl.getDiagnostics().get(1));
+    
+    // check that original error cause is retained
+    assertEquals(TaskAttemptTerminationCause.APPLICATION_ERROR, 
taImpl.getTerminationCause());
   }
 
 
@@ -452,13 +459,14 @@ public class TestTaskAttempt {
         null));
     assertEquals("Task attempt is not in running state", taImpl.getState(),
         TaskAttemptState.RUNNING);
-    taImpl.handle(new TaskAttemptEventContainerTerminated(taskAttemptID, 
"Terminated"));
+    taImpl.handle(new TaskAttemptEventContainerTerminated(taskAttemptID, 
"Terminated",
+        TaskAttemptTerminationCause.CONTAINER_EXITED));
     assertFalse(
         "InternalError occurred trying to handle TA_CONTAINER_TERMINATED",
         eventHandler.internalError);
 
     assertEquals("Terminated", taImpl.getDiagnostics().get(0));
-
+    assertEquals(TaskAttemptTerminationCause.CONTAINER_EXITED, 
taImpl.getTerminationCause());
     // TODO Ensure TA_TERMINATING after this is ingored.
   }
 
@@ -541,14 +549,15 @@ public class TestTaskAttempt {
             expectedEvenstAfterTerminating), DAGEventCounterUpdate.class, 1);
 
     taImpl.handle(new TaskAttemptEventContainerTerminated(taskAttemptID,
-        "Terminated"));
+        "Terminated", TaskAttemptTerminationCause.CONTAINER_EXITED));
     int expectedEventAfterTerminated = expectedEvenstAfterTerminating + 0;
     arg = ArgumentCaptor.forClass(Event.class);
     verify(eventHandler, 
times(expectedEventAfterTerminated)).handle(arg.capture());
 
     // Verify that the diagnostic message included in the Terminated event is 
not
-    // captured - TA already succeeded.
+    // captured - TA already succeeded. Error cause is the default value.
     assertEquals(0, taImpl.getDiagnostics().size());
+    assertEquals(TaskAttemptTerminationCause.UNKNOWN_ERROR, 
taImpl.getTerminationCause());
   }
 
   @Test(timeout = 5000)
@@ -636,8 +645,9 @@ public class TestTaskAttempt {
     verify(eventHandler, 
times(expectedEventAfterTerminated)).handle(arg.capture());
 
     // Verify that the diagnostic message included in the Terminated event is 
not
-    // captured - TA already succeeded.
+    // captured - TA already succeeded. Error cause should be the default value
     assertEquals(0, taImpl.getDiagnostics().size());
+    assertEquals(TaskAttemptTerminationCause.UNKNOWN_ERROR, 
taImpl.getTerminationCause());
   }
 
   @Test(timeout = 5000)
@@ -718,7 +728,8 @@ public class TestTaskAttempt {
             expectedEvenstAfterTerminating), DAGEventCounterUpdate.class, 1);
 
     // Send out a Node Failure.
-    taImpl.handle(new TaskAttemptEventNodeFailed(taskAttemptID, 
"NodeDecomissioned"));
+    taImpl.handle(new TaskAttemptEventNodeFailed(taskAttemptID, 
"NodeDecomissioned",
+        TaskAttemptTerminationCause.NODE_FAILED));
     // Verify in KILLED state
     assertEquals("Task attempt is not in the  KILLED state", 
TaskAttemptState.KILLED,
         taImpl.getState());
@@ -734,6 +745,7 @@ public class TestTaskAttempt {
     // Verify still in KILLED state
     assertEquals("Task attempt is not in the  KILLED state", 
TaskAttemptState.KILLED,
         taImpl.getState());
+    assertEquals(TaskAttemptTerminationCause.NODE_FAILED, 
taImpl.getTerminationCause());
   }
   
   @Test(timeout = 5000)
@@ -814,7 +826,8 @@ public class TestTaskAttempt {
             expectedEvenstAfterTerminating), DAGEventCounterUpdate.class, 1);
 
     // Send out a Node Failure.
-    taImpl.handle(new TaskAttemptEventNodeFailed(taskAttemptID, 
"NodeDecomissioned"));
+    taImpl.handle(new TaskAttemptEventNodeFailed(taskAttemptID, 
"NodeDecomissioned", 
+        TaskAttemptTerminationCause.NODE_FAILED));
 
     // Verify no additional events
     int expectedEventsNodeFailure = expectedEvenstAfterTerminating + 0;
@@ -824,6 +837,8 @@ public class TestTaskAttempt {
     // Verify still in SUCCEEDED state
     assertEquals("Task attempt is not in the  SUCCEEDED state", 
TaskAttemptState.SUCCEEDED,
         taImpl.getState());
+    // error cause remains as default value
+    assertEquals(TaskAttemptTerminationCause.UNKNOWN_ERROR, 
taImpl.getTerminationCause());
   }
 
   @Test//(timeout = 5000)
@@ -909,6 +924,8 @@ public class TestTaskAttempt {
     taImpl.handle(new TaskAttemptEventOutputFailed(taskAttemptID, tzEvent, 
11));
     assertEquals("Task attempt is not in succeeded state", taImpl.getState(),
         TaskAttemptState.SUCCEEDED);
+    // default value of error cause
+    assertEquals(TaskAttemptTerminationCause.UNKNOWN_ERROR, 
taImpl.getTerminationCause());
 
     // different destination attempt reports error. now threshold crossed
     TezTaskAttemptID mockDestId2 = mock(TezTaskAttemptID.class);
@@ -923,6 +940,7 @@ public class TestTaskAttempt {
     finishEvent = (TaskAttemptFinishedEvent)histEvent.getHistoryEvent();
     long newFinishTime = finishEvent.getFinishTime();
     Assert.assertEquals(finishTime, newFinishTime);
+    assertEquals(TaskAttemptTerminationCause.OUTPUT_LOST, 
taImpl.getTerminationCause());
 
     assertEquals(true, taImpl.inputFailedReported);
     int expectedEventsAfterFetchFailure = expectedEventsTillSucceeded + 2;
@@ -1059,7 +1077,8 @@ public class TestTaskAttempt {
         mockHeartbeatHandler, appCtx, locationHint, false,
         resource, createFakeContainerContext(), true);
     Assert.assertEquals(TaskAttemptStateInternal.NEW, 
taImpl.getInternalState());
-    taImpl.handle(new TaskAttemptEventKillRequest(taImpl.getID(), "kill it"));
+    taImpl.handle(new TaskAttemptEventKillRequest(taImpl.getID(), "kill it",
+                TaskAttemptTerminationCause.TERMINATED_BY_CLIENT));
     Assert.assertEquals(TaskAttemptStateInternal.KILLED, 
taImpl.getInternalState());
 
     Assert.assertEquals(0, taImpl.taskAttemptStartedEventLogged);

http://git-wip-us.apache.org/repos/asf/tez/blob/84a7ef05/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskAttemptRecovery.java
----------------------------------------------------------------------
diff --git 
a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskAttemptRecovery.java
 
b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskAttemptRecovery.java
index 1391361..7c75a1d 100644
--- 
a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskAttemptRecovery.java
+++ 
b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskAttemptRecovery.java
@@ -56,6 +56,7 @@ import org.apache.tez.dag.history.HistoryEventHandler;
 import org.apache.tez.dag.history.HistoryEventType;
 import org.apache.tez.dag.history.events.TaskAttemptFinishedEvent;
 import org.apache.tez.dag.history.events.TaskAttemptStartedEvent;
+import org.apache.tez.dag.records.TaskAttemptTerminationCause;
 import org.apache.tez.dag.records.TezTaskAttemptID;
 import org.apache.tez.dag.records.TezTaskID;
 import org.apache.tez.dag.records.TezVertexID;
@@ -147,10 +148,15 @@ public class TestTaskAttemptRecovery {
   private void restoreFromTAFinishedEvent(TaskAttemptState state) {
     String diag = "test_diag";
     TezCounters counters = mock(TezCounters.class);
+    
+    TaskAttemptTerminationCause errorEnum = null;
+    if (state != TaskAttemptState.SUCCEEDED) {
+      errorEnum = TaskAttemptTerminationCause.APPLICATION_ERROR;
+    }
 
     TaskAttemptState recoveredState =
         ta.restoreFromEvent(new TaskAttemptFinishedEvent(taId, vertexName,
-            startTime, finishTime, state, diag, counters));
+            startTime, finishTime, state, errorEnum, diag, counters));
     assertEquals(startTime, ta.getLaunchTime());
     assertEquals(finishTime, ta.getFinishTime());
     assertEquals(counters, ta.reportedStatus.counters);
@@ -159,6 +165,11 @@ public class TestTaskAttemptRecovery {
     assertEquals(1, ta.getDiagnostics().size());
     assertEquals(diag, ta.getDiagnostics().get(0));
     assertEquals(state, recoveredState);
+    if (state != TaskAttemptState.SUCCEEDED) {
+      assertEquals(errorEnum, ta.getTerminationCause());
+    } else {
+      assertEquals(TaskAttemptTerminationCause.UNKNOWN_ERROR, 
ta.getTerminationCause());
+    }
   }
 
   private void verifyEvents(List<Event> events, Class<? extends Event> 
eventClass,
@@ -278,7 +289,7 @@ public class TestTaskAttemptRecovery {
     TaskAttemptState recoveredState =
         ta.restoreFromEvent(new TaskAttemptFinishedEvent(taId, vertexName,
             startTime, finishTime, TaskAttemptState.KILLED,
-            "", new TezCounters()));
+            TaskAttemptTerminationCause.APPLICATION_ERROR, "", new 
TezCounters()));
     assertEquals(TaskAttemptState.KILLED, recoveredState);
   }
 }

http://git-wip-us.apache.org/repos/asf/tez/blob/84a7ef05/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskImpl.java
----------------------------------------------------------------------
diff --git 
a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskImpl.java 
b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskImpl.java
index ab7c87b..0b93093 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskImpl.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskImpl.java
@@ -54,7 +54,6 @@ import org.apache.tez.dag.app.TaskAttemptListener;
 import org.apache.tez.dag.app.TaskHeartbeatHandler;
 import org.apache.tez.dag.app.dag.StateChangeNotifier;
 import org.apache.tez.dag.app.dag.TaskStateInternal;
-import org.apache.tez.dag.app.dag.TaskTerminationCause;
 import org.apache.tez.dag.app.dag.Vertex;
 import org.apache.tez.dag.app.dag.event.TaskAttemptEventDiagnosticsUpdate;
 import org.apache.tez.dag.app.dag.event.TaskEvent;
@@ -64,6 +63,7 @@ import org.apache.tez.dag.app.dag.event.TaskEventType;
 import org.apache.tez.dag.app.dag.event.VertexEventType;
 import org.apache.tez.dag.app.rm.container.AMContainer;
 import org.apache.tez.dag.app.rm.node.AMNodeEventType;
+import org.apache.tez.dag.records.TaskAttemptTerminationCause;
 import org.apache.tez.dag.records.TezDAGID;
 import org.apache.tez.dag.records.TezTaskAttemptID;
 import org.apache.tez.dag.records.TezTaskID;
@@ -173,13 +173,12 @@ public class TestTaskImpl {
   }
 
   private void killTask(TezTaskID taskId) {
-    mockTask.handle(new TaskEventTermination(taskId, 
TaskTerminationCause.DAG_KILL));
+    mockTask.handle(new TaskEventTermination(taskId, 
TaskAttemptTerminationCause.TERMINATED_AT_SHUTDOWN, null));
     assertTaskKillWaitState();
   }
 
   private void failTask(TezTaskID taskId) {
-    mockTask.handle(new TaskEventTermination(taskId,
-        TaskTerminationCause.OWN_TASK_FAILURE));
+    mockTask.handle(new TaskEventTermination(taskId, 
TaskAttemptTerminationCause.TERMINATED_AT_SHUTDOWN, null));
     assertTaskKillWaitState();
   }
 
@@ -626,9 +625,9 @@ public class TestTaskImpl {
   @Test
   public void testDiagnostics_KillNew(){
     TezTaskID taskId = getNewTaskID();
-    mockTask.handle(new TaskEventTermination(taskId, 
TaskTerminationCause.DAG_KILL));
+    mockTask.handle(new TaskEventTermination(taskId, 
TaskAttemptTerminationCause.TERMINATED_BY_CLIENT, null));
     assertEquals(1, mockTask.getDiagnostics().size());
-    
assertTrue(mockTask.getDiagnostics().get(0).contains(TaskTerminationCause.DAG_KILL.name()));
+    
assertTrue(mockTask.getDiagnostics().get(0).contains(TaskAttemptTerminationCause.TERMINATED_BY_CLIENT.name()));
     assertEquals(0, mockTask.taskStartedEventLogged);
     assertEquals(1, mockTask.taskFinishedEventLogged);
   }
@@ -637,9 +636,9 @@ public class TestTaskImpl {
   public void testDiagnostics_Kill(){
     TezTaskID taskId = getNewTaskID();
     scheduleTaskAttempt(taskId);
-    mockTask.handle(new TaskEventTermination(taskId, 
TaskTerminationCause.OTHER_TASK_FAILURE));
+    mockTask.handle(new TaskEventTermination(taskId, 
TaskAttemptTerminationCause.TERMINATED_AT_SHUTDOWN, null));
     assertEquals(1, mockTask.getDiagnostics().size());
-    
assertTrue(mockTask.getDiagnostics().get(0).contains(TaskTerminationCause.OTHER_TASK_FAILURE.name()));
+    
assertTrue(mockTask.getDiagnostics().get(0).contains(TaskAttemptTerminationCause.TERMINATED_AT_SHUTDOWN.name()));
   }
 
   // TODO Add test to validate the correct commit attempt.

Reply via email to