TEZ-1773. Add attempt failure cause enum to the attempt failed/killed history
record (bikaS)
(cherry picked from commit 81eef37d9e1e9222ef09eed319c45cdcd9034cd8)
Conflicts:
CHANGES.txt
tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/TaskAttemptEventKillRequest.java
tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/TaskAttemptEventType.java
tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java
tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java
tez-dag/src/test/java/org/apache/tez/dag/app/TestSpeculation.java
tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskAttempt.java
tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskImpl.java
tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java
Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/84a7ef05
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/84a7ef05
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/84a7ef05
Branch: refs/heads/branch-0.5
Commit: 84a7ef0516cdcc0cd5a4d480ea6f235de4ad9db9
Parents: 3e5991a
Author: Bikas Saha <[email protected]>
Authored: Mon Nov 24 11:15:44 2014 -0800
Committer: Bikas Saha <[email protected]>
Committed: Tue Sep 22 14:37:17 2015 -0700
----------------------------------------------------------------------
CHANGES.txt | 2 +
.../org/apache/tez/common/ATSConstants.java | 1 +
.../records/TaskAttemptTerminationCause.java | 45 ++++
.../tez/dag/app/TaskHeartbeatHandler.java | 3 +-
.../org/apache/tez/dag/app/dag/TaskAttempt.java | 2 +
.../event/TaskAttemptEventAttemptFailed.java | 13 +-
.../TaskAttemptEventContainerTerminated.java | 13 +-
...AttemptEventContainerTerminatedBySystem.java | 13 +-
.../TaskAttemptEventContainerTerminating.java | 12 +-
.../dag/event/TaskAttemptEventKillRequest.java | 13 +-
.../dag/event/TaskAttemptEventNodeFailed.java | 12 +-
.../dag/event/TaskAttemptEventOutputFailed.java | 9 +-
.../TaskAttemptEventTerminationCauseEvent.java | 26 +++
.../dag/app/dag/event/TaskAttemptEventType.java | 3 +-
.../dag/app/dag/event/TaskEventTermination.java | 28 ++-
.../tez/dag/app/dag/impl/TaskAttemptImpl.java | 39 +++-
.../apache/tez/dag/app/dag/impl/TaskImpl.java | 33 ++-
.../apache/tez/dag/app/dag/impl/VertexImpl.java | 36 ++-
.../app/launcher/LocalContainerLauncher.java | 9 +-
.../dag/app/rm/TaskSchedulerEventHandler.java | 14 +-
.../rm/container/AMContainerEventCompleted.java | 13 +-
.../dag/app/rm/container/AMContainerImpl.java | 96 ++++----
.../events/TaskAttemptFinishedEvent.java | 20 +-
.../impl/HistoryEventJsonConversion.java | 5 +-
tez-dag/src/main/proto/HistoryEvents.proto | 1 +
.../org/apache/tez/dag/app/TestPreemption.java | 2 +
.../tez/dag/app/dag/impl/TestTaskAttempt.java | 43 +++-
.../app/dag/impl/TestTaskAttemptRecovery.java | 15 +-
.../tez/dag/app/dag/impl/TestTaskImpl.java | 15 +-
.../tez/dag/app/dag/impl/TestTaskRecovery.java | 59 ++---
.../tez/dag/app/dag/impl/TestVertexImpl.java | 133 +++++++++--
.../app/rm/TestTaskSchedulerEventHandler.java | 30 +++
.../dag/app/rm/container/TestAMContainer.java | 230 ++++++++++++++-----
.../TestHistoryEventsProtoConversion.java | 8 +-
.../impl/TestHistoryEventJsonConversion.java | 3 +-
.../ats/HistoryEventTimelineConversion.java | 3 +
.../ats/TestHistoryEventTimelineConversion.java | 8 +-
37 files changed, 777 insertions(+), 233 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/tez/blob/84a7ef05/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 0646dfc..cbc6751 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -7,6 +7,8 @@ INCOMPATIBLE CHANGES
TEZ-2552. CRC errors can cause job to run for very long time in large jobs.
ALL CHANGES:
+ TEZ-1773. Add attempt failure cause enum to the attempt failed/killed
+ history record
TEZ-2203. Intern strings in tez counters
TEZ-2834. Make Tez preemption resilient to incorrect free resource reported
by YARN
http://git-wip-us.apache.org/repos/asf/tez/blob/84a7ef05/tez-api/src/main/java/org/apache/tez/common/ATSConstants.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/common/ATSConstants.java
b/tez-api/src/main/java/org/apache/tez/common/ATSConstants.java
index aec6ca5..944ae87 100644
--- a/tez-api/src/main/java/org/apache/tez/common/ATSConstants.java
+++ b/tez-api/src/main/java/org/apache/tez/common/ATSConstants.java
@@ -63,6 +63,7 @@ public class ATSConstants {
public static final String FINISH_TIME = "endTime";
public static final String TIME_TAKEN = "timeTaken";
public static final String STATUS = "status";
+ public static final String TASK_ATTEMPT_ERROR_ENUM = "taskAttemptErrorEnum";
public static final String DIAGNOSTICS = "diagnostics";
public static final String SUCCESSFUL_ATTEMPT_ID = "successfulAttemptId";
public static final String COUNTERS = "counters";
http://git-wip-us.apache.org/repos/asf/tez/blob/84a7ef05/tez-common/src/main/java/org/apache/tez/dag/records/TaskAttemptTerminationCause.java
----------------------------------------------------------------------
diff --git
a/tez-common/src/main/java/org/apache/tez/dag/records/TaskAttemptTerminationCause.java
b/tez-common/src/main/java/org/apache/tez/dag/records/TaskAttemptTerminationCause.java
new file mode 100644
index 0000000..ef0bb33
--- /dev/null
+++
b/tez-common/src/main/java/org/apache/tez/dag/records/TaskAttemptTerminationCause.java
@@ -0,0 +1,45 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.dag.records;
+
+public enum TaskAttemptTerminationCause {
+  UNKNOWN_ERROR, // The error cause is unknown. Usually means a gap in error propagation
+
+  TERMINATED_BY_CLIENT, // Killed by client command
+  TERMINATED_AT_SHUTDOWN, // Killed due to execution shutdown
+  INTERNAL_PREEMPTION, // Killed by Tez to make space for higher priority work
+  EXTERNAL_PREEMPTION, // Killed by the cluster to make space for other work
+  TERMINATED_INEFFECTIVE_SPECULATION, // Killed speculative attempt because original succeeded
+  TERMINATED_EFFECTIVE_SPECULATION, // Killed original attempt because speculation succeeded
+  TERMINATED_ORPHANED, // Attempt is no longer needed by the task
+
+  APPLICATION_ERROR, // Failed due to application code error
+  FRAMEWORK_ERROR, // Failed due to code error in Tez code
+  INPUT_READ_ERROR, // Failed due to error in reading inputs
+  OUTPUT_WRITE_ERROR, // Failed due to error in writing outputs
+  OUTPUT_LOST, // Failed because the attempt's output was reported lost
+  TASK_HEARTBEAT_ERROR, // Failed because AM lost connection to the task
+
+  CONTAINER_LAUNCH_FAILED, // Failed to launch container
+  CONTAINER_EXITED, // Container exited. Indicates gap in specific error propagation from the cluster
+  CONTAINER_STOPPED, // Container stopped or released by Tez
+  NODE_FAILED, // Node for the container failed
+  NODE_DISK_ERROR, // Disk failed on the node running the task
+
+}
http://git-wip-us.apache.org/repos/asf/tez/blob/84a7ef05/tez-dag/src/main/java/org/apache/tez/dag/app/TaskHeartbeatHandler.java
----------------------------------------------------------------------
diff --git
a/tez-dag/src/main/java/org/apache/tez/dag/app/TaskHeartbeatHandler.java
b/tez-dag/src/main/java/org/apache/tez/dag/app/TaskHeartbeatHandler.java
index 6b698aa..d115b14 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/TaskHeartbeatHandler.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/TaskHeartbeatHandler.java
@@ -23,6 +23,7 @@ import org.apache.hadoop.conf.Configuration;
import org.apache.tez.dag.api.TezConfiguration;
import org.apache.tez.dag.app.dag.event.TaskAttemptEventAttemptFailed;
import org.apache.tez.dag.app.dag.event.TaskAttemptEventType;
+import org.apache.tez.dag.records.TaskAttemptTerminationCause;
import org.apache.tez.dag.records.TezTaskAttemptID;
@@ -60,6 +61,6 @@ public class TaskHeartbeatHandler extends
HeartbeatHandlerBase<TezTaskAttemptID>
protected void handleTimeOut(TezTaskAttemptID attemptId) {
eventHandler.handle(new TaskAttemptEventAttemptFailed(attemptId,
TaskAttemptEventType.TA_TIMED_OUT, "AttemptID:" + attemptId.toString()
- + " Timed out after " + timeOut / 1000 + " secs"));
+ + " Timed out after " + timeOut / 1000 + " secs",
TaskAttemptTerminationCause.TASK_HEARTBEAT_ERROR));
}
}
http://git-wip-us.apache.org/repos/asf/tez/blob/84a7ef05/tez-dag/src/main/java/org/apache/tez/dag/app/dag/TaskAttempt.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/TaskAttempt.java
b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/TaskAttempt.java
index f30fc5c..246324d 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/TaskAttempt.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/TaskAttempt.java
@@ -29,6 +29,7 @@ import org.apache.tez.common.counters.TezCounters;
import org.apache.tez.dag.api.oldrecords.TaskAttemptReport;
import org.apache.tez.dag.api.oldrecords.TaskAttemptState;
import org.apache.tez.dag.history.HistoryEvent;
+import org.apache.tez.dag.records.TaskAttemptTerminationCause;
import org.apache.tez.dag.records.TezDAGID;
import org.apache.tez.dag.records.TezTaskAttemptID;
import org.apache.tez.dag.records.TezTaskID;
@@ -69,6 +70,7 @@ public interface TaskAttempt {
TaskAttemptReport getReport();
List<String> getDiagnostics();
+ TaskAttemptTerminationCause getTerminationCause();
TezCounters getCounters();
float getProgress();
TaskAttemptState getState();
http://git-wip-us.apache.org/repos/asf/tez/blob/84a7ef05/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/TaskAttemptEventAttemptFailed.java
----------------------------------------------------------------------
diff --git
a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/TaskAttemptEventAttemptFailed.java
b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/TaskAttemptEventAttemptFailed.java
index 5c7b956..b9c1d09 100644
---
a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/TaskAttemptEventAttemptFailed.java
+++
b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/TaskAttemptEventAttemptFailed.java
@@ -18,16 +18,19 @@
package org.apache.tez.dag.app.dag.event;
+import org.apache.tez.dag.records.TaskAttemptTerminationCause;
import org.apache.tez.dag.records.TezTaskAttemptID;
public class TaskAttemptEventAttemptFailed extends TaskAttemptEvent
- implements DiagnosableEvent {
+ implements DiagnosableEvent, TaskAttemptEventTerminationCauseEvent {
private final String diagnostics;
+ private final TaskAttemptTerminationCause errorCause;
public TaskAttemptEventAttemptFailed(TezTaskAttemptID id,
- TaskAttemptEventType type, String diagnostics) {
+ TaskAttemptEventType type, String diagnostics,
TaskAttemptTerminationCause errorCause) {
super(id, type);
this.diagnostics = diagnostics;
+ this.errorCause = errorCause;
}
@Override
@@ -35,5 +38,9 @@ public class TaskAttemptEventAttemptFailed extends
TaskAttemptEvent
return diagnostics;
}
-
+ @Override
+ public TaskAttemptTerminationCause getTerminationCause() {
+ return errorCause;
+ }
+
}
http://git-wip-us.apache.org/repos/asf/tez/blob/84a7ef05/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/TaskAttemptEventContainerTerminated.java
----------------------------------------------------------------------
diff --git
a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/TaskAttemptEventContainerTerminated.java
b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/TaskAttemptEventContainerTerminated.java
index 87aa313..5dd0141 100644
---
a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/TaskAttemptEventContainerTerminated.java
+++
b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/TaskAttemptEventContainerTerminated.java
@@ -17,20 +17,29 @@
package org.apache.tez.dag.app.dag.event;
+import org.apache.tez.dag.records.TaskAttemptTerminationCause;
import org.apache.tez.dag.records.TezTaskAttemptID;
public class TaskAttemptEventContainerTerminated extends TaskAttemptEvent
- implements DiagnosableEvent {
+ implements DiagnosableEvent, TaskAttemptEventTerminationCauseEvent {
private final String message;
+ private final TaskAttemptTerminationCause errorCause;
- public TaskAttemptEventContainerTerminated(TezTaskAttemptID id, String
message) {
+ public TaskAttemptEventContainerTerminated(TezTaskAttemptID id, String
message,
+ TaskAttemptTerminationCause errCause) {
super(id, TaskAttemptEventType.TA_CONTAINER_TERMINATED);
this.message = message;
+ this.errorCause = errCause;
}
@Override
public String getDiagnosticInfo() {
return message;
}
+
+ @Override
+ public TaskAttemptTerminationCause getTerminationCause() {
+ return errorCause;
+ }
}
http://git-wip-us.apache.org/repos/asf/tez/blob/84a7ef05/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/TaskAttemptEventContainerTerminatedBySystem.java
----------------------------------------------------------------------
diff --git
a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/TaskAttemptEventContainerTerminatedBySystem.java
b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/TaskAttemptEventContainerTerminatedBySystem.java
index a92aafd..a3c57e4 100644
---
a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/TaskAttemptEventContainerTerminatedBySystem.java
+++
b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/TaskAttemptEventContainerTerminatedBySystem.java
@@ -18,19 +18,28 @@
package org.apache.tez.dag.app.dag.event;
+import org.apache.tez.dag.records.TaskAttemptTerminationCause;
import org.apache.tez.dag.records.TezTaskAttemptID;
public class TaskAttemptEventContainerTerminatedBySystem extends
TaskAttemptEvent
- implements DiagnosableEvent {
+ implements DiagnosableEvent, TaskAttemptEventTerminationCauseEvent {
private final String diagnostics;
- public TaskAttemptEventContainerTerminatedBySystem(TezTaskAttemptID id,
String diagnostics) {
+ private final TaskAttemptTerminationCause errorCause;
+ public TaskAttemptEventContainerTerminatedBySystem(TezTaskAttemptID id,
String diagnostics,
+ TaskAttemptTerminationCause errorCause) {
super(id, TaskAttemptEventType.TA_CONTAINER_TERMINATED_BY_SYSTEM);
this.diagnostics = diagnostics;
+ this.errorCause = errorCause;
}
@Override
public String getDiagnosticInfo() {
return diagnostics;
}
+
+ @Override
+ public TaskAttemptTerminationCause getTerminationCause() {
+ return errorCause;
+ }
}
http://git-wip-us.apache.org/repos/asf/tez/blob/84a7ef05/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/TaskAttemptEventContainerTerminating.java
----------------------------------------------------------------------
diff --git
a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/TaskAttemptEventContainerTerminating.java
b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/TaskAttemptEventContainerTerminating.java
index 7da6e14..02d04a5 100644
---
a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/TaskAttemptEventContainerTerminating.java
+++
b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/TaskAttemptEventContainerTerminating.java
@@ -17,17 +17,20 @@
package org.apache.tez.dag.app.dag.event;
+import org.apache.tez.dag.records.TaskAttemptTerminationCause;
import org.apache.tez.dag.records.TezTaskAttemptID;
public class TaskAttemptEventContainerTerminating extends TaskAttemptEvent
- implements DiagnosableEvent {
+ implements DiagnosableEvent, TaskAttemptEventTerminationCauseEvent {
private final String message;
+ private final TaskAttemptTerminationCause errorCause;
public TaskAttemptEventContainerTerminating(TezTaskAttemptID id,
- String diagMessage) {
+ String diagMessage, TaskAttemptTerminationCause errCause) {
super(id, TaskAttemptEventType.TA_CONTAINER_TERMINATING);
this.message = diagMessage;
+ this.errorCause = errCause;
}
@Override
@@ -35,4 +38,9 @@ public class TaskAttemptEventContainerTerminating extends
TaskAttemptEvent
return this.message;
}
+ @Override
+ public TaskAttemptTerminationCause getTerminationCause() {
+ return errorCause;
+ }
+
}
http://git-wip-us.apache.org/repos/asf/tez/blob/84a7ef05/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/TaskAttemptEventKillRequest.java
----------------------------------------------------------------------
diff --git
a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/TaskAttemptEventKillRequest.java
b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/TaskAttemptEventKillRequest.java
index 9bceb1d..985becd 100644
---
a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/TaskAttemptEventKillRequest.java
+++
b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/TaskAttemptEventKillRequest.java
@@ -17,19 +17,28 @@
*/
package org.apache.tez.dag.app.dag.event;
+import org.apache.tez.dag.records.TaskAttemptTerminationCause;
import org.apache.tez.dag.records.TezTaskAttemptID;
-public class TaskAttemptEventKillRequest extends TaskAttemptEvent {
+public class TaskAttemptEventKillRequest extends TaskAttemptEvent
+ implements TaskAttemptEventTerminationCauseEvent {
private final String message;
+ private final TaskAttemptTerminationCause errorCause;
- public TaskAttemptEventKillRequest(TezTaskAttemptID id, String message) {
+ public TaskAttemptEventKillRequest(TezTaskAttemptID id, String message,
TaskAttemptTerminationCause err) {
super(id, TaskAttemptEventType.TA_KILL_REQUEST);
this.message = message;
+ this.errorCause = err;
}
public String getMessage() {
return this.message;
}
+ @Override
+ public TaskAttemptTerminationCause getTerminationCause() {
+ return errorCause;
+ }
+
}
http://git-wip-us.apache.org/repos/asf/tez/blob/84a7ef05/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/TaskAttemptEventNodeFailed.java
----------------------------------------------------------------------
diff --git
a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/TaskAttemptEventNodeFailed.java
b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/TaskAttemptEventNodeFailed.java
index 6d97466..541ef00 100644
---
a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/TaskAttemptEventNodeFailed.java
+++
b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/TaskAttemptEventNodeFailed.java
@@ -17,17 +17,20 @@
package org.apache.tez.dag.app.dag.event;
+import org.apache.tez.dag.records.TaskAttemptTerminationCause;
import org.apache.tez.dag.records.TezTaskAttemptID;
public class TaskAttemptEventNodeFailed extends TaskAttemptEvent
- implements DiagnosableEvent{
+ implements DiagnosableEvent, TaskAttemptEventTerminationCauseEvent {
private final String message;
+ private final TaskAttemptTerminationCause errorCause;
public TaskAttemptEventNodeFailed(TezTaskAttemptID id,
- String diagMessage) {
+ String diagMessage, TaskAttemptTerminationCause errorCause) {
super(id, TaskAttemptEventType.TA_NODE_FAILED);
this.message = diagMessage;
+ this.errorCause = errorCause;
}
@Override
@@ -35,4 +38,9 @@ public class TaskAttemptEventNodeFailed extends
TaskAttemptEvent
return this.message;
}
+ @Override
+ public TaskAttemptTerminationCause getTerminationCause() {
+ return errorCause;
+ }
+
}
http://git-wip-us.apache.org/repos/asf/tez/blob/84a7ef05/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/TaskAttemptEventOutputFailed.java
----------------------------------------------------------------------
diff --git
a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/TaskAttemptEventOutputFailed.java
b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/TaskAttemptEventOutputFailed.java
index 678e1e7..6bc110a 100644
---
a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/TaskAttemptEventOutputFailed.java
+++
b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/TaskAttemptEventOutputFailed.java
@@ -18,10 +18,12 @@
package org.apache.tez.dag.app.dag.event;
+import org.apache.tez.dag.records.TaskAttemptTerminationCause;
import org.apache.tez.dag.records.TezTaskAttemptID;
import org.apache.tez.runtime.api.impl.TezEvent;
-public class TaskAttemptEventOutputFailed extends TaskAttemptEvent {
+public class TaskAttemptEventOutputFailed extends TaskAttemptEvent
+ implements TaskAttemptEventTerminationCauseEvent {
private TezEvent inputFailedEvent;
private int consumerTaskNumber;
@@ -40,5 +42,10 @@ public class TaskAttemptEventOutputFailed extends
TaskAttemptEvent {
public int getConsumerTaskNumber() {
return consumerTaskNumber;
}
+
+ @Override
+ public TaskAttemptTerminationCause getTerminationCause() {
+ return TaskAttemptTerminationCause.OUTPUT_LOST;
+ }
}
http://git-wip-us.apache.org/repos/asf/tez/blob/84a7ef05/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/TaskAttemptEventTerminationCauseEvent.java
----------------------------------------------------------------------
diff --git
a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/TaskAttemptEventTerminationCauseEvent.java
b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/TaskAttemptEventTerminationCauseEvent.java
new file mode 100644
index 0000000..70c20e3
--- /dev/null
+++
b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/TaskAttemptEventTerminationCauseEvent.java
@@ -0,0 +1,26 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.tez.dag.app.dag.event;
+
+import org.apache.tez.dag.records.TaskAttemptTerminationCause;
+
+public interface TaskAttemptEventTerminationCauseEvent {
+  /** Returns the {@link TaskAttemptTerminationCause} carried by this event. */
+  TaskAttemptTerminationCause getTerminationCause();
+}
http://git-wip-us.apache.org/repos/asf/tez/blob/84a7ef05/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/TaskAttemptEventType.java
----------------------------------------------------------------------
diff --git
a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/TaskAttemptEventType.java
b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/TaskAttemptEventType.java
index e7db8d1..cae0b7d 100644
---
a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/TaskAttemptEventType.java
+++
b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/TaskAttemptEventType.java
@@ -29,9 +29,8 @@ public enum TaskAttemptEventType {
//Producer: TaskAttemptListener
TA_STARTED_REMOTELY,
TA_STATUS_UPDATE,
- TA_DIAGNOSTICS_UPDATE,
TA_OUTPUT_CONSUMABLE, // TODO History event to indicate this ?
- TA_COMMIT_PENDING,
+ TA_DIAGNOSTICS_UPDATE, // REMOVE THIS - UNUSED
TA_DONE,
TA_FAILED,
TA_TIMED_OUT,
http://git-wip-us.apache.org/repos/asf/tez/blob/84a7ef05/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/TaskEventTermination.java
----------------------------------------------------------------------
diff --git
a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/TaskEventTermination.java
b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/TaskEventTermination.java
index 73d5744..d48a0bf 100644
---
a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/TaskEventTermination.java
+++
b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/TaskEventTermination.java
@@ -18,22 +18,23 @@
package org.apache.tez.dag.app.dag.event;
-import org.apache.tez.dag.app.dag.TaskTerminationCause;
+import org.apache.tez.dag.records.TaskAttemptTerminationCause;
import org.apache.tez.dag.records.TezTaskID;
-public class TaskEventTermination extends TaskEvent implements
DiagnosableEvent{
+public class TaskEventTermination extends TaskEvent implements
DiagnosableEvent,
+ TaskAttemptEventTerminationCauseEvent {
- private TaskTerminationCause terminationCause;
- private String diagnostics;
+ private final String diagnostics;
+ private final TaskAttemptTerminationCause errorCause;
- public TaskEventTermination(TezTaskID taskID, TaskTerminationCause
terminationCause) {
+ public TaskEventTermination(TezTaskID taskID, TaskAttemptTerminationCause
errorCause, String diagnostics) {
super(taskID, TaskEventType.T_TERMINATE);
- this.terminationCause = terminationCause;
- this.diagnostics = "Task is terminated due to:" + terminationCause.name();
- }
-
- public TaskTerminationCause getTerminationCause() {
- return terminationCause;
+ this.errorCause = errorCause;
+ if (diagnostics != null) {
+ this.diagnostics = diagnostics;
+ } else {
+ this.diagnostics = "Task is terminated due to: " + errorCause.name();
+ }
}
@Override
@@ -41,4 +42,9 @@ public class TaskEventTermination extends TaskEvent
implements DiagnosableEvent{
return diagnostics;
}
+ @Override
+ public TaskAttemptTerminationCause getTerminationCause() {
+ return errorCause;
+ }
+
}
http://git-wip-us.apache.org/repos/asf/tez/blob/84a7ef05/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java
----------------------------------------------------------------------
diff --git
a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java
b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java
index 1df07f8..f1bfefe 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java
@@ -76,6 +76,7 @@ import org.apache.tez.dag.app.dag.event.TaskAttemptEvent;
import org.apache.tez.dag.app.dag.event.TaskAttemptEventAttemptFailed;
import org.apache.tez.dag.app.dag.event.TaskAttemptEventContainerTerminated;
import org.apache.tez.dag.app.dag.event.TaskAttemptEventDiagnosticsUpdate;
+import org.apache.tez.dag.app.dag.event.TaskAttemptEventTerminationCauseEvent;
import org.apache.tez.dag.app.dag.event.TaskAttemptEventOutputFailed;
import org.apache.tez.dag.app.dag.event.TaskAttemptEventSchedule;
import org.apache.tez.dag.app.dag.event.TaskAttemptEventStartedRemotely;
@@ -90,6 +91,7 @@ import org.apache.tez.dag.history.DAGHistoryEvent;
import org.apache.tez.dag.history.HistoryEvent;
import org.apache.tez.dag.history.events.TaskAttemptFinishedEvent;
import org.apache.tez.dag.history.events.TaskAttemptStartedEvent;
+import org.apache.tez.dag.records.TaskAttemptTerminationCause;
import org.apache.tez.dag.records.TezDAGID;
import org.apache.tez.dag.records.TezTaskAttemptID;
import org.apache.tez.dag.records.TezTaskID;
@@ -124,6 +126,7 @@ public class TaskAttemptImpl implements TaskAttempt,
protected EventHandler eventHandler;
private final TezTaskAttemptID attemptId;
private final Clock clock;
+ private TaskAttemptTerminationCause terminationCause =
TaskAttemptTerminationCause.UNKNOWN_ERROR;
private final List<String> diagnostics = new ArrayList<String>();
private final Lock readLock;
private final Lock writeLock;
@@ -348,7 +351,6 @@ public class TaskAttemptImpl implements TaskAttempt,
TaskAttemptEventType.TA_CONTAINER_TERMINATED_BY_SYSTEM,
TaskAttemptEventType.TA_STATUS_UPDATE,
TaskAttemptEventType.TA_OUTPUT_CONSUMABLE,
- TaskAttemptEventType.TA_COMMIT_PENDING,
TaskAttemptEventType.TA_DONE, TaskAttemptEventType.TA_FAILED,
TaskAttemptEventType.TA_TIMED_OUT,
TaskAttemptEventType.TA_KILL_REQUEST,
@@ -371,7 +373,6 @@ public class TaskAttemptImpl implements TaskAttempt,
TaskAttemptEventType.TA_CONTAINER_TERMINATED_BY_SYSTEM,
TaskAttemptEventType.TA_STATUS_UPDATE,
TaskAttemptEventType.TA_OUTPUT_CONSUMABLE,
- TaskAttemptEventType.TA_COMMIT_PENDING,
TaskAttemptEventType.TA_DONE, TaskAttemptEventType.TA_FAILED,
TaskAttemptEventType.TA_TIMED_OUT,
TaskAttemptEventType.TA_KILL_REQUEST,
@@ -391,7 +392,6 @@ public class TaskAttemptImpl implements TaskAttempt,
TaskAttemptEventType.TA_CONTAINER_TERMINATED_BY_SYSTEM,
TaskAttemptEventType.TA_STATUS_UPDATE,
TaskAttemptEventType.TA_OUTPUT_CONSUMABLE,
- TaskAttemptEventType.TA_COMMIT_PENDING,
TaskAttemptEventType.TA_DONE, TaskAttemptEventType.TA_FAILED,
TaskAttemptEventType.TA_TIMED_OUT,
TaskAttemptEventType.TA_KILL_REQUEST,
@@ -412,7 +412,6 @@ public class TaskAttemptImpl implements TaskAttempt,
TaskAttemptEventType.TA_CONTAINER_TERMINATED_BY_SYSTEM,
TaskAttemptEventType.TA_STATUS_UPDATE,
TaskAttemptEventType.TA_OUTPUT_CONSUMABLE,
- TaskAttemptEventType.TA_COMMIT_PENDING,
TaskAttemptEventType.TA_DONE, TaskAttemptEventType.TA_FAILED,
TaskAttemptEventType.TA_TIMED_OUT,
TaskAttemptEventType.TA_KILL_REQUEST,
@@ -569,6 +568,11 @@ public class TaskAttemptImpl implements TaskAttempt,
readLock.unlock();
}
}
+
+ @Override
+ public TaskAttemptTerminationCause getTerminationCause() {
+ return terminationCause;
+ }
@Override
public TezCounters getCounters() {
@@ -815,6 +819,8 @@ public class TaskAttemptImpl implements TaskAttempt,
this.reportedStatus.counters = tEvent.getCounters();
this.reportedStatus.progress = 1f;
this.reportedStatus.state = tEvent.getState();
+ this.terminationCause = tEvent.getTaskAttemptError() != null ?
tEvent.getTaskAttemptError()
+ : TaskAttemptTerminationCause.UNKNOWN_ERROR;
this.diagnostics.add(tEvent.getDiagnostics());
this.recoveredState = tEvent.getState();
sendEvent(createDAGCounterUpdateEventTAFinished(this,
tEvent.getState()));
@@ -1032,8 +1038,8 @@ public class TaskAttemptImpl implements TaskAttempt,
TaskAttemptFinishedEvent finishEvt = new TaskAttemptFinishedEvent(
attemptId, getTask().getVertex().getName(), getLaunchTime(),
- getFinishTime(), TaskAttemptState.SUCCEEDED, "",
- getCounters());
+ getFinishTime(), TaskAttemptState.SUCCEEDED, null,
+ "", getCounters());
// FIXME how do we store information regd completion events
this.appContext.getHistoryHandler().handle(
new DAGHistoryEvent(getDAGID(), finishEvt));
@@ -1048,9 +1054,9 @@ public class TaskAttemptImpl implements TaskAttempt,
TaskAttemptFinishedEvent finishEvt = new TaskAttemptFinishedEvent(
attemptId, getTask().getVertex().getName(), getLaunchTime(),
finishTime, state,
+ terminationCause,
StringUtils.join(
- getDiagnostics(), LINE_SEPARATOR),
- getCounters());
+ getDiagnostics(), LINE_SEPARATOR), getCounters());
// FIXME how do we store information regd completion events
this.appContext.getHistoryHandler().handle(
new DAGHistoryEvent(getDAGID(), finishEvt));
@@ -1080,7 +1086,8 @@ public class TaskAttemptImpl implements TaskAttempt,
LOG.error(msg, e);
String diag = msg + ", " + e.getMessage() + ", " +
ExceptionUtils.getStackTrace(e.getCause());
new TerminatedBeforeRunningTransition(FAILED_HELPER).transition(ta,
- new TaskAttemptEventAttemptFailed(ta.getID(),
TaskAttemptEventType.TA_FAILED, diag));
+ new TaskAttemptEventAttemptFailed(ta.getID(),
TaskAttemptEventType.TA_FAILED, diag,
+ TaskAttemptTerminationCause.APPLICATION_ERROR));
return TaskAttemptStateInternal.FAILED;
}
// Create startTaskRequest
@@ -1162,6 +1169,13 @@ public class TaskAttemptImpl implements TaskAttempt,
if (event instanceof DiagnosableEvent) {
ta.addDiagnosticInfo(((DiagnosableEvent) event).getDiagnosticInfo());
}
+
+ // this should catch at test time if any new events are missing the
error cause
+ assert event instanceof TaskAttemptEventTerminationCauseEvent;
+
+ if (event instanceof TaskAttemptEventTerminationCauseEvent) {
+ ta.trySetTerminationCause(((TaskAttemptEventTerminationCauseEvent)
event).getTerminationCause());
+ }
ta.sendEvent(createDAGCounterUpdateEventTAFinished(ta,
helper.getTaskAttemptState()));
@@ -1558,6 +1572,13 @@ public class TaskAttemptImpl implements TaskAttempt,
sendEvent(new VertexEventRouteEvent(vertex.getVertexId(), tezIfEvents));
}
}
+
+ private void trySetTerminationCause(TaskAttemptTerminationCause err) {
+ // keep only the first error cause
+ if (terminationCause == TaskAttemptTerminationCause.UNKNOWN_ERROR) {
+ terminationCause = err;
+ }
+ }
private void initTaskAttemptStatus(TaskAttemptStatus result) {
result.progress = 0.0f;
http://git-wip-us.apache.org/repos/asf/tez/blob/84a7ef05/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java
----------------------------------------------------------------------
diff --git
a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java
b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java
index b6c7eb9..87ee55a 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java
@@ -33,6 +33,7 @@ import java.util.concurrent.locks.ReentrantReadWriteLock;
import com.google.common.base.Preconditions;
import com.google.common.base.Predicate;
import com.google.common.collect.Maps;
+
import org.apache.commons.lang.StringUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -85,6 +86,7 @@ import
org.apache.tez.dag.history.events.TaskAttemptFinishedEvent;
import org.apache.tez.dag.history.events.TaskAttemptStartedEvent;
import org.apache.tez.dag.history.events.TaskFinishedEvent;
import org.apache.tez.dag.history.events.TaskStartedEvent;
+import org.apache.tez.dag.records.TaskAttemptTerminationCause;
import org.apache.tez.dag.records.TezTaskAttemptID;
import org.apache.tez.dag.records.TezTaskID;
import org.apache.tez.dag.records.TezVertexID;
@@ -92,6 +94,7 @@ import org.apache.tez.runtime.api.OutputCommitter;
import org.apache.tez.runtime.api.impl.TezEvent;
import com.google.common.annotations.VisibleForTesting;
+
import org.apache.tez.state.OnStateChangedCallback;
import org.apache.tez.state.StateMachineTez;
@@ -742,7 +745,7 @@ public class TaskImpl implements Task,
EventHandler<TaskEvent> {
if (state != TaskState.RUNNING) {
LOG.info("Task not running. Issuing kill to bad commit attempt " +
taskAttemptID);
eventHandler.handle(new TaskAttemptEventKillRequest(taskAttemptID
- , "Task not running. Bad attempt."));
+ , "Task not running. Bad attempt.",
TaskAttemptTerminationCause.TERMINATED_ORPHANED));
return false;
}
if (commitAttempt == null) {
@@ -1098,7 +1101,7 @@ public class TaskImpl implements Task,
EventHandler<TaskEvent> {
+ task.outputConsumableAttempt + " already has output ready");
}
task.eventHandler.handle(new TaskAttemptEventKillRequest(attemptId,
- "Alternate attemptId already serving output"));
+ "Alternate attemptId already serving output",
TaskAttemptTerminationCause.UNKNOWN_ERROR));
}
}
@@ -1126,6 +1129,7 @@ public class TaskImpl implements Task,
EventHandler<TaskEvent> {
task.taskId, TaskState.SUCCEEDED));
LOG.info("Task succeeded with attempt " + task.successfulAttempt);
task.logJobHistoryTaskFinishedEvent();
+ TaskAttempt successfulAttempt = task.attempts.get(successTaId);
// issue kill to all other attempts
for (TaskAttempt attempt : task.attempts.values()) {
@@ -1134,9 +1138,21 @@ public class TaskImpl implements Task,
EventHandler<TaskEvent> {
// TA_KILL message to an attempt that doesn't need one for
// other reasons.
!attempt.isFinished()) {
- LOG.info("Issuing kill to other attempt " + attempt.getID());
+ LOG.info("Issuing kill to other attempt " + attempt.getID() + " as
attempt: " +
+ task.successfulAttempt + " has succeeded");
+ String diagnostics = null;
+ TaskAttemptTerminationCause errCause = null;
+ if (attempt.getLaunchTime() < successfulAttempt.getLaunchTime()) {
+ diagnostics = "Killed this attempt as other speculative attempt :
" + successTaId
+ + " succeeded";
+ errCause =
TaskAttemptTerminationCause.TERMINATED_EFFECTIVE_SPECULATION;
+ } else {
+ diagnostics = "Killed this speculative attempt as original
attempt: " + successTaId
+ + " succeeded";
+ errCause =
TaskAttemptTerminationCause.TERMINATED_INEFFECTIVE_SPECULATION;
+ }
task.eventHandler.handle(new TaskAttemptEventKillRequest(attempt
- .getID(), "Alternate attempt succeeded"));
+ .getID(), diagnostics, errCause));
}
}
// send notification to DAG scheduler
@@ -1509,14 +1525,13 @@ public class TaskImpl implements Task,
EventHandler<TaskEvent> {
}
}
- private void killUnfinishedAttempt(TaskAttempt attempt, String logMsg) {
+ private void killUnfinishedAttempt(TaskAttempt attempt, String logMsg,
TaskAttemptTerminationCause errorCause) {
if (commitAttempt != null && commitAttempt.equals(attempt)) {
LOG.info("Removing commit attempt: " + commitAttempt);
commitAttempt = null;
}
if (attempt != null && !attempt.isFinished()) {
- eventHandler.handle(new TaskAttemptEventKillRequest(attempt.getID(),
- logMsg));
+ eventHandler.handle(new TaskAttemptEventKillRequest(attempt.getID(),
logMsg, errorCause));
}
}
@@ -1538,8 +1553,8 @@ public class TaskImpl implements Task,
EventHandler<TaskEvent> {
task.addDiagnosticInfo(terminateEvent.getDiagnosticInfo());
// issue kill to all non finished attempts
for (TaskAttempt attempt : task.attempts.values()) {
- task.killUnfinishedAttempt
- (attempt, "Task KILL is received. Killing attempt!");
+ task.killUnfinishedAttempt(attempt, "Task KILL is received. Killing
attempt. Diagnostics: "
+ + terminateEvent.getDiagnosticInfo(),
terminateEvent.getTerminationCause());
}
}
}
http://git-wip-us.apache.org/repos/asf/tez/blob/84a7ef05/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
----------------------------------------------------------------------
diff --git
a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
index c27d869..e1f7a94 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
@@ -139,6 +139,7 @@ import
org.apache.tez.dag.history.events.VertexParallelismUpdatedEvent;
import org.apache.tez.dag.history.events.VertexStartedEvent;
import org.apache.tez.dag.library.vertexmanager.InputReadyVertexManager;
import org.apache.tez.dag.library.vertexmanager.ShuffleVertexManager;
+import org.apache.tez.dag.records.TaskAttemptTerminationCause;
import org.apache.tez.dag.records.TezDAGID;
import org.apache.tez.dag.records.TezTaskAttemptID;
import org.apache.tez.dag.records.TezTaskID;
@@ -1802,12 +1803,17 @@ public class VertexImpl implements
org.apache.tez.dag.app.dag.Vertex,
*/
void tryEnactKill(VertexTerminationCause trigger,
TaskTerminationCause taskterminationCause) {
+ // In most cases the dag is shutting down due to some error
+ TaskAttemptTerminationCause errCause =
TaskAttemptTerminationCause.TERMINATED_AT_SHUTDOWN;
+ if (taskterminationCause == TaskTerminationCause.DAG_KILL) {
+ errCause = TaskAttemptTerminationCause.TERMINATED_BY_CLIENT;
+ }
if(trySetTerminationCause(trigger)){
- LOG.info("Killing tasks in vertex: " + logIdentifier + " due to trigger:
"
- + trigger);
+ String msg = "Killing tasks in vertex: " + logIdentifier + " due to
trigger: " + trigger;
+ LOG.info(msg);
for (Task task : tasks.values()) {
- eventHandler.handle(
- new TaskEventTermination(task.getTaskId(), taskterminationCause));
+ eventHandler.handle( // attempt was terminated because the vertex is
shutting down
+ new TaskEventTermination(task.getTaskId(), errCause, msg));
}
}
}
@@ -3840,12 +3846,32 @@ public class VertexImpl implements
org.apache.tez.dag.app.dag.Vertex,
case TASK_ATTEMPT_FAILED_EVENT:
{
checkEventSourceMetadata(vertex, sourceMeta);
+ TaskAttemptTerminationCause errCause = null;
+ switch (sourceMeta.getEventGenerator()) {
+ case INPUT:
+ errCause = TaskAttemptTerminationCause.INPUT_READ_ERROR;
+ break;
+ case PROCESSOR:
+ errCause = TaskAttemptTerminationCause.APPLICATION_ERROR;
+ break;
+ case OUTPUT:
+ errCause = TaskAttemptTerminationCause.OUTPUT_WRITE_ERROR;
+ break;
+ case SYSTEM:
+ errCause = TaskAttemptTerminationCause.FRAMEWORK_ERROR;
+ break;
+ default:
+ throw new TezUncheckedException("Unknown
EventProducerConsumerType: " +
+ sourceMeta.getEventGenerator());
+ }
TaskAttemptFailedEvent taskFailedEvent =
(TaskAttemptFailedEvent) tezEvent.getEvent();
vertex.getEventHandler().handle(
new TaskAttemptEventAttemptFailed(sourceMeta.getTaskAttemptID(),
TaskAttemptEventType.TA_FAILED,
- "Error: " + taskFailedEvent.getDiagnostics()));
+ "Error: " + taskFailedEvent.getDiagnostics(),
+ errCause)
+ );
}
break;
default:
http://git-wip-us.apache.org/repos/asf/tez/blob/84a7ef05/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/LocalContainerLauncher.java
----------------------------------------------------------------------
diff --git
a/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/LocalContainerLauncher.java
b/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/LocalContainerLauncher.java
index fae5d48..c56c93b 100644
---
a/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/LocalContainerLauncher.java
+++
b/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/LocalContainerLauncher.java
@@ -67,6 +67,7 @@ import
org.apache.tez.dag.app.rm.container.AMContainerEventLaunched;
import org.apache.tez.dag.app.rm.container.AMContainerEventType;
import org.apache.tez.dag.history.DAGHistoryEvent;
import org.apache.tez.dag.history.events.ContainerLaunchedEvent;
+import org.apache.tez.dag.records.TaskAttemptTerminationCause;
import org.apache.tez.runtime.task.TezChild;
@@ -273,14 +274,14 @@ public class LocalContainerLauncher extends
AbstractService implements
LOG.info("Container: " + containerId + " completed successfully");
appContext.getEventHandler().handle(
new AMContainerEventCompleted(containerId,
result.getExitStatus().getExitCode(),
- null));
+ null, TaskAttemptTerminationCause.CONTAINER_EXITED));
} else {
LOG.info("Container: " + containerId + " completed but with errors");
appContext.getEventHandler().handle(
new AMContainerEventCompleted(containerId,
result.getExitStatus().getExitCode(),
result.getErrorMessage() == null ?
(result.getThrowable() == null ? null :
result.getThrowable().getMessage()) :
- result.getErrorMessage()));
+ result.getErrorMessage(),
TaskAttemptTerminationCause.APPLICATION_ERROR));
}
}
@@ -295,13 +296,13 @@ public class LocalContainerLauncher extends
AbstractService implements
appContext.getEventHandler()
.handle(new AMContainerEventCompleted(containerId,
TezChild.ContainerExecutionResult.ExitStatus.EXECUTION_FAILURE.getExitCode(),
- t.getMessage()));
+ t.getMessage(),
TaskAttemptTerminationCause.APPLICATION_ERROR));
} else {
LOG.info("Ignoring CancellationException - triggered by
LocalContainerLauncher");
appContext.getEventHandler()
.handle(new AMContainerEventCompleted(containerId,
TezChild.ContainerExecutionResult.ExitStatus.SUCCESS.getExitCode(),
- "CancellationException"));
+ "CancellationException",
TaskAttemptTerminationCause.CONTAINER_EXITED));
}
}
}
http://git-wip-us.apache.org/repos/asf/tez/blob/84a7ef05/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerEventHandler.java
----------------------------------------------------------------------
diff --git
a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerEventHandler.java
b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerEventHandler.java
index 0b202ab..9b02578 100644
---
a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerEventHandler.java
+++
b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerEventHandler.java
@@ -69,6 +69,7 @@ import
org.apache.tez.dag.app.rm.node.AMNodeEventNodeCountUpdated;
import org.apache.tez.dag.app.rm.node.AMNodeEventStateChanged;
import org.apache.tez.dag.app.rm.node.AMNodeEventTaskAttemptEnded;
import org.apache.tez.dag.app.rm.node.AMNodeEventTaskAttemptSucceeded;
+import org.apache.tez.dag.records.TaskAttemptTerminationCause;
import com.google.common.base.Preconditions;
@@ -426,19 +427,22 @@ public class TaskSchedulerEventHandler extends
AbstractService
// Inform the Containers about completion.
AMContainer amContainer =
appContext.getAllContainers().get(containerStatus.getContainerId());
if (amContainer != null) {
- String message = null;
+ String message = "Container completed. ";
+ TaskAttemptTerminationCause errCause =
TaskAttemptTerminationCause.CONTAINER_EXITED;
int exitStatus = containerStatus.getExitStatus();
if (exitStatus == ContainerExitStatus.PREEMPTED) {
message = "Container preempted externally. ";
+ errCause = TaskAttemptTerminationCause.EXTERNAL_PREEMPTION;
} else if (exitStatus == ContainerExitStatus.DISKS_FAILED) {
message = "Container disk failed. ";
- } else {
+ errCause = TaskAttemptTerminationCause.NODE_DISK_ERROR;
+ } else if (exitStatus != ContainerExitStatus.SUCCESS){
message = "Container failed. ";
}
if (containerStatus.getDiagnostics() != null) {
message += containerStatus.getDiagnostics();
}
- sendEvent(new AMContainerEventCompleted(amContainer.getContainerId(),
exitStatus, message));
+ sendEvent(new AMContainerEventCompleted(amContainer.getContainerId(),
exitStatus, message, errCause));
}
}
@@ -554,8 +558,8 @@ public class TaskSchedulerEventHandler extends
AbstractService
public void preemptContainer(ContainerId containerId) {
taskScheduler.deallocateContainer(containerId);
// Inform the Containers about completion.
- sendEvent(new AMContainerEventCompleted(containerId,
- ContainerExitStatus.PREEMPTED, "Container preempted internally"));
+ sendEvent(new AMContainerEventCompleted(containerId,
ContainerExitStatus.INVALID,
+ "Container preempted internally",
TaskAttemptTerminationCause.INTERNAL_PREEMPTION));
}
public void setShouldUnregisterFlag() {
http://git-wip-us.apache.org/repos/asf/tez/blob/84a7ef05/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerEventCompleted.java
----------------------------------------------------------------------
diff --git
a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerEventCompleted.java
b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerEventCompleted.java
index e9649f3..a455f1e 100644
---
a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerEventCompleted.java
+++
b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerEventCompleted.java
@@ -20,17 +20,20 @@ package org.apache.tez.dag.app.rm.container;
import org.apache.hadoop.yarn.api.records.ContainerExitStatus;
import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.tez.dag.records.TaskAttemptTerminationCause;
public class AMContainerEventCompleted extends AMContainerEvent {
private final int exitStatus;
private final String diagnostics;
+ private final TaskAttemptTerminationCause errCause;
public AMContainerEventCompleted(ContainerId containerId,
- int exitStatus, String diagnostics) {
+ int exitStatus, String diagnostics, TaskAttemptTerminationCause
errCause) {
super(containerId, AMContainerEventType.C_COMPLETED);
this.exitStatus = exitStatus;
this.diagnostics = diagnostics;
+ this.errCause = errCause;
}
public boolean isPreempted() {
@@ -41,6 +44,10 @@ public class AMContainerEventCompleted extends
AMContainerEvent {
return (exitStatus == ContainerExitStatus.DISKS_FAILED);
}
+ public boolean isClusterAction() {
+ return isPreempted() || isDiskFailed();
+ }
+
public String getDiagnostics() {
return diagnostics;
}
@@ -48,5 +55,9 @@ public class AMContainerEventCompleted extends
AMContainerEvent {
public int getContainerExitStatus() {
return exitStatus;
}
+
+ public TaskAttemptTerminationCause getTerminationCause() {
+ return errCause;
+ }
}
http://git-wip-us.apache.org/repos/asf/tez/blob/84a7ef05/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerImpl.java
----------------------------------------------------------------------
diff --git
a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerImpl.java
b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerImpl.java
index a0f9cb7..9d4f46b 100644
---
a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerImpl.java
+++
b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerImpl.java
@@ -59,6 +59,7 @@ import
org.apache.tez.dag.app.rm.NMCommunicatorStopRequestEvent;
import org.apache.tez.dag.history.DAGHistoryEvent;
import org.apache.tez.dag.history.HistoryEventHandler;
import org.apache.tez.dag.history.events.ContainerStoppedEvent;
+import org.apache.tez.dag.records.TaskAttemptTerminationCause;
import org.apache.tez.dag.records.TezDAGID;
import org.apache.tez.dag.records.TezTaskAttemptID;
//import org.apache.tez.dag.app.dag.event.TaskAttemptEventDiagnosticsUpdate;
@@ -533,7 +534,7 @@ public class AMContainerImpl implements AMContainer {
.getTaskAttemptId());
container.sendTerminatedToTaskAttempt(event.getTaskAttemptId(),
"AMScheduler Error: TaskAttempt allocated to unlaunched container: "
+
- container.getContainerId());
+ container.getContainerId(),
TaskAttemptTerminationCause.FRAMEWORK_ERROR);
container.deAllocate();
LOG.warn("Unexpected TA Assignment: TAId: " + event.getTaskAttemptId() +
" for ContainerId: " + container.getContainerId() +
@@ -644,8 +645,10 @@ public class AMContainerImpl implements AMContainer {
public void transition(AMContainerImpl container, AMContainerEvent cEvent)
{
if (container.pendingAttempt != null) {
AMContainerEventLaunchFailed event = (AMContainerEventLaunchFailed)
cEvent;
+ // for a properly setup cluster this should almost always be an app
error
+ // need to differentiate between launch failed due to
framework/cluster or app
container.sendTerminatingToTaskAttempt(container.pendingAttempt,
- event.getMessage());
+ event.getMessage(),
TaskAttemptTerminationCause.CONTAINER_LAUNCH_FAILED);
}
container.unregisterFromTAListener();
container.deAllocate();
@@ -659,12 +662,17 @@ public class AMContainerImpl implements AMContainer {
AMContainerEventCompleted event = (AMContainerEventCompleted) cEvent;
if (container.pendingAttempt != null) {
String errorMessage = getMessage(container, event);
- if (event.isPreempted() || event.isDiskFailed()) {
+ if (event.isClusterAction()) {
container.sendContainerTerminatedBySystemToTaskAttempt(container.pendingAttempt,
- errorMessage);
+ errorMessage, event.getTerminationCause());
} else {
- container.sendTerminatedToTaskAttempt(container.pendingAttempt,
- errorMessage);
+ container
+ .sendTerminatedToTaskAttempt(
+ container.pendingAttempt,
+ errorMessage,
+ // if termination cause is generic exited then replace with
specific
+ (event.getTerminationCause() ==
TaskAttemptTerminationCause.CONTAINER_EXITED ?
+ TaskAttemptTerminationCause.CONTAINER_LAUNCH_FAILED :
event.getTerminationCause()));
}
container.registerFailedAttempt(container.pendingAttempt);
container.pendingAttempt = null;
@@ -696,7 +704,7 @@ public class AMContainerImpl implements AMContainer {
public void transition(AMContainerImpl container, AMContainerEvent cEvent)
{
if (container.pendingAttempt != null) {
container.sendTerminatingToTaskAttempt(container.pendingAttempt,
- getMessage(container, cEvent));
+ getMessage(container, cEvent),
TaskAttemptTerminationCause.CONTAINER_STOPPED);
}
container.unregisterFromTAListener();
container.logStopped(container.pendingAttempt == null ?
@@ -722,27 +730,31 @@ public class AMContainerImpl implements AMContainer {
return;
}
container.nodeFailed = true;
- String errorMessage = null;
+ String errorMessage = "Node " + container.getContainer().getNodeId() + "
failed. ";
if (cEvent instanceof DiagnosableEvent) {
- errorMessage = ((DiagnosableEvent) cEvent).getDiagnosticInfo();
+ errorMessage += ((DiagnosableEvent) cEvent).getDiagnosticInfo();
}
for (TezTaskAttemptID taId : container.failedAssignments) {
- container.sendNodeFailureToTA(taId, errorMessage);
+ container.sendNodeFailureToTA(taId, errorMessage,
TaskAttemptTerminationCause.NODE_FAILED);
}
for (TezTaskAttemptID taId : container.completedAttempts) {
- container.sendNodeFailureToTA(taId, errorMessage);
+ container.sendNodeFailureToTA(taId, errorMessage,
TaskAttemptTerminationCause.NODE_FAILED);
}
if (container.pendingAttempt != null) {
// Will be null in COMPLETED state.
- container.sendNodeFailureToTA(container.pendingAttempt, errorMessage);
- container.sendTerminatingToTaskAttempt(container.pendingAttempt, "Node
failure");
+ container.sendNodeFailureToTA(container.pendingAttempt, errorMessage,
+ TaskAttemptTerminationCause.NODE_FAILED);
+ container.sendTerminatingToTaskAttempt(container.pendingAttempt,
errorMessage,
+ TaskAttemptTerminationCause.NODE_FAILED);
}
if (container.runningAttempt != null) {
// Will be null in COMPLETED state.
- container.sendNodeFailureToTA(container.runningAttempt, errorMessage);
- container.sendTerminatingToTaskAttempt(container.runningAttempt, "Node
failure");
+ container.sendNodeFailureToTA(container.runningAttempt, errorMessage,
+ TaskAttemptTerminationCause.NODE_FAILED);
+ container.sendTerminatingToTaskAttempt(container.runningAttempt,
errorMessage,
+ TaskAttemptTerminationCause.NODE_FAILED);
}
container.logStopped(ContainerExitStatus.ABORTED);
}
@@ -767,7 +779,7 @@ public class AMContainerImpl implements AMContainer {
container.sendTerminatingToTaskAttempt(container.pendingAttempt,
"Container " + container.getContainerId() +
" hit an invalid transition - " + cEvent.getType() + " at " +
- container.getState());
+ container.getState(),
TaskAttemptTerminationCause.FRAMEWORK_ERROR);
}
container.logStopped(ContainerExitStatus.ABORTED);
container.sendStopRequestToNM();
@@ -909,12 +921,12 @@ public class AMContainerImpl implements AMContainer {
@Override
public void transition(AMContainerImpl container, AMContainerEvent cEvent)
{
AMContainerEventCompleted event = (AMContainerEventCompleted) cEvent;
- if (event.isPreempted() || event.isDiskFailed()) {
+ if (event.isClusterAction()) {
container.sendContainerTerminatedBySystemToTaskAttempt(container.runningAttempt,
- getMessage(container, event));
+ getMessage(container, event), event.getTerminationCause());
} else {
container.sendTerminatedToTaskAttempt(container.runningAttempt,
- getMessage(container, event));
+ getMessage(container, event), event.getTerminationCause());
}
container.unregisterAttemptFromListener(container.runningAttempt);
container.registerFailedAttempt(container.runningAttempt);
@@ -929,8 +941,8 @@ public class AMContainerImpl implements AMContainer {
container.unregisterAttemptFromListener(container.runningAttempt);
container.sendTerminatingToTaskAttempt(container.runningAttempt,
- " Container" + container.getContainerId() +
- " received a STOP_REQUEST");
+ " Container" + container.getContainerId() + " received a
STOP_REQUEST",
+ TaskAttemptTerminationCause.CONTAINER_STOPPED);
super.transition(container, cEvent);
}
}
@@ -964,7 +976,7 @@ public class AMContainerImpl implements AMContainer {
container.sendTerminatingToTaskAttempt(container.runningAttempt,
"Container " + container.getContainerId() +
" hit an invalid transition - " + cEvent.getType() + " at " +
- container.getState());
+ container.getState(),
TaskAttemptTerminationCause.FRAMEWORK_ERROR);
}
}
@@ -978,7 +990,8 @@ public class AMContainerImpl implements AMContainer {
" cannot be allocated to container: " + container.getContainerId() +
" in " + container.getState() + " state";
container.maybeSendNodeFailureForFailedAssignment(event.getTaskAttemptId());
- container.sendTerminatingToTaskAttempt(event.getTaskAttemptId(),
errorMessage);
+ container.sendTerminatingToTaskAttempt(event.getTaskAttemptId(),
errorMessage,
+ TaskAttemptTerminationCause.CONTAINER_EXITED);
container.registerFailedAttempt(event.getTaskAttemptId());
}
}
@@ -1001,15 +1014,18 @@ public class AMContainerImpl implements AMContainer {
AMContainerEventCompleted event = (AMContainerEventCompleted) cEvent;
String diag = event.getDiagnostics();
for (TezTaskAttemptID taId : container.failedAssignments) {
- container.sendTerminatedToTaskAttempt(taId, diag);
+ container.sendTerminatedToTaskAttempt(taId, diag,
+ TaskAttemptTerminationCause.CONTAINER_EXITED);
}
if (container.pendingAttempt != null) {
- container.sendTerminatedToTaskAttempt(container.pendingAttempt, diag);
+ container.sendTerminatedToTaskAttempt(container.pendingAttempt, diag,
+ TaskAttemptTerminationCause.CONTAINER_EXITED);
container.registerFailedAttempt(container.pendingAttempt);
container.pendingAttempt = null;
}
if (container.runningAttempt != null) {
- container.sendTerminatedToTaskAttempt(container.runningAttempt, diag);
+ container.sendTerminatedToTaskAttempt(container.runningAttempt, diag,
+ TaskAttemptTerminationCause.CONTAINER_EXITED);
container.registerFailedAttempt(container.runningAttempt);
container.runningAttempt = null;
}
@@ -1078,12 +1094,11 @@ public class AMContainerImpl implements AMContainer {
+ " in COMPLETED state";
container.maybeSendNodeFailureForFailedAssignment(event.getTaskAttemptId());
container.sendTerminatedToTaskAttempt(event.getTaskAttemptId(),
- errorMessage);
+ errorMessage, TaskAttemptTerminationCause.FRAMEWORK_ERROR);
container.registerFailedAttempt(event.getTaskAttemptId());
}
}
-
private void handleExtraTAAssign(
AMContainerEventAssignTA event, TezTaskAttemptID currentTaId) {
this.inError = true;
@@ -1092,8 +1107,10 @@ public class AMContainerImpl implements AMContainer {
". Attempts: " + currentTaId + ", " + event.getTaskAttemptId() +
". Current state: " + this.getState();
this.maybeSendNodeFailureForFailedAssignment(event.getTaskAttemptId());
- this.sendTerminatingToTaskAttempt(event.getTaskAttemptId(), errorMessage);
- this.sendTerminatingToTaskAttempt(currentTaId, errorMessage);
+ this.sendTerminatingToTaskAttempt(event.getTaskAttemptId(), errorMessage,
+ TaskAttemptTerminationCause.FRAMEWORK_ERROR);
+ this.sendTerminatingToTaskAttempt(currentTaId, errorMessage,
+ TaskAttemptTerminationCause.FRAMEWORK_ERROR);
this.registerFailedAttempt(event.getTaskAttemptId());
LOG.warn(errorMessage);
this.logStopped(ContainerExitStatus.INVALID);
@@ -1122,28 +1139,29 @@ public class AMContainerImpl implements AMContainer {
}
protected void sendTerminatedToTaskAttempt(
- TezTaskAttemptID taId, String message) {
- sendEvent(new TaskAttemptEventContainerTerminated(taId, message));
+ TezTaskAttemptID taId, String message, TaskAttemptTerminationCause
errCause) {
+ sendEvent(new TaskAttemptEventContainerTerminated(taId, message,
errCause));
}
protected void sendContainerTerminatedBySystemToTaskAttempt(
- TezTaskAttemptID taId, String message) {
- sendEvent(new TaskAttemptEventContainerTerminatedBySystem(taId,
message));
+ TezTaskAttemptID taId, String message, TaskAttemptTerminationCause
errorCause) {
+ sendEvent(new TaskAttemptEventContainerTerminatedBySystem(taId, message,
errorCause));
}
protected void sendTerminatingToTaskAttempt(TezTaskAttemptID taId,
- String message) {
- sendEvent(new TaskAttemptEventContainerTerminating(taId, message));
+ String message, TaskAttemptTerminationCause errorCause) {
+ sendEvent(new TaskAttemptEventContainerTerminating(taId, message,
errorCause));
}
protected void maybeSendNodeFailureForFailedAssignment(TezTaskAttemptID
taId) {
if (this.nodeFailed) {
- this.sendNodeFailureToTA(taId, "Node Failed");
+ this.sendNodeFailureToTA(taId, "Node Failed",
TaskAttemptTerminationCause.NODE_FAILED);
}
}
- protected void sendNodeFailureToTA(TezTaskAttemptID taId, String message) {
- sendEvent(new TaskAttemptEventNodeFailed(taId, message));
+ protected void sendNodeFailureToTA(TezTaskAttemptID taId, String message,
+ TaskAttemptTerminationCause errorCause) {
+ sendEvent(new TaskAttemptEventNodeFailed(taId, message, errorCause));
}
protected void sendStartRequestToNM(ContainerLaunchContext clc) {
http://git-wip-us.apache.org/repos/asf/tez/blob/84a7ef05/tez-dag/src/main/java/org/apache/tez/dag/history/events/TaskAttemptFinishedEvent.java
----------------------------------------------------------------------
diff --git
a/tez-dag/src/main/java/org/apache/tez/dag/history/events/TaskAttemptFinishedEvent.java
b/tez-dag/src/main/java/org/apache/tez/dag/history/events/TaskAttemptFinishedEvent.java
index 0ae8061..2b21c89 100644
---
a/tez-dag/src/main/java/org/apache/tez/dag/history/events/TaskAttemptFinishedEvent.java
+++
b/tez-dag/src/main/java/org/apache/tez/dag/history/events/TaskAttemptFinishedEvent.java
@@ -29,6 +29,7 @@ import org.apache.tez.dag.api.DagTypeConverters;
import org.apache.tez.dag.api.oldrecords.TaskAttemptState;
import org.apache.tez.dag.history.HistoryEvent;
import org.apache.tez.dag.history.HistoryEventType;
+import org.apache.tez.dag.records.TaskAttemptTerminationCause;
import org.apache.tez.dag.records.TezTaskAttemptID;
import
org.apache.tez.dag.recovery.records.RecoveryProtos.TaskAttemptFinishedProto;
@@ -43,21 +44,23 @@ public class TaskAttemptFinishedEvent implements
HistoryEvent {
private TaskAttemptState state;
private String diagnostics;
private TezCounters tezCounters;
+ private TaskAttemptTerminationCause error;
public TaskAttemptFinishedEvent(TezTaskAttemptID taId,
String vertexName,
long startTime,
long finishTime,
TaskAttemptState state,
- String diagnostics,
- TezCounters counters) {
+ TaskAttemptTerminationCause error,
+ String diagnostics, TezCounters counters) {
this.taskAttemptId = taId;
this.vertexName = vertexName;
this.startTime = startTime;
this.finishTime = finishTime;
this.state = state;
this.diagnostics = diagnostics;
- tezCounters = counters;
+ this.tezCounters = counters;
+ this.error = error;
}
public TaskAttemptFinishedEvent() {
@@ -87,6 +90,9 @@ public class TaskAttemptFinishedEvent implements HistoryEvent
{
if (diagnostics != null) {
builder.setDiagnostics(diagnostics);
}
+ if (error != null) {
+ builder.setErrorEnum(error.name());
+ }
if (tezCounters != null) {
builder.setCounters(DagTypeConverters.convertTezCountersToProto(tezCounters));
}
@@ -100,6 +106,9 @@ public class TaskAttemptFinishedEvent implements
HistoryEvent {
if (proto.hasDiagnostics()) {
this.diagnostics = proto.getDiagnostics();
}
+ if (proto.hasErrorEnum()) {
+ this.error = TaskAttemptTerminationCause.valueOf(proto.getErrorEnum());
+ }
if (proto.hasCounters()) {
this.tezCounters = DagTypeConverters.convertTezCountersFromProto(
proto.getCounters());
@@ -129,6 +138,7 @@ public class TaskAttemptFinishedEvent implements
HistoryEvent {
+ ", finishTime=" + finishTime
+ ", timeTaken=" + (finishTime - startTime)
+ ", status=" + state.name()
+ + ", errorEnum=" + (error != null ? error.name() : "")
+ ", diagnostics=" + diagnostics
+ ", counters=" + (tezCounters == null ? "null" :
tezCounters.toString()
@@ -146,6 +156,10 @@ public class TaskAttemptFinishedEvent implements
HistoryEvent {
public String getDiagnostics() {
return diagnostics;
}
+
+ public TaskAttemptTerminationCause getTaskAttemptError() {
+ return error;
+ }
public long getFinishTime() {
return finishTime;
http://git-wip-us.apache.org/repos/asf/tez/blob/84a7ef05/tez-dag/src/main/java/org/apache/tez/dag/history/logging/impl/HistoryEventJsonConversion.java
----------------------------------------------------------------------
diff --git
a/tez-dag/src/main/java/org/apache/tez/dag/history/logging/impl/HistoryEventJsonConversion.java
b/tez-dag/src/main/java/org/apache/tez/dag/history/logging/impl/HistoryEventJsonConversion.java
index 8560359..79a0c34 100644
---
a/tez-dag/src/main/java/org/apache/tez/dag/history/logging/impl/HistoryEventJsonConversion.java
+++
b/tez-dag/src/main/java/org/apache/tez/dag/history/logging/impl/HistoryEventJsonConversion.java
@@ -18,11 +18,9 @@
package org.apache.tez.dag.history.logging.impl;
-import java.util.HashMap;
import java.util.Map;
import java.util.Map.Entry;
-import java.util.Map.Entry;
import java.util.TreeMap;
import org.apache.tez.common.ATSConstants;
@@ -485,6 +483,9 @@ public class HistoryEventJsonConversion {
otherInfo.put(ATSConstants.FINISH_TIME, event.getFinishTime());
otherInfo.put(ATSConstants.TIME_TAKEN, (event.getFinishTime() -
event.getStartTime()));
otherInfo.put(ATSConstants.STATUS, event.getState().name());
+ if (event.getTaskAttemptError() != null) {
+ otherInfo.put(ATSConstants.TASK_ATTEMPT_ERROR_ENUM,
event.getTaskAttemptError().name());
+ }
otherInfo.put(ATSConstants.DIAGNOSTICS, event.getDiagnostics());
otherInfo.put(ATSConstants.COUNTERS,
DAGUtils.convertCountersToJSON(event.getCounters()));
http://git-wip-us.apache.org/repos/asf/tez/blob/84a7ef05/tez-dag/src/main/proto/HistoryEvents.proto
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/proto/HistoryEvents.proto
b/tez-dag/src/main/proto/HistoryEvents.proto
index 93f217f..a02268c 100644
--- a/tez-dag/src/main/proto/HistoryEvents.proto
+++ b/tez-dag/src/main/proto/HistoryEvents.proto
@@ -165,6 +165,7 @@ message TaskAttemptFinishedProto {
optional int32 state = 3;
optional string diagnostics = 4;
optional TezCountersProto counters = 5;
+ optional string error_enum = 6;
}
message EventMetaDataProto {
http://git-wip-us.apache.org/repos/asf/tez/blob/84a7ef05/tez-dag/src/test/java/org/apache/tez/dag/app/TestPreemption.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/TestPreemption.java
b/tez-dag/src/test/java/org/apache/tez/dag/app/TestPreemption.java
index 0958c48..d1bef18 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/TestPreemption.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/TestPreemption.java
@@ -41,6 +41,7 @@ import
org.apache.tez.dag.app.MockDAGAppMaster.MockContainerLauncher;
import org.apache.tez.dag.app.dag.TaskAttemptStateInternal;
import org.apache.tez.dag.app.dag.impl.DAGImpl;
import org.apache.tez.dag.app.dag.impl.TaskAttemptImpl;
+import org.apache.tez.dag.records.TaskAttemptTerminationCause;
import org.apache.tez.dag.records.TezTaskAttemptID;
import org.apache.tez.dag.records.TezTaskID;
import org.apache.tez.dag.records.TezVertexID;
@@ -200,6 +201,7 @@ public class TestPreemption {
TezTaskAttemptID testTaId =
TezTaskAttemptID.getInstance(TezTaskID.getInstance(vertexId, 0), i);
TaskAttemptImpl taImpl = dagImpl.getTaskAttempt(testTaId);
Assert.assertEquals(TaskAttemptStateInternal.KILLED,
taImpl.getInternalState());
+ Assert.assertEquals(TaskAttemptTerminationCause.EXTERNAL_PREEMPTION,
taImpl.getTerminationCause());
}
System.out.println("TestPreemption - Done running - " + info);
http://git-wip-us.apache.org/repos/asf/tez/blob/84a7ef05/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskAttempt.java
----------------------------------------------------------------------
diff --git
a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskAttempt.java
b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskAttempt.java
index bba4edb..788b59b 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskAttempt.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskAttempt.java
@@ -87,6 +87,7 @@ import
org.apache.tez.dag.app.rm.container.ContainerContextMatcher;
import org.apache.tez.dag.history.DAGHistoryEvent;
import org.apache.tez.dag.history.HistoryEventHandler;
import org.apache.tez.dag.history.events.TaskAttemptFinishedEvent;
+import org.apache.tez.dag.records.TaskAttemptTerminationCause;
import org.apache.tez.dag.records.TezDAGID;
import org.apache.tez.dag.records.TezTaskAttemptID;
import org.apache.tez.dag.records.TezTaskID;
@@ -300,9 +301,11 @@ public class TestTaskAttempt {
taImpl.handle(new TaskAttemptEventSchedule(taskAttemptID, 0, 0));
// At state STARTING.
- taImpl.handle(new TaskAttemptEventKillRequest(taskAttemptID, null));
+ taImpl.handle(new TaskAttemptEventKillRequest(taskAttemptID, null,
+ TaskAttemptTerminationCause.TERMINATED_BY_CLIENT));
// At some KILLING state.
- taImpl.handle(new TaskAttemptEventKillRequest(taskAttemptID, null));
+ taImpl.handle(new TaskAttemptEventKillRequest(taskAttemptID, null,
+ TaskAttemptTerminationCause.TERMINATED_BY_CLIENT));
// taImpl.handle(new TaskAttemptEventContainerTerminating(taskAttemptID,
// null));
assertFalse(eventHandler.internalError);
@@ -366,7 +369,7 @@ public class TestTaskAttempt {
verify(eventHandler, times(expectedEventsAtRunning)).handle(arg.capture());
taImpl.handle(new TaskAttemptEventContainerTerminating(taskAttemptID,
- "Terminating"));
+ "Terminating", TaskAttemptTerminationCause.APPLICATION_ERROR));
assertFalse(
"InternalError occurred trying to handle TA_CONTAINER_TERMINATING",
eventHandler.internalError);
@@ -376,6 +379,7 @@ public class TestTaskAttempt {
assertEquals(1, taImpl.getDiagnostics().size());
assertEquals("Terminating", taImpl.getDiagnostics().get(0));
+ assertEquals(TaskAttemptTerminationCause.APPLICATION_ERROR,
taImpl.getTerminationCause());
int expectedEvenstAfterTerminating = expectedEventsAtRunning + 3;
arg = ArgumentCaptor.forClass(Event.class);
@@ -392,13 +396,16 @@ public class TestTaskAttempt {
expectedEvenstAfterTerminating), DAGEventCounterUpdate.class, 1);
taImpl.handle(new TaskAttemptEventContainerTerminated(taskAttemptID,
- "Terminated"));
+ "Terminated", TaskAttemptTerminationCause.CONTAINER_EXITED));
int expectedEventAfterTerminated = expectedEvenstAfterTerminating + 0;
arg = ArgumentCaptor.forClass(Event.class);
verify(eventHandler,
times(expectedEventAfterTerminated)).handle(arg.capture());
assertEquals(2, taImpl.getDiagnostics().size());
assertEquals("Terminated", taImpl.getDiagnostics().get(1));
+
+ // check that original error cause is retained
+ assertEquals(TaskAttemptTerminationCause.APPLICATION_ERROR,
taImpl.getTerminationCause());
}
@@ -452,13 +459,14 @@ public class TestTaskAttempt {
null));
assertEquals("Task attempt is not in running state", taImpl.getState(),
TaskAttemptState.RUNNING);
- taImpl.handle(new TaskAttemptEventContainerTerminated(taskAttemptID,
"Terminated"));
+ taImpl.handle(new TaskAttemptEventContainerTerminated(taskAttemptID,
"Terminated",
+ TaskAttemptTerminationCause.CONTAINER_EXITED));
assertFalse(
"InternalError occurred trying to handle TA_CONTAINER_TERMINATED",
eventHandler.internalError);
assertEquals("Terminated", taImpl.getDiagnostics().get(0));
-
+ assertEquals(TaskAttemptTerminationCause.CONTAINER_EXITED,
taImpl.getTerminationCause());
// TODO Ensure TA_TERMINATING after this is ingored.
}
@@ -541,14 +549,15 @@ public class TestTaskAttempt {
expectedEvenstAfterTerminating), DAGEventCounterUpdate.class, 1);
taImpl.handle(new TaskAttemptEventContainerTerminated(taskAttemptID,
- "Terminated"));
+ "Terminated", TaskAttemptTerminationCause.CONTAINER_EXITED));
int expectedEventAfterTerminated = expectedEvenstAfterTerminating + 0;
arg = ArgumentCaptor.forClass(Event.class);
verify(eventHandler,
times(expectedEventAfterTerminated)).handle(arg.capture());
// Verify that the diagnostic message included in the Terminated event is
not
- // captured - TA already succeeded.
+ // captured - TA already succeeded. Error cause is the default value.
assertEquals(0, taImpl.getDiagnostics().size());
+ assertEquals(TaskAttemptTerminationCause.UNKNOWN_ERROR,
taImpl.getTerminationCause());
}
@Test(timeout = 5000)
@@ -636,8 +645,9 @@ public class TestTaskAttempt {
verify(eventHandler,
times(expectedEventAfterTerminated)).handle(arg.capture());
// Verify that the diagnostic message included in the Terminated event is
not
- // captured - TA already succeeded.
+ // captured - TA already succeeded. Error cause should be the default value
assertEquals(0, taImpl.getDiagnostics().size());
+ assertEquals(TaskAttemptTerminationCause.UNKNOWN_ERROR,
taImpl.getTerminationCause());
}
@Test(timeout = 5000)
@@ -718,7 +728,8 @@ public class TestTaskAttempt {
expectedEvenstAfterTerminating), DAGEventCounterUpdate.class, 1);
// Send out a Node Failure.
- taImpl.handle(new TaskAttemptEventNodeFailed(taskAttemptID,
"NodeDecomissioned"));
+ taImpl.handle(new TaskAttemptEventNodeFailed(taskAttemptID,
"NodeDecomissioned",
+ TaskAttemptTerminationCause.NODE_FAILED));
// Verify in KILLED state
assertEquals("Task attempt is not in the KILLED state",
TaskAttemptState.KILLED,
taImpl.getState());
@@ -734,6 +745,7 @@ public class TestTaskAttempt {
// Verify still in KILLED state
assertEquals("Task attempt is not in the KILLED state",
TaskAttemptState.KILLED,
taImpl.getState());
+ assertEquals(TaskAttemptTerminationCause.NODE_FAILED,
taImpl.getTerminationCause());
}
@Test(timeout = 5000)
@@ -814,7 +826,8 @@ public class TestTaskAttempt {
expectedEvenstAfterTerminating), DAGEventCounterUpdate.class, 1);
// Send out a Node Failure.
- taImpl.handle(new TaskAttemptEventNodeFailed(taskAttemptID,
"NodeDecomissioned"));
+ taImpl.handle(new TaskAttemptEventNodeFailed(taskAttemptID,
"NodeDecomissioned",
+ TaskAttemptTerminationCause.NODE_FAILED));
// Verify no additional events
int expectedEventsNodeFailure = expectedEvenstAfterTerminating + 0;
@@ -824,6 +837,8 @@ public class TestTaskAttempt {
// Verify still in SUCCEEDED state
assertEquals("Task attempt is not in the SUCCEEDED state",
TaskAttemptState.SUCCEEDED,
taImpl.getState());
+ // error cause remains as default value
+ assertEquals(TaskAttemptTerminationCause.UNKNOWN_ERROR,
taImpl.getTerminationCause());
}
@Test//(timeout = 5000)
@@ -909,6 +924,8 @@ public class TestTaskAttempt {
taImpl.handle(new TaskAttemptEventOutputFailed(taskAttemptID, tzEvent,
11));
assertEquals("Task attempt is not in succeeded state", taImpl.getState(),
TaskAttemptState.SUCCEEDED);
+ // default value of error cause
+ assertEquals(TaskAttemptTerminationCause.UNKNOWN_ERROR,
taImpl.getTerminationCause());
// different destination attempt reports error. now threshold crossed
TezTaskAttemptID mockDestId2 = mock(TezTaskAttemptID.class);
@@ -923,6 +940,7 @@ public class TestTaskAttempt {
finishEvent = (TaskAttemptFinishedEvent)histEvent.getHistoryEvent();
long newFinishTime = finishEvent.getFinishTime();
Assert.assertEquals(finishTime, newFinishTime);
+ assertEquals(TaskAttemptTerminationCause.OUTPUT_LOST,
taImpl.getTerminationCause());
assertEquals(true, taImpl.inputFailedReported);
int expectedEventsAfterFetchFailure = expectedEventsTillSucceeded + 2;
@@ -1059,7 +1077,8 @@ public class TestTaskAttempt {
mockHeartbeatHandler, appCtx, locationHint, false,
resource, createFakeContainerContext(), true);
Assert.assertEquals(TaskAttemptStateInternal.NEW,
taImpl.getInternalState());
- taImpl.handle(new TaskAttemptEventKillRequest(taImpl.getID(), "kill it"));
+ taImpl.handle(new TaskAttemptEventKillRequest(taImpl.getID(), "kill it",
+ TaskAttemptTerminationCause.TERMINATED_BY_CLIENT));
Assert.assertEquals(TaskAttemptStateInternal.KILLED,
taImpl.getInternalState());
Assert.assertEquals(0, taImpl.taskAttemptStartedEventLogged);
http://git-wip-us.apache.org/repos/asf/tez/blob/84a7ef05/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskAttemptRecovery.java
----------------------------------------------------------------------
diff --git
a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskAttemptRecovery.java
b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskAttemptRecovery.java
index 1391361..7c75a1d 100644
---
a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskAttemptRecovery.java
+++
b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskAttemptRecovery.java
@@ -56,6 +56,7 @@ import org.apache.tez.dag.history.HistoryEventHandler;
import org.apache.tez.dag.history.HistoryEventType;
import org.apache.tez.dag.history.events.TaskAttemptFinishedEvent;
import org.apache.tez.dag.history.events.TaskAttemptStartedEvent;
+import org.apache.tez.dag.records.TaskAttemptTerminationCause;
import org.apache.tez.dag.records.TezTaskAttemptID;
import org.apache.tez.dag.records.TezTaskID;
import org.apache.tez.dag.records.TezVertexID;
@@ -147,10 +148,15 @@ public class TestTaskAttemptRecovery {
private void restoreFromTAFinishedEvent(TaskAttemptState state) {
String diag = "test_diag";
TezCounters counters = mock(TezCounters.class);
+
+ TaskAttemptTerminationCause errorEnum = null;
+ if (state != TaskAttemptState.SUCCEEDED) {
+ errorEnum = TaskAttemptTerminationCause.APPLICATION_ERROR;
+ }
TaskAttemptState recoveredState =
ta.restoreFromEvent(new TaskAttemptFinishedEvent(taId, vertexName,
- startTime, finishTime, state, diag, counters));
+ startTime, finishTime, state, errorEnum, diag, counters));
assertEquals(startTime, ta.getLaunchTime());
assertEquals(finishTime, ta.getFinishTime());
assertEquals(counters, ta.reportedStatus.counters);
@@ -159,6 +165,11 @@ public class TestTaskAttemptRecovery {
assertEquals(1, ta.getDiagnostics().size());
assertEquals(diag, ta.getDiagnostics().get(0));
assertEquals(state, recoveredState);
+ if (state != TaskAttemptState.SUCCEEDED) {
+ assertEquals(errorEnum, ta.getTerminationCause());
+ } else {
+ assertEquals(TaskAttemptTerminationCause.UNKNOWN_ERROR,
ta.getTerminationCause());
+ }
}
private void verifyEvents(List<Event> events, Class<? extends Event>
eventClass,
@@ -278,7 +289,7 @@ public class TestTaskAttemptRecovery {
TaskAttemptState recoveredState =
ta.restoreFromEvent(new TaskAttemptFinishedEvent(taId, vertexName,
startTime, finishTime, TaskAttemptState.KILLED,
- "", new TezCounters()));
+ TaskAttemptTerminationCause.APPLICATION_ERROR, "", new
TezCounters()));
assertEquals(TaskAttemptState.KILLED, recoveredState);
}
}
http://git-wip-us.apache.org/repos/asf/tez/blob/84a7ef05/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskImpl.java
----------------------------------------------------------------------
diff --git
a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskImpl.java
b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskImpl.java
index ab7c87b..0b93093 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskImpl.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskImpl.java
@@ -54,7 +54,6 @@ import org.apache.tez.dag.app.TaskAttemptListener;
import org.apache.tez.dag.app.TaskHeartbeatHandler;
import org.apache.tez.dag.app.dag.StateChangeNotifier;
import org.apache.tez.dag.app.dag.TaskStateInternal;
-import org.apache.tez.dag.app.dag.TaskTerminationCause;
import org.apache.tez.dag.app.dag.Vertex;
import org.apache.tez.dag.app.dag.event.TaskAttemptEventDiagnosticsUpdate;
import org.apache.tez.dag.app.dag.event.TaskEvent;
@@ -64,6 +63,7 @@ import org.apache.tez.dag.app.dag.event.TaskEventType;
import org.apache.tez.dag.app.dag.event.VertexEventType;
import org.apache.tez.dag.app.rm.container.AMContainer;
import org.apache.tez.dag.app.rm.node.AMNodeEventType;
+import org.apache.tez.dag.records.TaskAttemptTerminationCause;
import org.apache.tez.dag.records.TezDAGID;
import org.apache.tez.dag.records.TezTaskAttemptID;
import org.apache.tez.dag.records.TezTaskID;
@@ -173,13 +173,12 @@ public class TestTaskImpl {
}
private void killTask(TezTaskID taskId) {
- mockTask.handle(new TaskEventTermination(taskId,
TaskTerminationCause.DAG_KILL));
+ mockTask.handle(new TaskEventTermination(taskId,
TaskAttemptTerminationCause.TERMINATED_AT_SHUTDOWN, null));
assertTaskKillWaitState();
}
private void failTask(TezTaskID taskId) {
- mockTask.handle(new TaskEventTermination(taskId,
- TaskTerminationCause.OWN_TASK_FAILURE));
+ mockTask.handle(new TaskEventTermination(taskId,
TaskAttemptTerminationCause.TERMINATED_AT_SHUTDOWN, null));
assertTaskKillWaitState();
}
@@ -626,9 +625,9 @@ public class TestTaskImpl {
@Test
public void testDiagnostics_KillNew(){
TezTaskID taskId = getNewTaskID();
- mockTask.handle(new TaskEventTermination(taskId,
TaskTerminationCause.DAG_KILL));
+ mockTask.handle(new TaskEventTermination(taskId,
TaskAttemptTerminationCause.TERMINATED_BY_CLIENT, null));
assertEquals(1, mockTask.getDiagnostics().size());
-
assertTrue(mockTask.getDiagnostics().get(0).contains(TaskTerminationCause.DAG_KILL.name()));
+
assertTrue(mockTask.getDiagnostics().get(0).contains(TaskAttemptTerminationCause.TERMINATED_BY_CLIENT.name()));
assertEquals(0, mockTask.taskStartedEventLogged);
assertEquals(1, mockTask.taskFinishedEventLogged);
}
@@ -637,9 +636,9 @@ public class TestTaskImpl {
public void testDiagnostics_Kill(){
TezTaskID taskId = getNewTaskID();
scheduleTaskAttempt(taskId);
- mockTask.handle(new TaskEventTermination(taskId,
TaskTerminationCause.OTHER_TASK_FAILURE));
+ mockTask.handle(new TaskEventTermination(taskId,
TaskAttemptTerminationCause.TERMINATED_AT_SHUTDOWN, null));
assertEquals(1, mockTask.getDiagnostics().size());
-
assertTrue(mockTask.getDiagnostics().get(0).contains(TaskTerminationCause.OTHER_TASK_FAILURE.name()));
+
assertTrue(mockTask.getDiagnostics().get(0).contains(TaskAttemptTerminationCause.TERMINATED_AT_SHUTDOWN.name()));
}
// TODO Add test to validate the correct commit attempt.