TEZ-3161. Allow task to report different kinds of errors - fatal / kill (sseth)
Project: http://git-wip-us.apache.org/repos/asf/tez/repo Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/27a13fc9 Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/27a13fc9 Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/27a13fc9 Branch: refs/heads/master Commit: 27a13fc9761784c37af8adfaeb9337a4abae7182 Parents: 5ce07f8 Author: Siddharth Seth <[email protected]> Authored: Tue Apr 5 20:41:26 2016 -0700 Committer: Siddharth Seth <[email protected]> Committed: Tue Apr 5 20:41:26 2016 -0700 ---------------------------------------------------------------------- CHANGES.txt | 1 + .../org/apache/tez/common/ATSConstants.java | 1 + .../org/apache/tez/runtime/api/TaskContext.java | 34 ++- .../apache/tez/runtime/api/TaskFailureType.java | 31 +++ tez-dag/pom.xml | 1 + .../tez/dag/app/TaskCommunicatorManager.java | 26 ++- .../tez/dag/app/TaskHeartbeatHandler.java | 3 +- .../event/TaskAttemptEventAttemptFailed.java | 27 ++- .../dag/app/dag/event/TaskEventTAFailed.java | 46 ++++ .../dag/app/dag/event/TaskEventTAKilled.java | 37 +++ .../dag/app/dag/event/TaskEventTALaunched.java | 29 +++ .../dag/app/dag/event/TaskEventTASucceeded.java | 29 +++ .../dag/app/dag/event/TaskEventTAUpdate.java | 16 +- .../tez/dag/app/dag/impl/TaskAttemptImpl.java | 64 ++++-- .../apache/tez/dag/app/dag/impl/TaskImpl.java | 21 +- .../events/TaskAttemptFinishedEvent.java | 17 ++ .../impl/HistoryEventJsonConversion.java | 3 + .../api/TaskCommunicatorContext.java | 4 +- tez-dag/src/main/proto/HistoryEvents.proto | 2 + .../apache/tez/dag/app/TestRecoveryParser.java | 2 +- .../dag/app/TestTaskCommunicatorManager2.java | 227 +++++++++++++++---- .../tez/dag/app/dag/impl/TestDAGRecovery.java | 21 +- .../tez/dag/app/dag/impl/TestTaskAttempt.java | 184 +++++++++++++-- .../tez/dag/app/dag/impl/TestTaskImpl.java | 184 ++++++++++----- .../tez/dag/app/dag/impl/TestVertexImpl.java | 18 +- .../TestHistoryEventsProtoConversion.java | 61 ++++- .../impl/TestHistoryEventJsonConversion.java | 2 +- 
.../org/apache/tez/mapreduce/TestUmbilical.java | 18 +- .../ats/HistoryEventTimelineConversion.java | 3 + .../ats/TestHistoryEventTimelineConversion.java | 20 +- tez-runtime-internals/pom.xml | 2 +- .../apache/tez/common/TezConverterUtils.java | 30 ++- .../runtime/LogicalIOProcessorRuntimeTask.java | 10 +- .../org/apache/tez/runtime/RuntimeTask.java | 18 +- .../api/events/TaskAttemptFailedEvent.java | 9 +- .../api/events/TaskAttemptKilledEvent.java | 34 +++ .../apache/tez/runtime/api/impl/EventType.java | 1 + .../apache/tez/runtime/api/impl/TezEvent.java | 26 ++- .../runtime/api/impl/TezInputContextImpl.java | 13 ++ .../runtime/api/impl/TezOutputContextImpl.java | 13 ++ .../api/impl/TezProcessorContextImpl.java | 15 +- .../runtime/api/impl/TezTaskContextImpl.java | 17 +- .../tez/runtime/api/impl/TezUmbilical.java | 13 +- .../internals/api/TaskReporterInterface.java | 9 +- .../org/apache/tez/runtime/task/EndReason.java | 3 +- .../apache/tez/runtime/task/TaskReporter.java | 45 ++-- .../tez/runtime/task/TaskRunner2Callable.java | 13 +- .../tez/runtime/task/TaskRunner2Result.java | 26 ++- .../apache/tez/runtime/task/TezTaskRunner2.java | 206 ++++++++++------- .../src/main/proto/Events.proto | 28 --- .../src/main/proto/RuntimeEvents.proto | 38 ++++ .../runtime/task/TaskExecutionTestHelpers.java | 118 ++++++++-- .../tez/runtime/task/TestTaskExecution2.java | 156 +++++++++++-- .../common/shuffle/impl/ShuffleManager.java | 7 +- .../common/shuffle/orderedgrouped/Shuffle.java | 3 +- .../writers/UnorderedPartitionedKVWriter.java | 7 +- .../impl/TestShuffleInputEventHandlerImpl.java | 9 +- .../shuffle/orderedgrouped/TestShuffle.java | 4 +- .../TestUnorderedPartitionedKVWriter.java | 7 +- .../processor/FilterByWordInputProcessor.java | 3 +- .../java/org/apache/tez/test/TestInput.java | 3 +- .../java/org/apache/tez/test/TestProcessor.java | 3 +- .../tez/test/TestTaskErrorsUsingLocalMode.java | 215 ++++++++++++++++++ 63 files changed, 1791 insertions(+), 445 deletions(-) 
---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/tez/blob/27a13fc9/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index 41752a9..415a246 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -7,6 +7,7 @@ INCOMPATIBLE CHANGES TEZ-3029. Add an onError method to service plugin contexts. ALL CHANGES: + TEZ-3161. Allow task to report different kinds of errors - fatal / kill. TEZ-3177. Non-DAG events should use the session domain or no domain if the data does not need protection. TEZ-3192. IFile#checkState creating unnecessary objects though auto-boxing TEZ-3173. Update Tez AM REST APIs for more information for each vertex. http://git-wip-us.apache.org/repos/asf/tez/blob/27a13fc9/tez-api/src/main/java/org/apache/tez/common/ATSConstants.java ---------------------------------------------------------------------- diff --git a/tez-api/src/main/java/org/apache/tez/common/ATSConstants.java b/tez-api/src/main/java/org/apache/tez/common/ATSConstants.java index 7204943..0b8c67d 100644 --- a/tez-api/src/main/java/org/apache/tez/common/ATSConstants.java +++ b/tez-api/src/main/java/org/apache/tez/common/ATSConstants.java @@ -69,6 +69,7 @@ public class ATSConstants { public static final String FINISH_TIME = "endTime"; public static final String TIME_TAKEN = "timeTaken"; public static final String STATUS = "status"; + public static final String TASK_FAILURE_TYPE = "taskFailureType"; public static final String TASK_ATTEMPT_ERROR_ENUM = "taskAttemptErrorEnum"; public static final String DIAGNOSTICS = "diagnostics"; public static final String SUCCESSFUL_ATTEMPT_ID = "successfulAttemptId"; http://git-wip-us.apache.org/repos/asf/tez/blob/27a13fc9/tez-api/src/main/java/org/apache/tez/runtime/api/TaskContext.java ---------------------------------------------------------------------- diff --git a/tez-api/src/main/java/org/apache/tez/runtime/api/TaskContext.java 
b/tez-api/src/main/java/org/apache/tez/runtime/api/TaskContext.java index 457b0de..b5e42bc 100644 --- a/tez-api/src/main/java/org/apache/tez/runtime/api/TaskContext.java +++ b/tez-api/src/main/java/org/apache/tez/runtime/api/TaskContext.java @@ -23,7 +23,9 @@ import java.util.List; import javax.annotation.Nullable; +import org.apache.hadoop.classification.InterfaceAudience.Private; import org.apache.hadoop.classification.InterfaceAudience.Public; +import org.apache.hadoop.classification.InterfaceStability.Unstable; import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.tez.common.counters.TezCounters; import org.apache.tez.dag.api.UserPayload; @@ -132,13 +134,41 @@ public interface TaskContext { public void notifyProgress(); /** - * Report a fatal error to the framework. This will cause the entire task to - * fail and should not be used for reporting temporary or recoverable errors + * Report an error to the framework. This will cause the taskAttempt to fail, and should not be used + * to report errors which can be handled locally in the TaskAttempt. A new TaskAttempt will be launched + * depending upon how many retries are available for the task. + * + * @deprecated Replaced by {@link #reportFailure(TaskFailureType, Throwable, String)} + * + * Note: To maintain compatibility, even though this method is named 'fatalError' - this method + * operates as {@link #reportFailure(TaskFailureType, Throwable, String)} + * with the TaskFailureType set to {@link TaskFailureType#NON_FATAL}. * * @param exception an exception representing the error + * @param message a diagnostic message which may be associated with the error */ + @Deprecated public void fatalError(@Nullable Throwable exception, @Nullable String message); + + /** + * Report an error to the framework. The effect depends on the failure type: {@link TaskFailureType#FATAL} terminates the entire task with no further attempts, while {@link TaskFailureType#NON_FATAL} fails only the current attempt, and a new attempt may be launched if retries remain.
+ * + * @param taskFailureType the type of the error + * @param exception any exception that may be associated with the error + * @param message a diagnostic message which may be associated with the error + */ + void reportFailure(TaskFailureType taskFailureType, @Nullable Throwable exception, @Nullable String message); + + /** + * Kill the currently running attempt. + * @param exception an associated exception + * @param message an associated diagnostic message + */ + @Private + @Unstable + void killSelf(@Nullable Throwable exception, @Nullable String message); + /** * Returns meta-data for the specified service. As an example, when the MR * ShuffleHandler is used - this would return the jobToken serialized as bytes http://git-wip-us.apache.org/repos/asf/tez/blob/27a13fc9/tez-api/src/main/java/org/apache/tez/runtime/api/TaskFailureType.java ---------------------------------------------------------------------- diff --git a/tez-api/src/main/java/org/apache/tez/runtime/api/TaskFailureType.java b/tez-api/src/main/java/org/apache/tez/runtime/api/TaskFailureType.java new file mode 100644 index 0000000..41a665f --- /dev/null +++ b/tez-api/src/main/java/org/apache/tez/runtime/api/TaskFailureType.java @@ -0,0 +1,31 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.tez.runtime.api; + +public enum TaskFailureType { + /** + * Indicates an error, which can potentially be recovered from when another attempt is launched + */ + NON_FATAL, + + /** + * Indicates an error which is fatal; no more attempts will be made for the task + */ + FATAL, +} http://git-wip-us.apache.org/repos/asf/tez/blob/27a13fc9/tez-dag/pom.xml ---------------------------------------------------------------------- diff --git a/tez-dag/pom.xml b/tez-dag/pom.xml index 71385b0..45a3c3f 100644 --- a/tez-dag/pom.xml +++ b/tez-dag/pom.xml @@ -200,6 +200,7 @@ <imports> <param>${basedir}/src/main/proto</param> <param>${basedir}/../tez-api/src/main/proto</param> + <param>${basedir}/../tez-runtime-internals/src/main/proto</param> </imports> <source> <directory>${basedir}/src/main/proto</directory> http://git-wip-us.apache.org/repos/asf/tez/blob/27a13fc9/tez-dag/src/main/java/org/apache/tez/dag/app/TaskCommunicatorManager.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/TaskCommunicatorManager.java b/tez-dag/src/main/java/org/apache/tez/dag/app/TaskCommunicatorManager.java index 403e1a1..36b74de 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/app/TaskCommunicatorManager.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/TaskCommunicatorManager.java @@ -33,6 +33,8 @@ import org.apache.commons.collections4.ListUtils; import org.apache.hadoop.yarn.event.Event; import org.apache.tez.Utils; import org.apache.tez.dag.api.NamedEntityDescriptor; +import org.apache.tez.runtime.api.TaskFailureType; +import org.apache.tez.runtime.api.events.TaskAttemptKilledEvent; import org.apache.tez.serviceplugins.api.DagInfo; import org.apache.tez.serviceplugins.api.ServicePluginError; import org.apache.tez.serviceplugins.api.TaskCommunicator; @@ -277,7 +279,8 @@ public 
class TaskCommunicatorManager extends AbstractService implements taskAttemptEvent = new TaskAttemptEventStatusUpdate(taskAttemptID, (TaskStatusUpdateEvent) tezEvent.getEvent()); } else if (eventType == EventType.TASK_ATTEMPT_COMPLETED_EVENT - || eventType == EventType.TASK_ATTEMPT_FAILED_EVENT) { + || eventType == EventType.TASK_ATTEMPT_FAILED_EVENT + || eventType == EventType.TASK_ATTEMPT_KILLED_EVENT) { taFinishedEvents.add(tezEvent); } else { if (eventType == EventType.INPUT_READ_ERROR_EVENT) { @@ -306,6 +309,7 @@ public class TaskCommunicatorManager extends AbstractService implements EventMetaData sourceMeta = e.getSourceInfo(); switch (e.getEventType()) { case TASK_ATTEMPT_FAILED_EVENT: + case TASK_ATTEMPT_KILLED_EVENT: TaskAttemptTerminationCause errCause = null; switch (sourceMeta.getEventGenerator()) { case INPUT: @@ -324,12 +328,19 @@ public class TaskCommunicatorManager extends AbstractService implements throw new TezUncheckedException("Unknown EventProducerConsumerType: " + sourceMeta.getEventGenerator()); } - TaskAttemptFailedEvent taskFailedEvent =(TaskAttemptFailedEvent) e.getEvent(); - sendEvent( - new TaskAttemptEventAttemptFailed(sourceMeta.getTaskAttemptID(), - TaskAttemptEventType.TA_FAILED, - "Error: " + taskFailedEvent.getDiagnostics(), + if (e.getEventType() == EventType.TASK_ATTEMPT_FAILED_EVENT) { + TaskAttemptFailedEvent taskFailedEvent = (TaskAttemptFailedEvent) e.getEvent(); + sendEvent( + new TaskAttemptEventAttemptFailed(sourceMeta.getTaskAttemptID(), + TaskAttemptEventType.TA_FAILED, taskFailedEvent.getTaskFailureType(), + "Error: " + taskFailedEvent.getDiagnostics(), errCause)); + } else { // Killed + TaskAttemptKilledEvent taskKilledEvent = (TaskAttemptKilledEvent) e.getEvent(); + sendEvent( + new TaskAttemptEventAttemptKilled(sourceMeta.getTaskAttemptID(), + "Error: " + taskKilledEvent.getDiagnostics(), errCause)); + } break; case TASK_ATTEMPT_COMPLETED_EVENT: sendEvent( @@ -387,8 +398,9 @@ public class TaskCommunicatorManager 
extends AbstractService implements // TODO TEZ-2003 (post) TEZ-2671 Maybe consider un-registering here itself, since the task is not active anymore, // instead of waiting for the unregister to flow through the Container. // Fix along the same lines as TEZ-2124 by introducing an explict context. + //TODO-3183. Allow the FailureType to be specified sendEvent(new TaskAttemptEventAttemptFailed(taskAttemptId, - TaskAttemptEventType.TA_FAILED, diagnostics, TezUtilsInternal.fromTaskAttemptEndReason( + TaskAttemptEventType.TA_FAILED, TaskFailureType.NON_FATAL, diagnostics, TezUtilsInternal.fromTaskAttemptEndReason( taskAttemptEndReason))); } http://git-wip-us.apache.org/repos/asf/tez/blob/27a13fc9/tez-dag/src/main/java/org/apache/tez/dag/app/TaskHeartbeatHandler.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/TaskHeartbeatHandler.java b/tez-dag/src/main/java/org/apache/tez/dag/app/TaskHeartbeatHandler.java index 2f03474..296073a 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/app/TaskHeartbeatHandler.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/TaskHeartbeatHandler.java @@ -25,6 +25,7 @@ import org.apache.tez.dag.app.dag.event.TaskAttemptEventAttemptFailed; import org.apache.tez.dag.app.dag.event.TaskAttemptEventType; import org.apache.tez.dag.records.TaskAttemptTerminationCause; import org.apache.tez.dag.records.TezTaskAttemptID; +import org.apache.tez.runtime.api.TaskFailureType; /** @@ -60,7 +61,7 @@ public class TaskHeartbeatHandler extends HeartbeatHandlerBase<TezTaskAttemptID> @Override protected void handleTimeOut(TezTaskAttemptID attemptId) { eventHandler.handle(new TaskAttemptEventAttemptFailed(attemptId, - TaskAttemptEventType.TA_TIMED_OUT, "AttemptID:" + attemptId.toString() + TaskAttemptEventType.TA_TIMED_OUT, TaskFailureType.NON_FATAL, "AttemptID:" + attemptId.toString() + " Timed out after " + timeOut / 1000 + " secs", 
TaskAttemptTerminationCause.TASK_HEARTBEAT_ERROR)); } } http://git-wip-us.apache.org/repos/asf/tez/blob/27a13fc9/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/TaskAttemptEventAttemptFailed.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/TaskAttemptEventAttemptFailed.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/TaskAttemptEventAttemptFailed.java index 21c6b14..299847c 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/TaskAttemptEventAttemptFailed.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/TaskAttemptEventAttemptFailed.java @@ -18,29 +18,36 @@ package org.apache.tez.dag.app.dag.event; +import com.google.common.base.Preconditions; import org.apache.tez.dag.records.TaskAttemptTerminationCause; import org.apache.tez.dag.records.TezTaskAttemptID; +import org.apache.tez.runtime.api.TaskFailureType; public class TaskAttemptEventAttemptFailed extends TaskAttemptEvent implements DiagnosableEvent, TaskAttemptEventTerminationCauseEvent, RecoveryEvent { private final String diagnostics; private final TaskAttemptTerminationCause errorCause; - private boolean isFromRecovery = false; + private final TaskFailureType taskFailureType; + private final boolean isFromRecovery; /* Accepted Types - FAILED, TIMED_OUT */ public TaskAttemptEventAttemptFailed(TezTaskAttemptID id, - TaskAttemptEventType type, String diagnostics, TaskAttemptTerminationCause errorCause) { - super(id, type); - this.diagnostics = diagnostics; - this.errorCause = errorCause; + TaskAttemptEventType type, TaskFailureType taskFailureType, + String diagnostics, + TaskAttemptTerminationCause errorCause) { + this(id, type, taskFailureType, diagnostics, errorCause, false); } /* Accepted Types - FAILED, TIMED_OUT */ public TaskAttemptEventAttemptFailed(TezTaskAttemptID id, - TaskAttemptEventType type, String diagnostics, TaskAttemptTerminationCause errorCause, - 
boolean isFromRecovery) { - this(id, type, diagnostics, errorCause); + TaskAttemptEventType type, TaskFailureType taskFailureType, String diagnostics, TaskAttemptTerminationCause errorCause, + boolean isFromRecovery) { + super(id, type); + Preconditions.checkNotNull(taskFailureType, "FailureType must be set for a FAILED task attempt"); + this.diagnostics = diagnostics; + this.errorCause = errorCause; + this.taskFailureType = taskFailureType; this.isFromRecovery = isFromRecovery; } @@ -58,4 +65,8 @@ public class TaskAttemptEventAttemptFailed extends TaskAttemptEvent public boolean isFromRecovery() { return isFromRecovery; } + + public TaskFailureType getTaskFailureType() { + return taskFailureType; + } } http://git-wip-us.apache.org/repos/asf/tez/blob/27a13fc9/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/TaskEventTAFailed.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/TaskEventTAFailed.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/TaskEventTAFailed.java new file mode 100644 index 0000000..d6f1526 --- /dev/null +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/TaskEventTAFailed.java @@ -0,0 +1,46 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.tez.dag.app.dag.event; + +import com.google.common.base.Preconditions; +import org.apache.tez.common.TezAbstractEvent; +import org.apache.tez.dag.records.TezTaskAttemptID; +import org.apache.tez.runtime.api.TaskFailureType; + +@SuppressWarnings("rawtypes") +public class TaskEventTAFailed extends TaskEventTAUpdate { + + private final TezAbstractEvent causalEvent; + private final TaskFailureType taskFailureType; + + public TaskEventTAFailed(TezTaskAttemptID id, TaskFailureType taskFailureType, TezAbstractEvent causalEvent) { + super(id, TaskEventType.T_ATTEMPT_FAILED); + Preconditions.checkNotNull(taskFailureType, "FailureType must be specified for a failed attempt"); + this.taskFailureType = taskFailureType; + this.causalEvent = causalEvent; + } + + public TezAbstractEvent getCausalEvent() { + return causalEvent; + } + + public TaskFailureType getTaskFailureType() { + return taskFailureType; + } +} http://git-wip-us.apache.org/repos/asf/tez/blob/27a13fc9/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/TaskEventTAKilled.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/TaskEventTAKilled.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/TaskEventTAKilled.java new file mode 100644 index 0000000..c410361 --- /dev/null +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/TaskEventTAKilled.java @@ -0,0 +1,37 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.tez.dag.app.dag.event; + +import org.apache.tez.common.TezAbstractEvent; +import org.apache.tez.dag.records.TezTaskAttemptID; + +@SuppressWarnings("rawtypes") +public class TaskEventTAKilled extends TaskEventTAUpdate { + + private final TezAbstractEvent causalEvent; + + public TaskEventTAKilled(TezTaskAttemptID id, TezAbstractEvent causalEvent) { + super(id, TaskEventType.T_ATTEMPT_KILLED); + this.causalEvent = causalEvent; + } + + public TezAbstractEvent getCausalEvent() { + return causalEvent; + } +} http://git-wip-us.apache.org/repos/asf/tez/blob/27a13fc9/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/TaskEventTALaunched.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/TaskEventTALaunched.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/TaskEventTALaunched.java new file mode 100644 index 0000000..a9d46c5 --- /dev/null +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/TaskEventTALaunched.java @@ -0,0 +1,29 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.tez.dag.app.dag.event; + +import org.apache.tez.dag.records.TezTaskAttemptID; + +@SuppressWarnings("rawtypes") +public class TaskEventTALaunched extends TaskEventTAUpdate { + + public TaskEventTALaunched(TezTaskAttemptID id) { + super(id, TaskEventType.T_ATTEMPT_LAUNCHED); + } +} http://git-wip-us.apache.org/repos/asf/tez/blob/27a13fc9/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/TaskEventTASucceeded.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/TaskEventTASucceeded.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/TaskEventTASucceeded.java new file mode 100644 index 0000000..e74e2bf --- /dev/null +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/TaskEventTASucceeded.java @@ -0,0 +1,29 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.tez.dag.app.dag.event; + +import org.apache.tez.dag.records.TezTaskAttemptID; + +@SuppressWarnings("rawtypes") +public class TaskEventTASucceeded extends TaskEventTAUpdate { + + public TaskEventTASucceeded(TezTaskAttemptID id) { + super(id, TaskEventType.T_ATTEMPT_SUCCEEDED); + } +} http://git-wip-us.apache.org/repos/asf/tez/blob/27a13fc9/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/TaskEventTAUpdate.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/TaskEventTAUpdate.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/TaskEventTAUpdate.java index 01eaf5b..346c9d1 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/TaskEventTAUpdate.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/TaskEventTAUpdate.java @@ -18,31 +18,19 @@ package org.apache.tez.dag.app.dag.event; -import org.apache.tez.common.TezAbstractEvent; import org.apache.tez.dag.records.TezTaskAttemptID; @SuppressWarnings("rawtypes") -public class TaskEventTAUpdate extends TaskEvent { +public abstract class TaskEventTAUpdate extends TaskEvent { private TezTaskAttemptID attemptID; - private TezAbstractEvent causalEvent; public TaskEventTAUpdate(TezTaskAttemptID id, TaskEventType type) { - this(id, type, null); - } - - public TaskEventTAUpdate(TezTaskAttemptID id, TaskEventType type, TezAbstractEvent causalEvent) { super(id.getTaskID(), type); this.attemptID = id; - this.causalEvent = causalEvent; } - + public TezTaskAttemptID getTaskAttemptID() { return attemptID; } - - public TezAbstractEvent getCausalEvent() { - return causalEvent; - } - } http://git-wip-us.apache.org/repos/asf/tez/blob/27a13fc9/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java 
---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java index 702c323..6169a7b 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java @@ -31,6 +31,12 @@ import java.util.concurrent.locks.ReentrantReadWriteLock; import org.apache.commons.lang.StringUtils; import org.apache.commons.lang.exception.ExceptionUtils; +import org.apache.tez.dag.app.dag.event.TaskEvent; +import org.apache.tez.dag.app.dag.event.TaskEventTAFailed; +import org.apache.tez.dag.app.dag.event.TaskEventTAKilled; +import org.apache.tez.dag.app.dag.event.TaskEventTALaunched; +import org.apache.tez.dag.app.dag.event.TaskEventTASucceeded; +import org.apache.tez.runtime.api.TaskFailureType; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.hadoop.conf.Configuration; @@ -86,8 +92,6 @@ import org.apache.tez.dag.app.dag.event.TaskAttemptEventSchedule; import org.apache.tez.dag.app.dag.event.TaskAttemptEventStartedRemotely; import org.apache.tez.dag.app.dag.event.TaskAttemptEventStatusUpdate; import org.apache.tez.dag.app.dag.event.TaskAttemptEventType; -import org.apache.tez.dag.app.dag.event.TaskEventTAUpdate; -import org.apache.tez.dag.app.dag.event.TaskEventType; import org.apache.tez.dag.app.dag.event.VertexEventRouteEvent; import org.apache.tez.dag.app.dag.event.SpeculatorEventTaskAttemptStatusUpdate; import org.apache.tez.dag.app.rm.AMSchedulerEventTAEnded; @@ -1058,7 +1062,7 @@ public class TaskAttemptImpl implements TaskAttempt, TaskAttemptFinishedEvent finishEvt = new TaskAttemptFinishedEvent( attemptId, getVertex().getName(), getLaunchTime(), - getFinishTime(), TaskAttemptState.SUCCEEDED, null, + getFinishTime(), TaskAttemptState.SUCCEEDED, null, null, "", getCounters(), 
lastDataEvents, taGeneratedEvents, creationTime, creationCausalTA, allocationTime, null, null, null, null, null); @@ -1068,10 +1072,13 @@ public class TaskAttemptImpl implements TaskAttempt, } protected void logJobHistoryAttemptUnsuccesfulCompletion( - TaskAttemptState state) { + TaskAttemptState state, TaskFailureType taskFailureType) { Preconditions.checkArgument(recoveryData == null || recoveryData.getTaskAttemptFinishedEvent() == null, "log TaskAttemptFinishedEvent again in recovery when there's already another TaskAtttemptFinishedEvent"); + if (state == TaskAttemptState.FAILED && taskFailureType == null) { + throw new IllegalStateException("FAILED state must be accompanied by a FailureType"); + } long finishTime = getFinishTime(); ContainerId unsuccessfulContainerId = null; NodeId unsuccessfulContainerNodeId = null; @@ -1087,6 +1094,7 @@ public class TaskAttemptImpl implements TaskAttempt, TaskAttemptFinishedEvent finishEvt = new TaskAttemptFinishedEvent( attemptId, getVertex().getName(), getLaunchTime(), finishTime, state, + taskFailureType, terminationCause, StringUtils.join( getDiagnostics(), LINE_SEPARATOR), getCounters(), lastDataEvents, @@ -1169,6 +1177,7 @@ public class TaskAttemptImpl implements TaskAttempt, + ", attemptId=" + ta.attemptId); } ta.sendEvent(new TaskAttemptEventAttemptFailed(ta.getID(), TaskAttemptEventType.TA_FAILED, + taFinishedEvent.getTaskFailureType(), taFinishedEvent.getDiagnostics(), taFinishedEvent.getTaskAttemptError(), true)); break; case KILLED: @@ -1210,7 +1219,7 @@ public class TaskAttemptImpl implements TaskAttempt, LOG.error(msg, e); String diag = msg + ", " + e.getMessage() + ", " + ExceptionUtils.getStackTrace(e.getCause()); new TerminateTransition(FAILED_HELPER).transition(ta, - new TaskAttemptEventAttemptFailed(ta.getID(), TaskAttemptEventType.TA_FAILED, diag, + new TaskAttemptEventAttemptFailed(ta.getID(), TaskAttemptEventType.TA_FAILED, TaskFailureType.NON_FATAL, diag, TaskAttemptTerminationCause.APPLICATION_ERROR)); 
return TaskAttemptStateInternal.FAILED; } @@ -1325,7 +1334,7 @@ public class TaskAttemptImpl implements TaskAttempt, ta.recoveryData.getTaskAttemptFinishedEvent() == null) { ta.setFinishTime(); ta.logJobHistoryAttemptUnsuccesfulCompletion(helper - .getTaskAttemptState()); + .getTaskAttemptState(), helper.getFailureType(event)); } else { ta.finishTime = ta.recoveryData.getTaskAttemptFinishedEvent().getFinishTime(); } @@ -1341,8 +1350,7 @@ public class TaskAttemptImpl implements TaskAttempt, ta.sendEvent(createDAGCounterUpdateEventTAFinished(ta, helper.getTaskAttemptState())); // Send out events to the Task - indicating TaskAttemptTermination(F/K) - ta.sendEvent(new TaskEventTAUpdate(ta.attemptId, helper - .getTaskEventType(), event)); + ta.sendEvent(helper.getTaskEvent(ta.attemptId, event)); } } @@ -1393,8 +1401,7 @@ public class TaskAttemptImpl implements TaskAttempt, } // Inform the Task - ta.sendEvent(new TaskEventTAUpdate(ta.attemptId, - TaskEventType.T_ATTEMPT_LAUNCHED)); + ta.sendEvent(new TaskEventTALaunched(ta.attemptId)); if (ta.isSpeculationEnabled()) { ta.sendEvent(new SpeculatorEventTaskAttemptStatusUpdate(ta.attemptId, TaskAttemptState.RUNNING, @@ -1503,6 +1510,7 @@ public class TaskAttemptImpl implements TaskAttempt, ta.sendEvent( new TaskAttemptEventAttemptFailed(ta.getID(), TaskAttemptEventType.TA_FAILED, + TaskFailureType.NON_FATAL, diagnostics, TaskAttemptTerminationCause.NO_PROGRESS) ); @@ -1567,8 +1575,7 @@ public class TaskAttemptImpl implements TaskAttempt, TaskAttemptState.SUCCEEDED, null, null, ta.getVertex().getTaskSchedulerIdentifier())); // Inform the task. - ta.sendEvent(new TaskEventTAUpdate(ta.attemptId, - TaskEventType.T_ATTEMPT_SUCCEEDED)); + ta.sendEvent(new TaskEventTASucceeded(ta.attemptId)); // Unregister from the TaskHeartbeatHandler. 
ta.taskHeartbeatHandler.unregister(ta.attemptId); @@ -1788,11 +1795,13 @@ public class TaskAttemptImpl implements TaskAttempt, protected interface TerminatedTransitionHelper { - public TaskAttemptStateInternal getTaskAttemptStateInternal(); + TaskAttemptStateInternal getTaskAttemptStateInternal(); + + TaskAttemptState getTaskAttemptState(); - public TaskAttemptState getTaskAttemptState(); + TaskEvent getTaskEvent(TezTaskAttemptID taskAttemptId, TaskAttemptEvent event); - public TaskEventType getTaskEventType(); + TaskFailureType getFailureType(TaskAttemptEvent event); } protected static class FailedTransitionHelper implements @@ -1807,8 +1816,19 @@ public class TaskAttemptImpl implements TaskAttempt, } @Override - public TaskEventType getTaskEventType() { - return TaskEventType.T_ATTEMPT_FAILED; + public TaskEvent getTaskEvent(TezTaskAttemptID taskAttemptId, + TaskAttemptEvent event) { + return new TaskEventTAFailed(taskAttemptId, getFailureType(event), event); + } + + @Override + public TaskFailureType getFailureType(TaskAttemptEvent event) { + if (event instanceof TaskAttemptEventAttemptFailed) { + return ( ((TaskAttemptEventAttemptFailed) event).getTaskFailureType()); + } else { + // For alternate failure scenarios like OUTPUT_FAILED, CONTAINER_TERMINATING, NODE_FAILED, etc + return TaskFailureType.NON_FATAL; + } } } @@ -1826,8 +1846,14 @@ public class TaskAttemptImpl implements TaskAttempt, } @Override - public TaskEventType getTaskEventType() { - return TaskEventType.T_ATTEMPT_KILLED; + public TaskEvent getTaskEvent(TezTaskAttemptID taskAttemptId, + TaskAttemptEvent event) { + return new TaskEventTAKilled(taskAttemptId, event); + } + + @Override + public TaskFailureType getFailureType(TaskAttemptEvent event) { + return null; } } http://git-wip-us.apache.org/repos/asf/tez/blob/27a13fc9/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java ---------------------------------------------------------------------- diff --git 
a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java index 9217e84..26ba004 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java @@ -37,6 +37,8 @@ import com.google.common.collect.Maps; import org.apache.commons.lang.StringUtils; import org.apache.commons.lang.exception.ExceptionUtils; import org.apache.hadoop.classification.InterfaceAudience.Private; +import org.apache.tez.dag.app.dag.event.TaskEventTAFailed; +import org.apache.tez.runtime.api.TaskFailureType; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.hadoop.conf.Configuration; @@ -1166,7 +1168,7 @@ public class TaskImpl implements Task, EventHandler<TaskEvent> { public TaskStateInternal transition(TaskImpl task, TaskEvent event) { task.failedAttempts++; task.getVertex().incrementFailedTaskAttemptCount(); - TaskEventTAUpdate castEvent = (TaskEventTAUpdate) event; + TaskEventTAFailed castEvent = (TaskEventTAFailed) event; schedulingCausalTA = castEvent.getTaskAttemptID(); task.addDiagnosticInfo("TaskAttempt " + castEvent.getTaskAttemptID().getId() + " failed," + " info=" + task.getAttempt(castEvent.getTaskAttemptID()).getDiagnostics()); @@ -1177,7 +1179,8 @@ public class TaskImpl implements Task, EventHandler<TaskEvent> { // The attempt would have informed the scheduler about it's failure task.taskAttemptStatus.put(castEvent.getTaskAttemptID().getId(), true); - if (task.failedAttempts < task.maxFailedAttempts) { + if (task.failedAttempts < task.maxFailedAttempts && + castEvent.getTaskFailureType() == TaskFailureType.NON_FATAL) { task.handleTaskAttemptCompletion( ((TaskEventTAUpdate) event).getTaskAttemptID(), TaskAttemptStateInternal.FAILED); @@ -1189,9 +1192,15 @@ public class TaskImpl implements Task, EventHandler<TaskEvent> { task.addAndScheduleAttempt(getSchedulingCausalTA()); } } else { - 
LOG.info("Failing task: " + task.getTaskId() - + ", currentFailedAttempts: " + task.failedAttempts + ", maxFailedAttempts: " - + task.maxFailedAttempts); + if (castEvent.getTaskFailureType() == TaskFailureType.NON_FATAL) { + LOG.info( + "Failing task: {} due to too many failed attempts. currentFailedAttempts={}, maxFailedAttempts={}", + task.getTaskId(), task.failedAttempts, task.maxFailedAttempts); + } else { + LOG.info( + "Failing task: {} due to {} error reported by TaskAttempt. CurrentFailedAttempts={}", + task.getTaskId(), TaskFailureType.FATAL, task.failedAttempts); + } task.handleTaskAttemptCompletion( ((TaskEventTAUpdate) event).getTaskAttemptID(), TaskAttemptStateInternal.FAILED); @@ -1225,7 +1234,7 @@ public class TaskImpl implements Task, EventHandler<TaskEvent> { task.internalError(event.getType()); } - TaskEventTAUpdate castEvent = (TaskEventTAUpdate) event; + TaskEventTAFailed castEvent = (TaskEventTAFailed) event; TezTaskAttemptID failedAttemptId = castEvent.getTaskAttemptID(); TaskAttempt failedAttempt = task.getAttempt(failedAttemptId); ContainerId containerId = failedAttempt.getAssignedContainerID(); http://git-wip-us.apache.org/repos/asf/tez/blob/27a13fc9/tez-dag/src/main/java/org/apache/tez/dag/history/events/TaskAttemptFinishedEvent.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/history/events/TaskAttemptFinishedEvent.java b/tez-dag/src/main/java/org/apache/tez/dag/history/events/TaskAttemptFinishedEvent.java index 8e31a25..5a9d8c9 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/history/events/TaskAttemptFinishedEvent.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/history/events/TaskAttemptFinishedEvent.java @@ -18,11 +18,14 @@ package org.apache.tez.dag.history.events; +import javax.annotation.Nullable; import java.io.IOException; import java.io.InputStream; import java.io.OutputStream; import java.util.List; +import 
org.apache.tez.common.TezConverterUtils; +import org.apache.tez.runtime.api.TaskFailureType; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -57,6 +60,7 @@ public class TaskAttemptFinishedEvent implements HistoryEvent { private long finishTime; private TezTaskAttemptID creationCausalTA; private TaskAttemptState state; + private TaskFailureType taskFailureType; private String diagnostics; private TezCounters tezCounters; private TaskAttemptTerminationCause error; @@ -73,6 +77,7 @@ public class TaskAttemptFinishedEvent implements HistoryEvent { long startTime, long finishTime, TaskAttemptState state, + @Nullable TaskFailureType taskFailureType, TaskAttemptTerminationCause error, String diagnostics, TezCounters counters, List<DataEventDependencyInfo> dataEvents, @@ -93,6 +98,7 @@ public class TaskAttemptFinishedEvent implements HistoryEvent { this.startTime = startTime; this.finishTime = finishTime; this.state = state; + this.taskFailureType = taskFailureType; this.diagnostics = diagnostics; this.tezCounters = counters; this.error = error; @@ -136,6 +142,9 @@ public class TaskAttemptFinishedEvent implements HistoryEvent { .setAllocationTime(allocationTime) .setStartTime(startTime) .setFinishTime(finishTime); + if (taskFailureType != null) { + builder.setTaskFailureType(TezConverterUtils.failureTypeToProto(taskFailureType)); + } if (creationCausalTA != null) { builder.setCreationCausalTA(creationCausalTA.toString()); } @@ -177,6 +186,9 @@ public class TaskAttemptFinishedEvent implements HistoryEvent { this.allocationTime = proto.getAllocationTime(); this.startTime = proto.getStartTime(); this.finishTime = proto.getFinishTime(); + if (proto.hasTaskFailureType()) { + this.taskFailureType = TezConverterUtils.failureTypeFromProto(proto.getTaskFailureType()); + } if (proto.hasCreationCausalTA()) { this.creationCausalTA = TezTaskAttemptID.fromString(proto.getCreationCausalTA()); } @@ -244,6 +256,7 @@ public class TaskAttemptFinishedEvent implements HistoryEvent { 
+ ", finishTime=" + finishTime + ", timeTaken=" + (finishTime - startTime) + ", status=" + state.name() + + ", taskFailureType=" + taskFailureType + ", errorEnum=" + (error != null ? error.name() : "") + ", diagnostics=" + diagnostics + ", containerId=" + (containerId != null ? containerId.toString() : "") @@ -276,6 +289,10 @@ public class TaskAttemptFinishedEvent implements HistoryEvent { return state; } + public TaskFailureType getTaskFailureType() { + return taskFailureType; + } + public long getStartTime() { return startTime; } http://git-wip-us.apache.org/repos/asf/tez/blob/27a13fc9/tez-dag/src/main/java/org/apache/tez/dag/history/logging/impl/HistoryEventJsonConversion.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/history/logging/impl/HistoryEventJsonConversion.java b/tez-dag/src/main/java/org/apache/tez/dag/history/logging/impl/HistoryEventJsonConversion.java index 9bca440..75116c8 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/history/logging/impl/HistoryEventJsonConversion.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/history/logging/impl/HistoryEventJsonConversion.java @@ -548,6 +548,9 @@ public class HistoryEventJsonConversion { if (event.getTaskAttemptError() != null) { otherInfo.put(ATSConstants.TASK_ATTEMPT_ERROR_ENUM, event.getTaskAttemptError().name()); } + if (event.getTaskFailureType() != null) { + otherInfo.put(ATSConstants.TASK_FAILURE_TYPE, event.getTaskFailureType().name()); + } otherInfo.put(ATSConstants.DIAGNOSTICS, event.getDiagnostics()); otherInfo.put(ATSConstants.COUNTERS, DAGUtils.convertCountersToJSON(event.getCounters())); http://git-wip-us.apache.org/repos/asf/tez/blob/27a13fc9/tez-dag/src/main/java/org/apache/tez/serviceplugins/api/TaskCommunicatorContext.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/serviceplugins/api/TaskCommunicatorContext.java 
b/tez-dag/src/main/java/org/apache/tez/serviceplugins/api/TaskCommunicatorContext.java index c551b09..4990d95 100644 --- a/tez-dag/src/main/java/org/apache/tez/serviceplugins/api/TaskCommunicatorContext.java +++ b/tez-dag/src/main/java/org/apache/tez/serviceplugins/api/TaskCommunicatorContext.java @@ -141,7 +141,9 @@ public interface TaskCommunicatorContext extends ServicePluginContextBase { @Nullable String diagnostics); /** - * Inform the framework that a task has failed + * Inform the framework that a task has failed. This, at the moment, is always treated as a + * an error which will cause a retry of the task to be triggered, if there are enough retry + * attempts left. * * @param taskAttemptId the relevant task attempt id * @param taskAttemptEndReason the reason for the task failure http://git-wip-us.apache.org/repos/asf/tez/blob/27a13fc9/tez-dag/src/main/proto/HistoryEvents.proto ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/proto/HistoryEvents.proto b/tez-dag/src/main/proto/HistoryEvents.proto index f3aeed4..ff3707d 100644 --- a/tez-dag/src/main/proto/HistoryEvents.proto +++ b/tez-dag/src/main/proto/HistoryEvents.proto @@ -22,6 +22,7 @@ option java_generate_equals_and_hash = true; import "DAGApiRecords.proto"; import "Events.proto"; +import "RuntimeEvents.proto"; message AMLaunchedProto { optional string application_attempt_id = 1; @@ -185,6 +186,7 @@ message TaskAttemptFinishedProto { optional int64 finish_time = 5; optional string creation_causal_t_a = 6; optional int32 state = 7; + optional TaskFailureTypeProto task_failure_type = 16; optional string diagnostics = 8; optional TezCountersProto counters = 9; optional string error_enum = 10; http://git-wip-us.apache.org/repos/asf/tez/blob/27a13fc9/tez-dag/src/test/java/org/apache/tez/dag/app/TestRecoveryParser.java ---------------------------------------------------------------------- diff --git 
a/tez-dag/src/test/java/org/apache/tez/dag/app/TestRecoveryParser.java b/tez-dag/src/test/java/org/apache/tez/dag/app/TestRecoveryParser.java index 12e75a7..962b230 100644 --- a/tez-dag/src/test/java/org/apache/tez/dag/app/TestRecoveryParser.java +++ b/tez-dag/src/test/java/org/apache/tez/dag/app/TestRecoveryParser.java @@ -702,7 +702,7 @@ public class TestRecoveryParser { rService.handle(new DAGHistoryEvent(dagID, ta0t2v2StartedEvent)); TaskAttemptFinishedEvent ta0t2v2FinishedEvent = new TaskAttemptFinishedEvent( ta0t2v2Id, "v1", 500L, 600L, - TaskAttemptState.SUCCEEDED, null, "", null, + TaskAttemptState.SUCCEEDED, null, null, "", null, null, null, 0L, null, 0L, null, null, null, null, null); rService.handle(new DAGHistoryEvent(dagID, ta0t2v2FinishedEvent)); http://git-wip-us.apache.org/repos/asf/tez/blob/27a13fc9/tez-dag/src/test/java/org/apache/tez/dag/app/TestTaskCommunicatorManager2.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/TestTaskCommunicatorManager2.java b/tez-dag/src/test/java/org/apache/tez/dag/app/TestTaskCommunicatorManager2.java index 9700524..4950e09 100644 --- a/tez-dag/src/test/java/org/apache/tez/dag/app/TestTaskCommunicatorManager2.java +++ b/tez-dag/src/test/java/org/apache/tez/dag/app/TestTaskCommunicatorManager2.java @@ -17,14 +17,20 @@ package org.apache.tez.dag.app; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertTrue; import static org.mockito.Matchers.any; +import static org.mockito.Matchers.anyInt; +import static org.mockito.Matchers.eq; import static org.mockito.Mockito.doReturn; import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.reset; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; import java.io.IOException; import java.util.HashMap; +import java.util.LinkedList; +import java.util.List; import java.util.Map; +import 
java.util.concurrent.atomic.AtomicInteger; import com.google.common.collect.Lists; @@ -38,12 +44,21 @@ import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.event.Event; import org.apache.hadoop.yarn.event.EventHandler; +import org.apache.hadoop.yarn.util.SystemClock; import org.apache.tez.common.TezUtils; import org.apache.tez.dag.api.NamedEntityDescriptor; import org.apache.tez.dag.api.TezConfiguration; import org.apache.tez.dag.api.TezConstants; import org.apache.tez.dag.api.TezException; import org.apache.tez.dag.api.UserPayload; +import org.apache.tez.dag.app.dag.Vertex; +import org.apache.tez.dag.records.TezDAGID; +import org.apache.tez.dag.records.TezTaskID; +import org.apache.tez.dag.records.TezVertexID; +import org.apache.tez.runtime.api.TaskFailureType; +import org.apache.tez.runtime.api.events.TaskAttemptFailedEvent; +import org.apache.tez.runtime.api.impl.EventMetaData; +import org.apache.tez.runtime.api.impl.TezEvent; import org.apache.tez.serviceplugins.api.TaskAttemptEndReason; import org.apache.tez.dag.app.dag.DAG; import org.apache.tez.dag.app.dag.event.TaskAttemptEventAttemptFailed; @@ -54,6 +69,7 @@ import org.apache.tez.dag.app.rm.container.AMContainerTask; import org.apache.tez.dag.records.TaskAttemptTerminationCause; import org.apache.tez.dag.records.TezTaskAttemptID; import org.apache.tez.runtime.api.impl.TaskSpec; +import org.apache.tez.serviceplugins.api.TaskHeartbeatRequest; import org.junit.Test; import org.mockito.ArgumentCaptor; @@ -62,59 +78,30 @@ public class TestTaskCommunicatorManager2 { @SuppressWarnings("unchecked") @Test(timeout = 5000) public void testTaskAttemptFailedKilled() throws IOException, TezException { - ApplicationId appId = ApplicationId.newInstance(1000, 1); - ApplicationAttemptId appAttemptId = ApplicationAttemptId.newInstance(appId, 1); - Credentials credentials = new Credentials(); - AppContext appContext = mock(AppContext.class); - 
EventHandler eventHandler = mock(EventHandler.class); - DAG dag = mock(DAG.class); - AMContainerMap amContainerMap = mock(AMContainerMap.class); - Map<ApplicationAccessType, String> appAcls = new HashMap<ApplicationAccessType, String>(); - doReturn(eventHandler).when(appContext).getEventHandler(); - doReturn(dag).when(appContext).getCurrentDAG(); - doReturn(appAttemptId).when(appContext).getApplicationAttemptId(); - doReturn(credentials).when(appContext).getAppCredentials(); - doReturn(appAcls).when(appContext).getApplicationACLs(); - doReturn(amContainerMap).when(appContext).getAllContainers(); - NodeId nodeId = NodeId.newInstance("localhost", 0); - AMContainer amContainer = mock(AMContainer.class); - Container container = mock(Container.class); - doReturn(nodeId).when(container).getNodeId(); - doReturn(amContainer).when(amContainerMap).get(any(ContainerId.class)); - doReturn(container).when(amContainer).getContainer(); - Configuration conf = new TezConfiguration(); - UserPayload userPayload = TezUtils.createUserPayloadFromConf(conf); - TaskCommunicatorManager taskAttemptListener = - new TaskCommunicatorManager(appContext, mock(TaskHeartbeatHandler.class), - mock(ContainerHeartbeatHandler.class), Lists.newArrayList(new NamedEntityDescriptor( - TezConstants.getTezYarnServicePluginName(), null).setUserPayload(userPayload))); - - TaskSpec taskSpec1 = mock(TaskSpec.class); - TezTaskAttemptID taskAttemptId1 = mock(TezTaskAttemptID.class); - doReturn(taskAttemptId1).when(taskSpec1).getTaskAttemptID(); + TaskCommunicatorManagerWrapperForTest wrapper = new TaskCommunicatorManagerWrapperForTest(); + + TaskSpec taskSpec1 = wrapper.createTaskSpec(); AMContainerTask amContainerTask1 = new AMContainerTask(taskSpec1, null, null, false, 10); - TaskSpec taskSpec2 = mock(TaskSpec.class); - TezTaskAttemptID taskAttemptId2 = mock(TezTaskAttemptID.class); - doReturn(taskAttemptId2).when(taskSpec2).getTaskAttemptID(); + TaskSpec taskSpec2 = wrapper.createTaskSpec(); AMContainerTask 
amContainerTask2 = new AMContainerTask(taskSpec2, null, null, false, 10); - ContainerId containerId1 = createContainerId(appId, 1); - taskAttemptListener.registerRunningContainer(containerId1, 0); - taskAttemptListener.registerTaskAttempt(amContainerTask1, containerId1, 0); - ContainerId containerId2 = createContainerId(appId, 2); - taskAttemptListener.registerRunningContainer(containerId2, 0); - taskAttemptListener.registerTaskAttempt(amContainerTask2, containerId2, 0); + ContainerId containerId1 = wrapper.createContainerId(1); + wrapper.registerRunningContainer(containerId1); + wrapper.registerTaskAttempt(containerId1, amContainerTask1); + ContainerId containerId2 = wrapper.createContainerId(2); + wrapper.registerRunningContainer(containerId2); + wrapper.registerTaskAttempt(containerId2, amContainerTask2); - taskAttemptListener - .taskFailed(taskAttemptId1, TaskAttemptEndReason.COMMUNICATION_ERROR, "Diagnostics1"); - taskAttemptListener - .taskKilled(taskAttemptId2, TaskAttemptEndReason.EXECUTOR_BUSY, "Diagnostics2"); + wrapper.getTaskCommunicatorManager().taskFailed(amContainerTask1.getTask().getTaskAttemptID(), + TaskAttemptEndReason.COMMUNICATION_ERROR, "Diagnostics1"); + wrapper.getTaskCommunicatorManager().taskKilled(amContainerTask2.getTask().getTaskAttemptID(), + TaskAttemptEndReason.EXECUTOR_BUSY, "Diagnostics2"); ArgumentCaptor<Event> argumentCaptor = ArgumentCaptor.forClass(Event.class); - verify(eventHandler, times(2)).handle(argumentCaptor.capture()); + verify(wrapper.getEventHandler(), times(2)).handle(argumentCaptor.capture()); assertTrue(argumentCaptor.getAllValues().get(0) instanceof TaskAttemptEventAttemptFailed); assertTrue(argumentCaptor.getAllValues().get(1) instanceof TaskAttemptEventAttemptKilled); TaskAttemptEventAttemptFailed failedEvent = @@ -128,13 +115,153 @@ public class TestTaskCommunicatorManager2 { assertEquals("Diagnostics2", killedEvent.getDiagnosticInfo()); assertEquals(TaskAttemptTerminationCause.SERVICE_BUSY, 
killedEvent.getTerminationCause()); - // TODO TEZ-2003. Verify unregistration from the registered list +// TODO TEZ-2003. Verify unregistration from the registered list } - @SuppressWarnings("deprecation") - private ContainerId createContainerId(ApplicationId applicationId, int containerIdx) { - ApplicationAttemptId appAttemptId = ApplicationAttemptId.newInstance(applicationId, 1); - ContainerId containerId = ContainerId.newInstance(appAttemptId, containerIdx); - return containerId; + @SuppressWarnings("unchecked") + @Test(timeout = 5000) + public void testTaskAttemptFailureViaHeartbeatNonFatal() throws IOException, TezException { + + TaskCommunicatorManagerWrapperForTest wrapper = new TaskCommunicatorManagerWrapperForTest(); + + TaskSpec taskSpec1 = wrapper.createTaskSpec(); + AMContainerTask amContainerTask1 = new AMContainerTask(taskSpec1, null, null, false, 10); + + TaskSpec taskSpec2 = wrapper.createTaskSpec(); + AMContainerTask amContainerTask2 = new AMContainerTask(taskSpec2, null, null, false, 10); + + ContainerId containerId1 = wrapper.createContainerId(1); + wrapper.registerRunningContainer(containerId1); + wrapper.registerTaskAttempt(containerId1, amContainerTask1); + + ContainerId containerId2 = wrapper.createContainerId(2); + wrapper.registerRunningContainer(containerId2); + wrapper.registerTaskAttempt(containerId2, amContainerTask2); + + List<TezEvent> events = new LinkedList<>(); + + EventMetaData sourceInfo1 = + new EventMetaData(EventMetaData.EventProducerConsumerType.PROCESSOR, "testVertex", null, + taskSpec1.getTaskAttemptID()); + TaskAttemptFailedEvent failedEvent1 = new TaskAttemptFailedEvent("non-fatal test error", + TaskFailureType.NON_FATAL); + TezEvent failedEventT1 = new TezEvent(failedEvent1, sourceInfo1); + events.add(failedEventT1); + TaskHeartbeatRequest taskHeartbeatRequest1 = + new TaskHeartbeatRequest(containerId1.toString(), taskSpec1.getTaskAttemptID(), events, 0, + 0, 0); + 
wrapper.getTaskCommunicatorManager().heartbeat(taskHeartbeatRequest1); + + ArgumentCaptor<Event> argumentCaptor = ArgumentCaptor.forClass(Event.class); + verify(wrapper.getEventHandler(), times(1)).handle(argumentCaptor.capture()); + assertTrue(argumentCaptor.getAllValues().get(0) instanceof TaskAttemptEventAttemptFailed); + TaskAttemptEventAttemptFailed failedEvent = + (TaskAttemptEventAttemptFailed) argumentCaptor.getAllValues().get(0); + assertEquals(TaskFailureType.NON_FATAL, failedEvent.getTaskFailureType()); + assertTrue(failedEvent.getDiagnosticInfo().contains("non-fatal")); + + events.clear(); + reset(wrapper.getEventHandler()); + + EventMetaData sourceInfo2 = + new EventMetaData(EventMetaData.EventProducerConsumerType.PROCESSOR, "testVertex", null, + taskSpec2.getTaskAttemptID()); + TaskAttemptFailedEvent failedEvent2 = new TaskAttemptFailedEvent("-fatal- test error", + TaskFailureType.FATAL); + TezEvent failedEventT2 = new TezEvent(failedEvent2, sourceInfo2); + events.add(failedEventT2); + TaskHeartbeatRequest taskHeartbeatRequest2 = + new TaskHeartbeatRequest(containerId2.toString(), taskSpec2.getTaskAttemptID(), events, 0, + 0, 0); + wrapper.getTaskCommunicatorManager().heartbeat(taskHeartbeatRequest2); + + argumentCaptor = ArgumentCaptor.forClass(Event.class); + verify(wrapper.getEventHandler(), times(1)).handle(argumentCaptor.capture()); + assertTrue(argumentCaptor.getAllValues().get(0) instanceof TaskAttemptEventAttemptFailed); + failedEvent = (TaskAttemptEventAttemptFailed) argumentCaptor.getAllValues().get(0); + assertEquals(TaskFailureType.FATAL, failedEvent.getTaskFailureType()); + assertTrue(failedEvent.getDiagnosticInfo().contains("-fatal-")); + } + + @SuppressWarnings("unchecked") + private static class TaskCommunicatorManagerWrapperForTest { + ApplicationId appId = ApplicationId.newInstance(1000, 1); + ApplicationAttemptId appAttemptId = ApplicationAttemptId.newInstance(appId, 1); + Credentials credentials = new Credentials(); + AppContext 
appContext = mock(AppContext.class); + EventHandler eventHandler = mock(EventHandler.class); + DAG dag = mock(DAG.class); + Vertex vertex = mock(Vertex.class); + TezDAGID dagId; + TezVertexID vertexId; + AMContainerMap amContainerMap = mock(AMContainerMap.class); + Map<ApplicationAccessType, String> appAcls = new HashMap<ApplicationAccessType, String>(); + Configuration conf = new TezConfiguration(); + UserPayload userPayload; + TaskCommunicatorManager taskCommunicatorManager; + private AtomicInteger taskIdCounter = new AtomicInteger(0); + + TaskCommunicatorManagerWrapperForTest() throws IOException, TezException { + dagId = TezDAGID.getInstance(appId, 1); + vertexId = TezVertexID.getInstance(dagId, 100); + doReturn(eventHandler).when(appContext).getEventHandler(); + doReturn(dag).when(appContext).getCurrentDAG(); + doReturn(vertex).when(dag).getVertex(eq(vertexId)); + doReturn(new TaskAttemptEventInfo(0, new LinkedList<TezEvent>(), 0)).when(vertex) + .getTaskAttemptTezEvents(any(TezTaskAttemptID.class), anyInt(), anyInt(), anyInt()); + doReturn(appAttemptId).when(appContext).getApplicationAttemptId(); + doReturn(credentials).when(appContext).getAppCredentials(); + doReturn(appAcls).when(appContext).getApplicationACLs(); + doReturn(amContainerMap).when(appContext).getAllContainers(); + doReturn(new SystemClock()).when(appContext).getClock(); + + NodeId nodeId = NodeId.newInstance("localhost", 0); + AMContainer amContainer = mock(AMContainer.class); + Container container = mock(Container.class); + doReturn(nodeId).when(container).getNodeId(); + doReturn(amContainer).when(amContainerMap).get(any(ContainerId.class)); + doReturn(container).when(amContainer).getContainer(); + + userPayload = TezUtils.createUserPayloadFromConf(conf); + + taskCommunicatorManager = + new TaskCommunicatorManager(appContext, mock(TaskHeartbeatHandler.class), + mock(ContainerHeartbeatHandler.class), Lists.newArrayList(new NamedEntityDescriptor( + TezConstants.getTezYarnServicePluginName(), 
null).setUserPayload(userPayload))); + } + + + TaskCommunicatorManager getTaskCommunicatorManager() { + return taskCommunicatorManager; + } + + EventHandler getEventHandler() { + return eventHandler; + } + + private void registerRunningContainer(ContainerId containerId) { + taskCommunicatorManager.registerRunningContainer(containerId, 0); + } + + private void registerTaskAttempt(ContainerId containerId, AMContainerTask amContainerTask) { + taskCommunicatorManager.registerTaskAttempt(amContainerTask, containerId, 0); + } + + private TaskSpec createTaskSpec() { + TaskSpec taskSpec = mock(TaskSpec.class); + TezTaskID taskId = TezTaskID.getInstance(vertexId, taskIdCounter.incrementAndGet()); + TezTaskAttemptID taskAttemptId = TezTaskAttemptID.getInstance(taskId, 0); + doReturn(taskAttemptId).when(taskSpec).getTaskAttemptID(); + return taskSpec; + } + + + @SuppressWarnings("deprecation") + private ContainerId createContainerId(int id) { + ApplicationAttemptId appAttemptId = ApplicationAttemptId.newInstance(appId, 1); + ContainerId containerId = ContainerId.newInstance(appAttemptId, id); + return containerId; + } + } } http://git-wip-us.apache.org/repos/asf/tez/blob/27a13fc9/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestDAGRecovery.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestDAGRecovery.java b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestDAGRecovery.java index 0b0af7b..be1821b 100644 --- a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestDAGRecovery.java +++ b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestDAGRecovery.java @@ -87,16 +87,12 @@ import org.apache.tez.dag.app.dag.event.DAGEvent; import org.apache.tez.dag.app.dag.event.DAGEventRecoverEvent; import org.apache.tez.dag.app.dag.event.DAGEventType; import org.apache.tez.dag.app.dag.event.TaskAttemptEvent; -import 
org.apache.tez.dag.app.dag.event.TaskAttemptEventSchedule; import org.apache.tez.dag.app.dag.event.TaskAttemptEventType; import org.apache.tez.dag.app.dag.event.TaskEvent; -import org.apache.tez.dag.app.dag.event.TaskEventScheduleTask; import org.apache.tez.dag.app.dag.event.TaskEventType; import org.apache.tez.dag.app.dag.event.VertexEvent; -import org.apache.tez.dag.app.dag.event.VertexEventRouteEvent; import org.apache.tez.dag.app.dag.event.VertexEventType; import org.apache.tez.dag.app.dag.impl.TestVertexImpl.CountingOutputCommitter; -import org.apache.tez.dag.app.dag.impl.TestVertexImpl.EventHandlingRootInputInitializer; import org.apache.tez.dag.app.rm.AMSchedulerEvent; import org.apache.tez.dag.app.rm.AMSchedulerEventType; import org.apache.tez.dag.app.rm.TaskSchedulerManager; @@ -105,7 +101,6 @@ import org.apache.tez.dag.history.HistoryEvent; import org.apache.tez.dag.history.HistoryEventHandler; import org.apache.tez.dag.history.HistoryEventType; import org.apache.tez.dag.history.events.DAGInitializedEvent; -import org.apache.tez.dag.history.events.DAGRecoveredEvent; import org.apache.tez.dag.history.events.DAGStartedEvent; import org.apache.tez.dag.history.events.TaskAttemptFinishedEvent; import org.apache.tez.dag.history.events.TaskAttemptStartedEvent; @@ -121,6 +116,7 @@ import org.apache.tez.dag.records.TezTaskID; import org.apache.tez.dag.records.TezVertexID; import org.apache.tez.hadoop.shim.DefaultHadoopShim; import org.apache.tez.runtime.api.Event; +import org.apache.tez.runtime.api.TaskFailureType; import org.apache.tez.runtime.api.InputInitializer; import org.apache.tez.runtime.api.InputInitializerContext; import org.apache.tez.runtime.api.InputSpecUpdate; @@ -130,7 +126,6 @@ import org.apache.tez.runtime.api.events.DataMovementEvent; import org.apache.tez.runtime.api.events.InputInitializerEvent; import org.apache.tez.runtime.api.impl.EventMetaData; import org.apache.tez.runtime.api.impl.EventMetaData.EventProducerConsumerType; -import 
org.apache.tez.runtime.api.impl.TaskSpec; import org.apache.tez.runtime.api.impl.TezEvent; import org.junit.After; import org.junit.Assert; @@ -883,7 +878,7 @@ public class TestDAGRecovery { taGeneratedEvents.add(new TezEvent(DataMovementEvent.create(ByteBuffer.wrap(new byte[0])), metadata)); TaskAttemptFinishedEvent taFinishedEvent = new TaskAttemptFinishedEvent( ta1t1v1Id, "v1", 0L, 0L, - TaskAttemptState.SUCCEEDED, null, "", null, + TaskAttemptState.SUCCEEDED, null, null, "", null, null, taGeneratedEvents, 0L, null, 0L, null, null, null, null, null); TaskAttemptRecoveryData taRecoveryData = new TaskAttemptRecoveryData(taStartedEvent, taFinishedEvent); Map<TezTaskAttemptID, TaskAttemptRecoveryData> taRecoveryDataMap = @@ -941,7 +936,7 @@ public class TestDAGRecovery { initMockDAGRecoveryDataForTaskAttempt(); TaskAttemptFinishedEvent taFinishedEvent = new TaskAttemptFinishedEvent( ta1t1v1Id, "v1", ta1LaunchTime, ta1FinishedTime, - TaskAttemptState.FAILED, TaskAttemptTerminationCause.CONTAINER_LAUNCH_FAILED, "", null, + TaskAttemptState.FAILED, TaskFailureType.NON_FATAL, TaskAttemptTerminationCause.CONTAINER_LAUNCH_FAILED, "", null, null, null, 0L, null, 0L, null, null, null, null, null); TaskAttemptRecoveryData taRecoveryData = new TaskAttemptRecoveryData(null, taFinishedEvent); doReturn(taRecoveryData).when(dagRecoveryData).getTaskAttemptRecoveryData(ta1t1v1Id); @@ -970,7 +965,7 @@ public class TestDAGRecovery { initMockDAGRecoveryDataForTaskAttempt(); TaskAttemptFinishedEvent taFinishedEvent = new TaskAttemptFinishedEvent( ta1t1v1Id, "v1", ta1LaunchTime, ta1FinishedTime, - TaskAttemptState.KILLED, TaskAttemptTerminationCause.TERMINATED_BY_CLIENT, "", null, + TaskAttemptState.KILLED, null, TaskAttemptTerminationCause.TERMINATED_BY_CLIENT, "", null, null, null, 0L, null, 0L, null, null, null, null, null); TaskAttemptRecoveryData taRecoveryData = new TaskAttemptRecoveryData(null, taFinishedEvent); 
doReturn(taRecoveryData).when(dagRecoveryData).getTaskAttemptRecoveryData(ta1t1v1Id); @@ -1030,7 +1025,7 @@ public class TestDAGRecovery { sourceInfo)); TaskAttemptFinishedEvent taFinishedEvent = new TaskAttemptFinishedEvent( ta1t1v1Id, "v1", ta1LaunchTime, ta1FinishedTime, - TaskAttemptState.SUCCEEDED, null, "", null, + TaskAttemptState.SUCCEEDED, null, null, "", null, null, taGeneratedEvents, 0L, null, 0L, null, null, null, null, null); TaskAttemptRecoveryData taRecoveryData = new TaskAttemptRecoveryData(taStartedEvent, taFinishedEvent); doReturn(taRecoveryData).when(dagRecoveryData).getTaskAttemptRecoveryData(ta1t1v1Id); @@ -1068,7 +1063,7 @@ public class TestDAGRecovery { taGeneratedEvents.add(new TezEvent(DataMovementEvent.create(ByteBuffer.wrap(new byte[0])), metadata)); TaskAttemptFinishedEvent taFinishedEvent = new TaskAttemptFinishedEvent( ta1t1v2Id, "vertex2", ta1LaunchTime, ta1FinishedTime, - TaskAttemptState.SUCCEEDED, null, "", null, + TaskAttemptState.SUCCEEDED, null, null, "", null, null, taGeneratedEvents, 0L, null, 0L, null, null, null, null, null); TaskAttemptRecoveryData taRecoveryData = new TaskAttemptRecoveryData(taStartedEvent, taFinishedEvent); doReturn(taRecoveryData).when(dagRecoveryData).getTaskAttemptRecoveryData(ta1t1v2Id); @@ -1119,7 +1114,7 @@ public class TestDAGRecovery { mock(NodeId.class), "", "", ""); TaskAttemptFinishedEvent taFinishedEvent = new TaskAttemptFinishedEvent( ta1t1v1Id, "v1", ta1LaunchTime, ta1FinishedTime, - TaskAttemptState.FAILED, TaskAttemptTerminationCause.INPUT_READ_ERROR, "", null, + TaskAttemptState.FAILED, TaskFailureType.NON_FATAL, TaskAttemptTerminationCause.INPUT_READ_ERROR, "", null, null, null, 0L, null, 0L, null, null, null, null, null); TaskAttemptRecoveryData taRecoveryData = new TaskAttemptRecoveryData(taStartedEvent, taFinishedEvent); doReturn(taRecoveryData).when(dagRecoveryData).getTaskAttemptRecoveryData(ta1t1v1Id); @@ -1150,7 +1145,7 @@ public class TestDAGRecovery { mock(NodeId.class), "", "", 
""); TaskAttemptFinishedEvent taFinishedEvent = new TaskAttemptFinishedEvent( ta1t1v1Id, "v1", ta1FinishedTime, ta1FinishedTime, - TaskAttemptState.KILLED, TaskAttemptTerminationCause.TERMINATED_BY_CLIENT, "", null, + TaskAttemptState.KILLED, null, TaskAttemptTerminationCause.TERMINATED_BY_CLIENT, "", null, null, null, 0L, null, 0L, null, null, null, null, null); TaskAttemptRecoveryData taRecoveryData = new TaskAttemptRecoveryData(taStartedEvent, taFinishedEvent); doReturn(taRecoveryData).when(dagRecoveryData).getTaskAttemptRecoveryData(ta1t1v1Id); http://git-wip-us.apache.org/repos/asf/tez/blob/27a13fc9/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskAttempt.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskAttempt.java b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskAttempt.java index feb7585..68cfc39 100644 --- a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskAttempt.java +++ b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskAttempt.java @@ -61,6 +61,10 @@ import org.apache.hadoop.yarn.util.SystemClock; import org.apache.log4j.Level; import org.apache.log4j.LogManager; import org.apache.tez.common.MockDNSToSwitchMapping; +import org.apache.tez.dag.app.dag.event.TaskEventTAFailed; +import org.apache.tez.dag.app.dag.event.TaskEventTAKilled; +import org.apache.tez.dag.app.dag.event.TaskEventTASucceeded; +import org.apache.tez.runtime.api.TaskFailureType; import org.apache.tez.serviceplugins.api.TaskCommunicator; import org.apache.tez.dag.api.TaskLocationHint; import org.apache.tez.dag.api.TezConfiguration; @@ -117,10 +121,14 @@ import org.junit.Before; import org.junit.BeforeClass; import org.junit.Test; import org.mockito.ArgumentCaptor; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; @SuppressWarnings({ "unchecked", "rawtypes" }) public class TestTaskAttempt { + private static final Logger LOG = 
LoggerFactory.getLogger(TestTaskAttempt.class); + static public class StubbedFS extends RawLocalFileSystem { @Override public FileStatus getFileStatus(Path f) throws IOException { @@ -417,9 +425,11 @@ public class TestTaskAttempt { arg = ArgumentCaptor.forClass(Event.class); verify(eventHandler, times(expectedEvenstAfterTerminating)).handle(arg.capture()); - verifyEventType( + Event event = verifyEventType( arg.getAllValues().subList(expectedEventsAtRunning, - expectedEvenstAfterTerminating), TaskEventTAUpdate.class, 1); + expectedEvenstAfterTerminating), TaskEventTAFailed.class, 1); + TaskEventTAFailed failedEvent = (TaskEventTAFailed) event; + assertEquals(TaskFailureType.NON_FATAL, failedEvent.getTaskFailureType()); verifyEventType( arg.getAllValues().subList(expectedEventsAtRunning, expectedEvenstAfterTerminating), AMSchedulerEventTAEnded.class, 1); @@ -453,7 +463,7 @@ public class TestTaskAttempt { TezVertexID vertexID = TezVertexID.getInstance(dagID, 1); TezTaskID taskID = TezTaskID.getInstance(vertexID, 1); - MockEventHandler eventHandler = new MockEventHandler(); + MockEventHandler eventHandler = spy(new MockEventHandler()); TaskCommunicatorManagerInterface taListener = createMockTaskAttemptListener(); Configuration taskConf = new Configuration(); @@ -486,10 +496,13 @@ public class TestTaskAttempt { mockHeartbeatHandler, appCtx, false, resource, createFakeContainerContext(), false); TezTaskAttemptID taskAttemptID = taImpl.getID(); + ArgumentCaptor<Event> arg = ArgumentCaptor.forClass(Event.class); taImpl.handle(new TaskAttemptEventSchedule(taskAttemptID, 0, 0)); // At state STARTING. 
taImpl.handle(new TaskAttemptEventStartedRemotely(taskAttemptID, contId, null)); + int expectedEventsAtRunning = 3; + verify(eventHandler, times(expectedEventsAtRunning)).handle(arg.capture()); assertEquals("Task attempt is not in running state", taImpl.getState(), TaskAttemptState.RUNNING); verify(mockHeartbeatHandler).register(taskAttemptID); @@ -503,6 +516,30 @@ public class TestTaskAttempt { assertEquals("Terminated", taImpl.getDiagnostics().get(0)); assertEquals(TaskAttemptTerminationCause.CONTAINER_EXITED, taImpl.getTerminationCause()); // TODO Ensure TA_TERMINATING after this is ingored. + + int expectedEvenstAfterTerminating = expectedEventsAtRunning + 3; + arg = ArgumentCaptor.forClass(Event.class); + verify(eventHandler, times(expectedEvenstAfterTerminating)).handle(arg.capture()); + + Event event = verifyEventType( + arg.getAllValues().subList(expectedEventsAtRunning, + expectedEvenstAfterTerminating), TaskEventTAFailed.class, 1); + TaskEventTAFailed failedEvent = (TaskEventTAFailed) event; + assertEquals(TaskFailureType.NON_FATAL, failedEvent.getTaskFailureType()); + verifyEventType( + arg.getAllValues().subList(expectedEventsAtRunning, + expectedEvenstAfterTerminating), AMSchedulerEventTAEnded.class, 1); + verifyEventType( + arg.getAllValues().subList(expectedEventsAtRunning, + expectedEvenstAfterTerminating), DAGEventCounterUpdate.class, 1); + + taImpl.handle(new TaskAttemptEventContainerTerminated(contId, taskAttemptID, + "Terminated", TaskAttemptTerminationCause.CONTAINER_EXITED)); + // verify unregister is not invoked again + verify(mockHeartbeatHandler, times(1)).unregister(taskAttemptID); + int expectedEventAfterTerminated = expectedEvenstAfterTerminating + 0; + arg = ArgumentCaptor.forClass(Event.class); + verify(eventHandler, times(expectedEventAfterTerminated)).handle(arg.capture()); } @Test(timeout = 5000) @@ -575,7 +612,7 @@ public class TestTaskAttempt { verifyEventType( arg.getAllValues().subList(expectedEventsAtRunning, - 
expectedEvenstAfterTerminating), TaskEventTAUpdate.class, 1); + expectedEvenstAfterTerminating), TaskEventTASucceeded.class, 1); verifyEventType( arg.getAllValues().subList(expectedEventsAtRunning, expectedEvenstAfterTerminating), AMSchedulerEventTAEnded.class, 1); @@ -744,7 +781,116 @@ public class TestTaskAttempt { taImpl.handle(new TaskAttemptEventStatusUpdate(taskAttemptID, new TaskStatusUpdateEvent(null, 0.1f, null, false))); - taImpl.handle(new TaskAttemptEventAttemptFailed(taskAttemptID, TaskAttemptEventType.TA_FAILED, "0", + taImpl.handle(new TaskAttemptEventAttemptFailed(taskAttemptID, TaskAttemptEventType.TA_FAILED, + TaskFailureType.NON_FATAL, "0", + TaskAttemptTerminationCause.APPLICATION_ERROR)); + + assertEquals("Task attempt is not in the FAIL_IN_PROGRESS state", taImpl.getInternalState(), + TaskAttemptStateInternal.FAIL_IN_PROGRESS); + verify(mockHeartbeatHandler).unregister(taskAttemptID); + assertEquals(1, taImpl.getDiagnostics().size()); + assertEquals("0", taImpl.getDiagnostics().get(0)); + assertEquals(TaskAttemptTerminationCause.APPLICATION_ERROR, taImpl.getTerminationCause()); + + assertEquals(TaskAttemptStateInternal.FAIL_IN_PROGRESS, taImpl.getInternalState()); + taImpl.handle(new TaskAttemptEventTezEventUpdate(taImpl.getID(), Collections.EMPTY_LIST)); + assertFalse( + "InternalError occurred trying to handle TA_TEZ_EVENT_UPDATE in FAIL_IN_PROGRESS state", + eventHandler.internalError); + + taImpl.handle(new TaskAttemptEventContainerTerminated(contId, taskAttemptID, "1", + TaskAttemptTerminationCause.CONTAINER_EXITED)); + // verify unregister is not invoked again + verify(mockHeartbeatHandler, times(1)).unregister(taskAttemptID); + assertEquals(2, taImpl.getDiagnostics().size()); + assertEquals("1", taImpl.getDiagnostics().get(1)); + // err cause does not change + assertEquals(TaskAttemptTerminationCause.APPLICATION_ERROR, taImpl.getTerminationCause()); + + int expectedEvenstAfterTerminating = expectedEventsAtRunning + 5; + arg = 
ArgumentCaptor.forClass(Event.class); + verify(eventHandler, times(expectedEvenstAfterTerminating)).handle(arg.capture()); + + + Event e = verifyEventType( + arg.getAllValues().subList(expectedEventsAtRunning, + expectedEvenstAfterTerminating), TaskEventTAFailed.class, 1); + TaskEventTAFailed failedEvent = (TaskEventTAFailed) e; + assertEquals(TaskFailureType.NON_FATAL, failedEvent.getTaskFailureType()); + verifyEventType( + arg.getAllValues().subList(expectedEventsAtRunning, + expectedEvenstAfterTerminating), AMSchedulerEventTAEnded.class, 1); + verifyEventType( + arg.getAllValues().subList(expectedEventsAtRunning, + expectedEvenstAfterTerminating), DAGEventCounterUpdate.class, 1); + verifyEventType( + arg.getAllValues().subList(expectedEventsAtRunning, + expectedEvenstAfterTerminating), SpeculatorEventTaskAttemptStatusUpdate.class, 2); + } + + @Test(timeout = 5000) + public void testFailureFatalError() throws Exception { + ApplicationId appId = ApplicationId.newInstance(1, 2); + ApplicationAttemptId appAttemptId = ApplicationAttemptId.newInstance( + appId, 0); + TezDAGID dagID = TezDAGID.getInstance(appId, 1); + TezVertexID vertexID = TezVertexID.getInstance(dagID, 1); + TezTaskID taskID = TezTaskID.getInstance(vertexID, 1); + + MockEventHandler eventHandler = spy(new MockEventHandler()); + TaskCommunicatorManagerInterface taListener = createMockTaskAttemptListener(); + + Configuration taskConf = new Configuration(); + taskConf.setClass("fs.file.impl", StubbedFS.class, FileSystem.class); + taskConf.setBoolean("fs.file.impl.disable.cache", true); + taskConf.setBoolean(TezConfiguration.TEZ_AM_SPECULATION_ENABLED, true); + + locationHint = TaskLocationHint.createTaskLocationHint( + new HashSet<String>(Arrays.asList(new String[]{"127.0.0.1"})), null); + Resource resource = Resource.newInstance(1024, 1); + + NodeId nid = NodeId.newInstance("127.0.0.1", 0); + @SuppressWarnings("deprecation") + ContainerId contId = ContainerId.newInstance(appAttemptId, 3); + Container 
container = mock(Container.class); + when(container.getId()).thenReturn(contId); + when(container.getNodeId()).thenReturn(nid); + when(container.getNodeHttpAddress()).thenReturn("localhost:0"); + + AMContainerMap containers = new AMContainerMap( + mock(ContainerHeartbeatHandler.class), mock(TaskCommunicatorManagerInterface.class), + new ContainerContextMatcher(), appCtx); + containers.addContainerIfNew(container, 0, 0, 0); + + doReturn(new ClusterInfo()).when(appCtx).getClusterInfo(); + doReturn(containers).when(appCtx).getAllContainers(); + + TaskHeartbeatHandler mockHeartbeatHandler = mock(TaskHeartbeatHandler.class); + TaskAttemptImpl taImpl = new MockTaskAttemptImpl(taskID, 1, eventHandler, + taListener, taskConf, new SystemClock(), + mockHeartbeatHandler, appCtx, false, + resource, createFakeContainerContext(), false); + TezTaskAttemptID taskAttemptID = taImpl.getID(); + ArgumentCaptor<Event> arg = ArgumentCaptor.forClass(Event.class); + + taImpl.handle(new TaskAttemptEventSchedule(taskAttemptID, 0, 0)); + // At state STARTING. 
+ taImpl.handle(new TaskAttemptEventStartedRemotely(taskAttemptID, contId, + null)); + assertEquals("Task attempt is not in the RUNNING state", taImpl.getState(), + TaskAttemptState.RUNNING); + verify(mockHeartbeatHandler).register(taskAttemptID); + + int expectedEventsAtRunning = 4; + verify(eventHandler, times(expectedEventsAtRunning)).handle(arg.capture()); + verifyEventType( + arg.getAllValues().subList(0, + expectedEventsAtRunning), SpeculatorEventTaskAttemptStatusUpdate.class, 1); + + taImpl.handle(new TaskAttemptEventStatusUpdate(taskAttemptID, new TaskStatusUpdateEvent(null, 0.1f, null, false))); + + taImpl.handle(new TaskAttemptEventAttemptFailed(taskAttemptID, TaskAttemptEventType.TA_FAILED, + TaskFailureType.FATAL, "0", TaskAttemptTerminationCause.APPLICATION_ERROR)); assertEquals("Task attempt is not in the FAIL_IN_PROGRESS state", taImpl.getInternalState(), @@ -776,8 +922,9 @@ public class TestTaskAttempt { Event e = verifyEventType( arg.getAllValues().subList(expectedEventsAtRunning, - expectedEvenstAfterTerminating), TaskEventTAUpdate.class, 1); - assertEquals(TaskEventType.T_ATTEMPT_FAILED, e.getType()); + expectedEvenstAfterTerminating), TaskEventTAFailed.class, 1); + TaskEventTAFailed failedEvent = (TaskEventTAFailed) e; + assertEquals(TaskFailureType.FATAL, failedEvent.getTaskFailureType()); verifyEventType( arg.getAllValues().subList(expectedEventsAtRunning, expectedEvenstAfterTerminating), AMSchedulerEventTAEnded.class, 1); @@ -868,6 +1015,7 @@ public class TestTaskAttempt { TaskAttemptEventAttemptFailed fEvent = (TaskAttemptEventAttemptFailed) arg.getValue(); assertEquals(taImpl.getID(), fEvent.getTaskAttemptID()); assertEquals(TaskAttemptTerminationCause.NO_PROGRESS, fEvent.getTerminationCause()); + assertEquals(TaskFailureType.NON_FATAL, fEvent.getTaskFailureType()); taImpl.handle(fEvent); assertEquals("Task attempt is not in the FAIL_IN_PROGRESS state", taImpl.getInternalState(), @@ -982,7 +1130,7 @@ public class TestTaskAttempt { Event e = 
verifyEventType( arg.getAllValues().subList(expectedEventsAtRunning, - expectedEvenstAfterTerminating), TaskEventTAUpdate.class, 1); + expectedEvenstAfterTerminating), TaskEventTASucceeded.class, 1); assertEquals(TaskEventType.T_ATTEMPT_SUCCEEDED, e.getType()); verifyEventType( arg.getAllValues().subList(expectedEventsAtRunning, @@ -1067,7 +1215,7 @@ public class TestTaskAttempt { verifyEventType( arg.getAllValues().subList(expectedEventsAtRunning, - expectedEventsAfterTerminating), TaskEventTAUpdate.class, 1); + expectedEventsAfterTerminating), TaskEventTASucceeded.class, 1); verifyEventType( arg.getAllValues().subList(expectedEventsAtRunning, expectedEventsAfterTerminating), AMSchedulerEventTAEnded.class, 1); @@ -1159,7 +1307,8 @@ public class TestTaskAttempt { verifyEventType( arg.getAllValues().subList(expectedEventsAtRunning, - expectedEvenstAfterTerminating), TaskEventTAUpdate.class, 1); + expectedEvenstAfterTerminating), TaskEventTASucceeded.class, 1); + verifyEventType( arg.getAllValues().subList(expectedEventsAtRunning, expectedEvenstAfterTerminating), AMSchedulerEventTAEnded.class, 1); @@ -1182,7 +1331,7 @@ public class TestTaskAttempt { verify(eventHandler, times(expectedEventsNodeFailure)).handle(arg.capture()); verifyEventType( arg.getAllValues().subList(expectedEvenstAfterTerminating, - expectedEventsNodeFailure), TaskEventTAUpdate.class, 1); + expectedEventsNodeFailure), TaskEventTAKilled.class, 1); // Verify still in KILLED state assertEquals("Task attempt is not in the KILLED state", TaskAttemptState.KILLED, @@ -1266,7 +1415,7 @@ public class TestTaskAttempt { verifyEventType( arg.getAllValues().subList(expectedEventsAtRunning, - expectedEvenstAfterTerminating), TaskEventTAUpdate.class, 1); + expectedEvenstAfterTerminating), TaskEventTASucceeded.class, 1); verifyEventType( arg.getAllValues().subList(expectedEventsAtRunning, expectedEvenstAfterTerminating), AMSchedulerEventTAEnded.class, 1); @@ -1392,6 +1541,7 @@ public class TestTaskAttempt { 
verify(mockHistHandler, times(3)).handle(histArg.capture()); histEvent = histArg.getValue(); finishEvent = (TaskAttemptFinishedEvent)histEvent.getHistoryEvent(); + assertEquals(TaskFailureType.NON_FATAL, finishEvent.getTaskFailureType()); long newFinishTime = finishEvent.getFinishTime(); Assert.assertEquals(finishTime, newFinishTime); @@ -1399,9 +1549,11 @@ public class TestTaskAttempt { int expectedEventsAfterFetchFailure = expectedEventsTillSucceeded + 2; arg.getAllValues().clear(); verify(eventHandler, times(expectedEventsAfterFetchFailure)).handle(arg.capture()); - verifyEventType( + Event e = verifyEventType( arg.getAllValues().subList(expectedEventsTillSucceeded, - expectedEventsAfterFetchFailure), TaskEventTAUpdate.class, 1); + expectedEventsAfterFetchFailure), TaskEventTAFailed.class, 1); + TaskEventTAFailed failedEvent = (TaskEventTAFailed) e; + assertEquals(TaskFailureType.NON_FATAL, failedEvent.getTaskFailureType()); taImpl.handle(new TaskAttemptEventOutputFailed(taskAttemptID, tzEvent, 1)); assertEquals("Task attempt is not in FAILED state, still", @@ -1614,9 +1766,9 @@ public class TestTaskAttempt { @Override protected void logJobHistoryAttemptUnsuccesfulCompletion( - TaskAttemptState state) { + TaskAttemptState state, TaskFailureType taskFailureType) { taskAttemptFinishedEventLogged++; - super.logJobHistoryAttemptUnsuccesfulCompletion(state); + super.logJobHistoryAttemptUnsuccesfulCompletion(state, taskFailureType); } @Override
