http://git-wip-us.apache.org/repos/asf/tez/blob/28f30b0e/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/TaskEventRecoverTask.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/TaskEventRecoverTask.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/TaskEventRecoverTask.java deleted file mode 100644 index f275a56..0000000 --- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/TaskEventRecoverTask.java +++ /dev/null @@ -1,53 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.tez.dag.app.dag.event; - -import org.apache.tez.dag.api.oldrecords.TaskState; -import org.apache.tez.dag.records.TezTaskID; - -public class TaskEventRecoverTask extends TaskEvent { - - TaskState desiredState; - - boolean recoverDataForAttempts; - - public TaskEventRecoverTask(TezTaskID taskID, TaskState desiredState) { - this(taskID, desiredState, true); - } - - public TaskEventRecoverTask(TezTaskID taskID, TaskState desiredState, - boolean recoverData) { - super(taskID, TaskEventType.T_RECOVER); - this.desiredState = desiredState; - this.recoverDataForAttempts = recoverData; - } - - public TaskEventRecoverTask(TezTaskID taskID) { - this(taskID, null); - } - - public TaskState getDesiredState() { - return desiredState; - } - - public boolean recoverData() { - return recoverDataForAttempts; - } - -}
http://git-wip-us.apache.org/repos/asf/tez/blob/28f30b0e/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/TaskEventScheduleTask.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/TaskEventScheduleTask.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/TaskEventScheduleTask.java index 696602a..70d6043 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/TaskEventScheduleTask.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/TaskEventScheduleTask.java @@ -22,14 +22,17 @@ import org.apache.tez.dag.api.TaskLocationHint; import org.apache.tez.dag.records.TezTaskID; import org.apache.tez.runtime.api.impl.TaskSpec; -public class TaskEventScheduleTask extends TaskEvent { +public class TaskEventScheduleTask extends TaskEvent implements RecoveryEvent { private final TaskSpec baseTaskSpec; private final TaskLocationHint locationHint; - - public TaskEventScheduleTask(TezTaskID taskId, TaskSpec baseTaskSpec, TaskLocationHint locationHint) { + private final boolean fromRecovery; + + public TaskEventScheduleTask(TezTaskID taskId, TaskSpec baseTaskSpec, TaskLocationHint locationHint, + boolean fromRecovery) { super(taskId, TaskEventType.T_SCHEDULE); this.baseTaskSpec = baseTaskSpec; this.locationHint = locationHint; + this.fromRecovery = fromRecovery; } public TaskSpec getBaseTaskSpec() { @@ -39,4 +42,9 @@ public class TaskEventScheduleTask extends TaskEvent { public TaskLocationHint getTaskLocationHint() { return locationHint; } + + @Override + public boolean isFromRecovery() { + return fromRecovery; + } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/tez/blob/28f30b0e/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/TaskEventTermination.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/TaskEventTermination.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/TaskEventTermination.java index d48a0bf..1605869 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/TaskEventTermination.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/TaskEventTermination.java @@ -22,11 +22,12 @@ import org.apache.tez.dag.records.TaskAttemptTerminationCause; import org.apache.tez.dag.records.TezTaskID; public class TaskEventTermination extends TaskEvent implements DiagnosableEvent, - TaskAttemptEventTerminationCauseEvent { + TaskAttemptEventTerminationCauseEvent, RecoveryEvent { private final String diagnostics; private final TaskAttemptTerminationCause errorCause; - + private boolean fromRecovery; + public TaskEventTermination(TezTaskID taskID, TaskAttemptTerminationCause errorCause, String diagnostics) { super(taskID, TaskEventType.T_TERMINATE); this.errorCause = errorCause; @@ -37,6 +38,12 @@ public class TaskEventTermination extends TaskEvent implements DiagnosableEvent, } } + public TaskEventTermination(TezTaskID taskID, TaskAttemptTerminationCause errorCause, String diagnostics, + boolean fromRecovery) { + this(taskID, errorCause, diagnostics); + this.fromRecovery = fromRecovery; + } + @Override public String getDiagnosticInfo() { return diagnostics; @@ -47,4 +54,9 @@ public class TaskEventTermination extends TaskEvent implements DiagnosableEvent, return errorCause; } + @Override + public boolean isFromRecovery() { + return fromRecovery; + } + } http://git-wip-us.apache.org/repos/asf/tez/blob/28f30b0e/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/TaskEventType.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/TaskEventType.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/TaskEventType.java index baec5f0..726e13e 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/TaskEventType.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/TaskEventType.java @@ -38,7 +38,4 @@ public enum TaskEventType { T_ATTEMPT_SUCCEEDED, T_ATTEMPT_KILLED, - // Recovery event - T_RECOVER - } http://git-wip-us.apache.org/repos/asf/tez/blob/28f30b0e/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/VertexEventRecoverVertex.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/VertexEventRecoverVertex.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/VertexEventRecoverVertex.java index 34e45fe..4203689 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/VertexEventRecoverVertex.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/VertexEventRecoverVertex.java @@ -32,5 +32,4 @@ public class VertexEventRecoverVertex extends VertexEvent { public VertexState getDesiredState() { return desiredState; } - } http://git-wip-us.apache.org/repos/asf/tez/blob/28f30b0e/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/VertexEventRouteEvent.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/VertexEventRouteEvent.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/VertexEventRouteEvent.java index 69195db..211202d 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/VertexEventRouteEvent.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/VertexEventRouteEvent.java @@ -27,25 +27,13 @@ public class VertexEventRouteEvent extends VertexEvent { final List<TezEvent> events; - final boolean recovered; - public VertexEventRouteEvent(TezVertexID vertexId, List<TezEvent> events) { - this(vertexId, events, false); - } - - public VertexEventRouteEvent(TezVertexID vertexId, List<TezEvent> events, - boolean recovered) { super(vertexId, VertexEventType.V_ROUTE_EVENT); this.events = events; - this.recovered = recovered; } public List<TezEvent> getEvents() { return events; } - public boolean isRecovered() { - return recovered; - } - } http://git-wip-us.apache.org/repos/asf/tez/blob/28f30b0e/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/VertexEventSourceVertexRecovered.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/VertexEventSourceVertexRecovered.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/VertexEventSourceVertexRecovered.java deleted file mode 100644 index e3b9334..0000000 --- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/VertexEventSourceVertexRecovered.java +++ /dev/null @@ -1,62 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.tez.dag.app.dag.event; - -import org.apache.tez.dag.app.dag.VertexState; -import org.apache.tez.dag.records.TezTaskAttemptID; -import org.apache.tez.dag.records.TezVertexID; - -import java.util.List; - -public class VertexEventSourceVertexRecovered extends VertexEvent { - - VertexState sourceVertexState; - TezVertexID sourceVertexID; - List<TezTaskAttemptID> completedTaskAttempts; - int sourceDistanceFromRoot; - - public VertexEventSourceVertexRecovered(TezVertexID vertexID, - TezVertexID sourceVertexID, - VertexState sourceVertexState, - List<TezTaskAttemptID> completedTaskAttempts, - int sourceDistanceFromRoot) { - super(vertexID, VertexEventType.V_SOURCE_VERTEX_RECOVERED); - this.sourceVertexState = sourceVertexState; - this.sourceVertexID = sourceVertexID; - this.completedTaskAttempts = completedTaskAttempts; - this.sourceDistanceFromRoot = sourceDistanceFromRoot; - } - - public VertexState getSourceVertexState() { - return sourceVertexState; - } - - public TezVertexID getSourceVertexID() { - return sourceVertexID; - } - - public List<TezTaskAttemptID> getCompletedTaskAttempts() { - return completedTaskAttempts; - } - - public int getSourceDistanceFromRoot() { - return sourceDistanceFromRoot; - } - -} http://git-wip-us.apache.org/repos/asf/tez/blob/28f30b0e/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/VertexEventType.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/VertexEventType.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/VertexEventType.java index 6ea945b..15be94d 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/VertexEventType.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/VertexEventType.java @@ -57,9 +57,6 @@ public enum VertexEventType { // Producer: Vertex V_READY_TO_INIT, - - // Recover Event, Producer:Vertex - V_SOURCE_VERTEX_RECOVERED, // Producer: Edge V_NULL_EDGE_INITIALIZED, http://git-wip-us.apache.org/repos/asf/tez/blob/28f30b0e/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java index 4dfba84..f395e62 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java @@ -31,7 +31,6 @@ import java.util.Iterator; import java.util.LinkedHashMap; import java.util.List; import java.util.Map; -import java.util.Map.Entry; import java.util.Set; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.locks.Condition; @@ -82,7 +81,9 @@ import org.apache.tez.dag.api.records.DAGProtos.PlanKeyValuePair; import org.apache.tez.dag.api.records.DAGProtos.PlanVertexGroupInfo; import org.apache.tez.dag.api.records.DAGProtos.VertexPlan; import org.apache.tez.dag.app.AppContext; +import org.apache.tez.dag.app.RecoveryParser.VertexRecoveryData; import org.apache.tez.dag.app.TaskCommunicatorManagerInterface; +import org.apache.tez.dag.app.RecoveryParser.DAGRecoveryData; import org.apache.tez.dag.app.TaskHeartbeatHandler; import org.apache.tez.dag.app.dag.DAG; import org.apache.tez.dag.app.dag.DAGReport; @@ -112,11 +113,11 @@ import org.apache.tez.dag.app.dag.event.VertexEventTermination; import org.apache.tez.dag.app.dag.event.VertexEventType; import org.apache.tez.common.security.ACLManager; import org.apache.tez.dag.history.DAGHistoryEvent; -import org.apache.tez.dag.history.HistoryEvent; import org.apache.tez.dag.history.events.DAGCommitStartedEvent; import org.apache.tez.dag.history.events.DAGFinishedEvent; import org.apache.tez.dag.history.events.DAGInitializedEvent; import org.apache.tez.dag.history.events.DAGStartedEvent; +import org.apache.tez.dag.history.events.VertexFinishedEvent; import org.apache.tez.dag.history.events.VertexGroupCommitFinishedEvent; import org.apache.tez.dag.history.events.VertexGroupCommitStartedEvent; import org.apache.tez.dag.records.TezDAGID; @@ -204,10 +205,6 @@ public class DAGImpl implements org.apache.tez.dag.app.dag.DAG, private final List<String> diagnostics = new ArrayList<String>(); - // Recovery related flags - boolean recoveryInitEventSeen = false; - boolean recoveryStartEventSeen = false; - private TaskSpecificLaunchCmdOption taskSpecificLaunchCmdOption; private static final DagStateChangedCallback STATE_CHANGED_CALLBACK = new DagStateChangedCallback(); @@ -237,10 +234,11 @@ public class DAGImpl implements org.apache.tez.dag.app.dag.DAG, .addTransition(DAGState.NEW, DAGState.NEW, DAGEventType.DAG_DIAGNOSTIC_UPDATE, DIAGNOSTIC_UPDATE_TRANSITION) + // either recovered to FINISHED state or recovered to NEW to rerun the dag based on the recovery data .addTransition(DAGState.NEW, - EnumSet.of(DAGState.NEW, DAGState.INITED, DAGState.RUNNING, - DAGState.SUCCEEDED, DAGState.FAILED, DAGState.KILLED, - DAGState.ERROR, DAGState.TERMINATING), + EnumSet.of(DAGState.NEW, DAGState.SUCCEEDED, + DAGState.FAILED, DAGState.KILLED, + DAGState.ERROR), DAGEventType.DAG_RECOVER, new RecoverTransition()) .addTransition(DAGState.NEW, DAGState.NEW, @@ -448,11 +446,7 @@ public class DAGImpl implements org.apache.tez.dag.app.dag.DAG, Map<String, VertexGroupInfo> vertexGroups = Maps.newHashMap(); Map<String, List<VertexGroupInfo>> vertexGroupInfo = Maps.newHashMap(); - private DAGState recoveredState = DAGState.NEW; - - @VisibleForTesting - boolean recoveryCommitInProgress = false; - Map<String, Boolean> recoveredGroupCommits = new HashMap<String, Boolean>(); + private DAGRecoveryData recoveryData; static class VertexGroupInfo { String groupName; @@ -637,59 +631,6 @@ public class DAGImpl implements org.apache.tez.dag.app.dag.DAG, } @Override - public DAGState restoreFromEvent(HistoryEvent historyEvent) { - writeLock.lock(); - try { - switch (historyEvent.getEventType()) { - case DAG_INITIALIZED: - recoveredState = initializeDAG((DAGInitializedEvent) historyEvent); - recoveryInitEventSeen = true; - return recoveredState; - case DAG_STARTED: - if (!recoveryInitEventSeen) { - throw new RuntimeException("Started Event seen but" - + " no Init Event was encountered earlier"); - } - recoveryStartEventSeen = true; - this.startTime = ((DAGStartedEvent) historyEvent).getStartTime(); - recoveredState = DAGState.RUNNING; - return recoveredState; - case DAG_COMMIT_STARTED: - recoveryCommitInProgress = true; - return recoveredState; - case VERTEX_GROUP_COMMIT_STARTED: - VertexGroupCommitStartedEvent vertexGroupCommitStartedEvent = - (VertexGroupCommitStartedEvent) historyEvent; - recoveredGroupCommits.put( - vertexGroupCommitStartedEvent.getVertexGroupName(), false); - return recoveredState; - case VERTEX_GROUP_COMMIT_FINISHED: - VertexGroupCommitFinishedEvent vertexGroupCommitFinishedEvent = - (VertexGroupCommitFinishedEvent) historyEvent; - recoveredGroupCommits.put( - vertexGroupCommitFinishedEvent.getVertexGroupName(), true); - return recoveredState; - case DAG_KILL_REQUEST: - trySetTerminationCause(DAGTerminationCause.DAG_KILL); - this.recoveredState = DAGState.KILLED; - return recoveredState; - case DAG_FINISHED: - recoveryCommitInProgress = false; - DAGFinishedEvent finishedEvent = (DAGFinishedEvent) historyEvent; - setFinishTime(finishedEvent.getFinishTime()); - recoveredState = finishedEvent.getState(); - this.fullCounters = finishedEvent.getTezCounters(); - return recoveredState; - default: - throw new RuntimeException("Unexpected event received for restoring" - + " state, eventType=" + historyEvent.getEventType()); - } - } finally { - writeLock.unlock(); - } - } - - @Override public ACLManager getACLManager() { return this.aclManager; } @@ -1241,39 +1182,50 @@ public class DAGImpl implements org.apache.tez.dag.app.dag.DAG, return taskStats; } - void logJobHistoryFinishedEvent() throws IOException { - this.setFinishTime(); - Map<String, Integer> taskStats = constructTaskStats(getDAGProgress()); - DAGFinishedEvent finishEvt = new DAGFinishedEvent(dagId, startTime, - finishTime, DAGState.SUCCEEDED, "", getAllCounters(), - this.userName, this.dagName, taskStats, this.appContext.getApplicationAttemptId()); - this.appContext.getHistoryHandler().handleCriticalEvent( - new DAGHistoryEvent(dagId, finishEvt)); - } - void logJobHistoryInitedEvent() { - DAGInitializedEvent initEvt = new DAGInitializedEvent(this.dagId, - this.initTime, this.userName, this.dagName, this.getVertexNameIDMapping()); - this.appContext.getHistoryHandler().handle( - new DAGHistoryEvent(dagId, initEvt)); + if (recoveryData == null + || recoveryData.getDAGInitializedEvent() == null) { + DAGInitializedEvent initEvt = new DAGInitializedEvent(this.dagId, + clock.getTime(), this.userName, this.dagName, this.getVertexNameIDMapping()); + this.appContext.getHistoryHandler().handle( + new DAGHistoryEvent(dagId, initEvt)); + } } void logJobHistoryStartedEvent() { - DAGStartedEvent startEvt = new DAGStartedEvent(this.dagId, - this.startTime, this.userName, this.dagName); - this.appContext.getHistoryHandler().handle( - new DAGHistoryEvent(dagId, startEvt)); + if (recoveryData == null + || recoveryData.getDAGStartedEvent() == null) { + DAGStartedEvent startEvt = new DAGStartedEvent(this.dagId, + clock.getTime(), this.userName, this.dagName); + this.appContext.getHistoryHandler().handle( + new DAGHistoryEvent(dagId, startEvt)); + } + } + + void logJobHistoryFinishedEvent() throws IOException { + if (recoveryData == null + || recoveryData.getDAGFinishedEvent() == null) { + Map<String, Integer> taskStats = constructTaskStats(getDAGProgress()); + DAGFinishedEvent finishEvt = new DAGFinishedEvent(dagId, clock.getTime(), + finishTime, DAGState.SUCCEEDED, "", getAllCounters(), + this.userName, this.dagName, taskStats, this.appContext.getApplicationAttemptId()); + this.appContext.getHistoryHandler().handleCriticalEvent( + new DAGHistoryEvent(dagId, finishEvt)); + } } void logJobHistoryUnsuccesfulEvent(DAGState state) throws IOException { - Map<String, Integer> taskStats = constructTaskStats(getDAGProgress()); - DAGFinishedEvent finishEvt = new DAGFinishedEvent(dagId, startTime, - clock.getTime(), state, - StringUtils.join(getDiagnostics(), LINE_SEPARATOR), - getAllCounters(), this.userName, this.dagName, taskStats, - this.appContext.getApplicationAttemptId()); - this.appContext.getHistoryHandler().handleCriticalEvent( - new DAGHistoryEvent(dagId, finishEvt)); + if (recoveryData == null + || recoveryData.getDAGFinishedEvent() == null) { + Map<String, Integer> taskStats = constructTaskStats(getDAGProgress()); + DAGFinishedEvent finishEvt = new DAGFinishedEvent(dagId, 0L, + clock.getTime(), state, + StringUtils.join(getDiagnostics(), LINE_SEPARATOR), + getAllCounters(), this.userName, this.dagName, taskStats, + this.appContext.getApplicationAttemptId()); + this.appContext.getHistoryHandler().handleCriticalEvent( + new DAGHistoryEvent(dagId, finishEvt)); + } } // triggered by vertex_complete @@ -1474,17 +1426,8 @@ public class DAGImpl implements org.apache.tez.dag.app.dag.DAG, } } - public DAGState initializeDAG() { - return initializeDAG(null); - } - - DAGState initializeDAG(DAGInitializedEvent event) { - if (event != null) { - initTime = event.getInitTime(); - } else { - initTime = clock.getTime(); - } + DAGState initializeDAG() { commitAllOutputsOnSuccess = dagConf.getBoolean( TezConfiguration.TEZ_AM_COMMIT_ALL_OUTPUTS_ON_DAG_SUCCESS, TezConfiguration.TEZ_AM_COMMIT_ALL_OUTPUTS_ON_DAG_SUCCESS_DEFAULT); @@ -1494,9 +1437,6 @@ public class DAGImpl implements org.apache.tez.dag.app.dag.DAG, if (numVertices == 0) { addDiagnostic("No vertices for dag"); trySetTerminationCause(DAGTerminationCause.ZERO_VERTICES); - if (event != null) { - return DAGState.FAILED; - } return finished(DAGState.FAILED); } @@ -1668,139 +1608,68 @@ public class DAGImpl implements org.apache.tez.dag.app.dag.DAG, vertex.setOutputVertices(outVertices); } + /** + * 2 cases of recovery: + * <ul> + * <li> + * 1. For the completed dag, recover the dag to the desired state and also its vertices, + * but not task & task attempt. This recovery is sync call (after this Transition, + * DAG & vertices are all recovered to the desired state) + * </li> + * <li> + * 2. For the non-completed dag, recover the dag as normal dag execution. The only difference + * is setting the recoveryData before sending DAG_INIT event so that some steps in the execution + * will be skipped based on the recoveryData + * </li> + * </ul> + */ private static class RecoverTransition implements MultipleArcTransition<DAGImpl, DAGEvent, DAGState> { @Override public DAGState transition(DAGImpl dag, DAGEvent dagEvent) { - DAGEventRecoverEvent recoverEvent = (DAGEventRecoverEvent) dagEvent; + DAGEventRecoverEvent recoverEvent = (DAGEventRecoverEvent)dagEvent; + // With desired state, represents the case that DAG is completed if (recoverEvent.hasDesiredState()) { - // DAG completed or final end state known - dag.recoveredState = recoverEvent.getDesiredState(); - } - if (recoverEvent.getAdditionalUrlsForClasspath() != null) { - LOG.info("Added additional resources : [" + recoverEvent.getAdditionalUrlsForClasspath() - + "] to classpath"); - RelocalizationUtils.addUrlsToClassPath(recoverEvent.getAdditionalUrlsForClasspath()); - } - - switch (dag.recoveredState) { - case NEW: - // send DAG an Init and start events - dag.eventHandler.handle(new DAGEvent(dag.getID(), DAGEventType.DAG_INIT)); - dag.eventHandler.handle(new DAGEventStartDag(dag.getID(), null)); - return DAGState.NEW; - case INITED: - // DAG inited but not started - // This implies vertices need to be sent init event - // Root vertices need to be sent start event - // The vertices may already have been sent these events but the - // DAG start may not have been persisted - for (Vertex v : dag.vertices.values()) { - if (v.getInputVerticesCount() == 0) { - if (LOG.isDebugEnabled()) { - LOG.debug("Sending Running Recovery event to root vertex " - + v.getLogIdentifier()); - } - dag.eventHandler.handle(new VertexEventRecoverVertex(v.getVertexId(), - VertexState.RUNNING)); - } - } - return DAGState.RUNNING; - case RUNNING: - // if commit is in progress, DAG should fail as commits are not - // recoverable - boolean groupCommitInProgress = false; - if (!dag.recoveredGroupCommits.isEmpty()) { - for (Entry<String, Boolean> entry : dag.recoveredGroupCommits.entrySet()) { - if (!entry.getValue().booleanValue()) { - LOG.info("Found a pending Vertex Group commit" - + ", vertexGroup=" + entry.getKey()); - groupCommitInProgress = true; - break; - } - } - } - - if (groupCommitInProgress || dag.recoveryCommitInProgress) { - // Fail the DAG as we have not seen a commit completion - dag.trySetTerminationCause(DAGTerminationCause.COMMIT_FAILURE); - dag.setFinishTime(); - // Recover all other data for all vertices - // send recover event to all vertices with a final end state - for (Vertex v : dag.vertices.values()) { - VertexState desiredState = VertexState.SUCCEEDED; - if (dag.recoveredState.equals(DAGState.KILLED)) { - desiredState = VertexState.KILLED; - } else if (EnumSet.of(DAGState.ERROR, DAGState.FAILED).contains( - dag.recoveredState)) { - desiredState = VertexState.FAILED; - } - dag.eventHandler.handle(new VertexEventRecoverVertex(v.getVertexId(), - desiredState)); - } - DAGState endState = DAGState.FAILED; - try { - dag.logJobHistoryUnsuccesfulEvent(endState); - } catch (IOException e) { - LOG.warn("Failed to persist recovery event for DAG completion" - + ", dagId=" + dag.dagId - + ", finalState=" + endState); - } - dag.eventHandler.handle(new DAGAppMasterEventDAGFinished(dag.getID(), - endState)); - return endState; - } - - for (Vertex v : dag.vertices.values()) { - if (v.getInputVerticesCount() == 0) { - if (LOG.isDebugEnabled()) { - LOG.debug("Sending Running Recovery event to root vertex " - + v.getLogIdentifier()); - } - dag.eventHandler.handle(new VertexEventRecoverVertex(v.getVertexId(), - VertexState.RUNNING)); - } - } - return DAGState.RUNNING; + VertexState vertexDesiredState = null; + switch (recoverEvent.getDesiredState()) { case SUCCEEDED: - case ERROR: + vertexDesiredState = VertexState.SUCCEEDED; + break; case FAILED: + vertexDesiredState = VertexState.FAILED; + break; case KILLED: - // Completed - - // Recover all other data for all vertices - // send recover event to all vertices with a final end state - for (Vertex v : dag.vertices.values()) { - VertexState desiredState = VertexState.SUCCEEDED; - if (dag.recoveredState.equals(DAGState.KILLED)) { - desiredState = VertexState.KILLED; - } else if (EnumSet.of(DAGState.ERROR, DAGState.FAILED).contains( - dag.recoveredState)) { - desiredState = VertexState.FAILED; - } - dag.eventHandler.handle(new VertexEventRecoverVertex(v.getVertexId(), - desiredState)); - } - - // Let us inform AM of completion - dag.eventHandler.handle(new DAGAppMasterEventDAGFinished(dag.getID(), - dag.recoveredState)); - - LOG.info("Recovered DAG: " + dag.getID() + " finished with state: " - + dag.recoveredState); - return dag.recoveredState; + vertexDesiredState = VertexState.KILLED; + break; + case ERROR: + vertexDesiredState = VertexState.ERROR; + break; default: - // Error state - LOG.warn("Trying to recover DAG, failed to recover" - + " from non-handled state" + dag.recoveredState); - // Tell AM ERROR so that it can shutdown - dag.eventHandler.handle(new DAGAppMasterEventDAGFinished(dag.getID(), - DAGState.ERROR)); - return DAGState.FAILED; + String msg = "Invalid desired state of DAG" + + ", dagName=" + dag.getName() + + ", state=" + recoverEvent.getDesiredState(); + LOG.warn(msg); + dag.addDiagnostic(msg); + return dag.finished(DAGState.ERROR); + } + // Initialize dag synchronously to generate the vertices and recover its vertices to the desired state. + dag.initializeDAG(); + for (Vertex v : dag.vertexMap.values()) { + dag.eventHandler.handle(new VertexEventRecoverVertex(v.getVertexId(), vertexDesiredState)); + } + dag.addDiagnostic("DAG is recovered to finished state:" + recoverEvent.getDesiredState() + + ", but will only recover partial data due to incomplete recovery data"); + return dag.finished(recoverEvent.getDesiredState()); } - } + // for the cases that DAG is not completed, recover it as normal dag execution. + dag.recoveryData = recoverEvent.getRecoveredDagData(); + dag.appContext.setDAGRecoveryData(dag.recoveryData); + dag.getEventHandler().handle(new DAGEvent(dag.getID(), DAGEventType.DAG_INIT)); + dag.getEventHandler().handle(new DAGEventStartDag(dag.getID(), dag.recoveryData.additionalUrlsForClasspath)); + return DAGState.NEW; + } } private static class InitTransition @@ -1818,6 +1687,11 @@ public class DAGImpl implements org.apache.tez.dag.app.dag.DAG, // TODO Metrics //dag.metrics.submittedJob(dag); //dag.metrics.preparingJob(dag); + if (dag.recoveryData != null && dag.recoveryData.getDAGInitializedEvent() != null) { + dag.initTime = dag.recoveryData.getDAGInitializedEvent().getInitTime(); + } else { + dag.initTime = dag.clock.getTime(); + } dag.startDAGCpuTime = dag.appContext.getCumulativeCPUTime(); dag.startDAGGCTime = dag.appContext.getCumulativeGCTime(); @@ -1845,9 +1719,12 @@ public class DAGImpl implements org.apache.tez.dag.app.dag.DAG, */ @Override public void transition(DAGImpl dag, DAGEvent event) { + if (dag.recoveryData != null && dag.recoveryData.getDAGStartedEvent() != null) { + dag.startTime = dag.recoveryData.getDAGStartedEvent().getStartTime(); + } else { + dag.startTime = dag.clock.getTime(); + } DAGEventStartDag startEvent = (DAGEventStartDag) event; - dag.startTime = dag.clock.getTime(); - dag.logJobHistoryStartedEvent(); List<URL> additionalUrlsForClasspath = startEvent.getAdditionalUrlsForClasspath(); if (additionalUrlsForClasspath != null) { LOG.info("Added additional resources : [" + additionalUrlsForClasspath + "] to classpath"); @@ -1858,6 +1735,7 @@ public class DAGImpl implements org.apache.tez.dag.app.dag.DAG, // Start all vertices with no incoming edges when job starts dag.initializeVerticesAndStart(); + dag.logJobHistoryStartedEvent(); } } @@ -2032,6 +1910,14 @@ public class DAGImpl implements org.apache.tez.dag.app.dag.DAG, } + private Collection<TezVertexID> getVertexIds(Collection<String> vertexNames) { + List<TezVertexID> vertexIds = new ArrayList<TezVertexID>(vertexNames.size()); + for (String name : vertexNames) { + vertexIds.add(getVertexNameIDMapping().get(name)); + } + return vertexIds; + } + private static class VertexReRunningTransition implements MultipleArcTransition<DAGImpl, DAGEvent, DAGState> { @@ -2078,17 +1964,30 @@ public class DAGImpl implements org.apache.tez.dag.app.dag.DAG, } } for (VertexGroupInfo groupInfo : commitList) { - if (recoveredGroupCommits.containsKey(groupInfo.groupName)) { + if (recoveryData != null && recoveryData.isVertexGroupCommitted(groupInfo.groupName)) { LOG.info("VertexGroup was already committed as per recovery" + " data, groupName=" + groupInfo.groupName); + for (String vertexName : groupInfo.groupMembers) { + VertexRecoveryData vertexRecoveryData = + recoveryData.getVertexRecoveryData(getVertex(vertexName).getVertexId()); + Preconditions.checkArgument(vertexRecoveryData != null,"Vertex Group has been committed" + + ", but no VertexRecoveryData found for its vertex " + vertexName); + VertexFinishedEvent vertexFinishedEvent = vertexRecoveryData.getVertexFinishedEvent(); + Preconditions.checkArgument(vertexFinishedEvent!= null,"Vertex Group has been committed" + + ", but no VertexFinishedEvent found in its vertex " + vertexName); + Preconditions.checkArgument(vertexFinishedEvent.getState() == VertexState.SUCCEEDED, + "Vertex Group has been committed, but unexpected vertex state of its vertex " + + vertexName + ", vertexstate=" + vertexFinishedEvent.getState()); + } continue; } groupInfo.commitStarted = true; final Vertex v = getVertex(groupInfo.groupMembers.iterator().next()); try { + Collection<TezVertexID> vertexIds = getVertexIds(groupInfo.groupMembers); appContext.getHistoryHandler().handleCriticalEvent(new DAGHistoryEvent(getID(), new VertexGroupCommitStartedEvent(dagId, groupInfo.groupName, - clock.getTime()))); + vertexIds, clock.getTime()))); } catch (IOException e) { LOG.error("Failed to send commit recovery event to handler", e); recoveryFailed = true; @@ -2269,9 +2168,10 @@ public class DAGImpl implements org.apache.tez.dag.app.dag.DAG, if (vertexGroup.isCommitted()) { if (!commitAllOutputsOnSuccess) { try { + Collection<TezVertexID> vertexIds = getVertexIds(vertexGroup.groupMembers); appContext.getHistoryHandler().handleCriticalEvent(new DAGHistoryEvent(getID(), new VertexGroupCommitFinishedEvent(getID(), commitCompletedEvent.getOutputKey().getEntityName(), - clock.getTime()))); + vertexIds, clock.getTime()))); } catch (IOException e) { String diag = "Failed to send commit recovery event to handler, " + ExceptionUtils.getStackTrace(e); addDiagnostic(diag); http://git-wip-us.apache.org/repos/asf/tez/blob/28f30b0e/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java index bfd1634..957abcf 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java @@ -61,6 +61,7 @@ import org.apache.tez.dag.api.oldrecords.TaskAttemptReport; import org.apache.tez.dag.api.oldrecords.TaskAttemptState; import org.apache.tez.dag.app.AppContext; import org.apache.tez.dag.app.ContainerContext; +import org.apache.tez.dag.app.RecoveryParser.TaskAttemptRecoveryData; import org.apache.tez.dag.app.TaskCommunicatorManagerInterface; import org.apache.tez.dag.app.TaskHeartbeatHandler; import org.apache.tez.dag.app.dag.Task; @@ -72,9 +73,13 @@ import org.apache.tez.dag.app.dag.event.DAGEventCounterUpdate; import org.apache.tez.dag.app.dag.event.DAGEventDiagnosticsUpdate; import org.apache.tez.dag.app.dag.event.DAGEventType; import org.apache.tez.dag.app.dag.event.DiagnosableEvent; +import org.apache.tez.dag.app.dag.event.RecoveryEvent; import org.apache.tez.dag.app.dag.event.TaskAttemptEvent; import org.apache.tez.dag.app.dag.event.TaskAttemptEventAttemptFailed; +import org.apache.tez.dag.app.dag.event.TaskAttemptEventAttemptKilled; import org.apache.tez.dag.app.dag.event.TaskAttemptEventContainerTerminated; +import org.apache.tez.dag.app.dag.event.TaskAttemptEventKillRequest; +import org.apache.tez.dag.app.dag.event.TaskAttemptEventTezEventUpdate; import org.apache.tez.dag.app.dag.event.TaskAttemptEventTerminationCauseEvent; import org.apache.tez.dag.app.dag.event.TaskAttemptEventOutputFailed; import org.apache.tez.dag.app.dag.event.TaskAttemptEventSchedule; @@ -89,7 +94,6 @@ import org.apache.tez.dag.app.rm.AMSchedulerEventTAEnded; import org.apache.tez.dag.app.rm.AMSchedulerEventTALaunchRequest; import org.apache.tez.dag.app.rm.container.AMContainer; import org.apache.tez.dag.history.DAGHistoryEvent; -import org.apache.tez.dag.history.HistoryEvent; import org.apache.tez.dag.history.events.TaskAttemptFinishedEvent; import org.apache.tez.dag.history.events.TaskAttemptStartedEvent; import org.apache.tez.dag.records.TaskAttemptTerminationCause; @@ -109,6 +113,7 @@ import org.apache.tez.runtime.api.impl.TezEvent; import org.apache.tez.runtime.api.impl.EventMetaData.EventProducerConsumerType; import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Preconditions; import com.google.common.collect.Lists; import com.google.common.collect.Maps; @@ -166,6 +171,7 @@ public class TaskAttemptImpl implements TaskAttempt, private final Lock writeLock; protected final AppContext appContext; private final TaskHeartbeatHandler taskHeartbeatHandler; + private final TaskAttemptRecoveryData recoveryData; private long launchTime = 0; private long finishTime = 0; private String trackerName; @@ -191,10 +197,12 @@ public class TaskAttemptImpl implements TaskAttempt, private DAGCounter localityCounter; org.apache.tez.runtime.api.impl.TaskStatistics statistics; - + long lastNotifyProgressTimestamp = 0; private final long hungIntervalMax; + private List<TezEvent> taGeneratedEvents = Lists.newArrayList(); + // Used to store locality information when Set<String> taskHosts = new HashSet<String>(); Set<String> taskRacks = new HashSet<String>(); @@ -240,20 +248,29 @@ public class TaskAttemptImpl implements TaskAttempt, (TaskAttemptStateInternal.NEW) .addTransition(TaskAttemptStateInternal.NEW, - EnumSet.of(TaskAttemptStateInternal.START_WAIT, TaskAttemptStateInternal.FAILED), + EnumSet.of(TaskAttemptStateInternal.NEW, TaskAttemptStateInternal.START_WAIT, TaskAttemptStateInternal.FAILED), TaskAttemptEventType.TA_SCHEDULE, new ScheduleTaskattemptTransition()) + // NEW -> FAILED due to TA_FAILED happens in recovery + // (No TaskAttemptStartedEvent, but with TaskAttemptFinishedEvent(FAILED) + .addTransition(TaskAttemptStateInternal.NEW, + TaskAttemptStateInternal.FAILED, + TaskAttemptEventType.TA_FAILED, new TerminateTransition(FAILED_HELPER)) .addTransition(TaskAttemptStateInternal.NEW, TaskAttemptStateInternal.KILLED, TaskAttemptEventType.TA_KILL_REQUEST, new TerminateTransition(KILLED_HELPER)) - + // NEW -> KILLED due to TA_KILLED happens in recovery + // (No TaskAttemptStartedEvent, but with TaskAttemptFinishedEvent(KILLED) .addTransition(TaskAttemptStateInternal.NEW, - EnumSet.of(TaskAttemptStateInternal.NEW, - TaskAttemptStateInternal.RUNNING, - TaskAttemptStateInternal.KILLED, - TaskAttemptStateInternal.FAILED, - TaskAttemptStateInternal.SUCCEEDED), - TaskAttemptEventType.TA_RECOVER, new RecoverTransition()) + TaskAttemptStateInternal.KILLED, + TaskAttemptEventType.TA_KILLED, + new TerminateTransition(KILLED_HELPER)) + // NEW -> SUCCEEDED due to TA_DONE happens in recovery + // (with TaskAttemptStartedEvent and with TaskAttemptFinishedEvent(SUCCEEDED) + .addTransition(TaskAttemptStateInternal.NEW, + TaskAttemptStateInternal.SUCCEEDED, + TaskAttemptEventType.TA_DONE, + new SucceededTransition()) .addTransition(TaskAttemptStateInternal.START_WAIT, TaskAttemptStateInternal.RUNNING, @@ -328,6 +345,11 @@ public class TaskAttemptImpl implements TaskAttempt, TaskAttemptStateInternal.RUNNING), TaskAttemptEventType.TA_OUTPUT_FAILED, new OutputReportedFailedTransition()) + // for recovery, needs to log the TA generated events in TaskAttemptFinishedEvent + .addTransition(TaskAttemptStateInternal.RUNNING, + TaskAttemptStateInternal.RUNNING, + TaskAttemptEventType.TA_TEZ_EVENT_UPDATE, + new TezEventUpdaterTransition()) .addTransition(TaskAttemptStateInternal.KILL_IN_PROGRESS, TaskAttemptStateInternal.KILLED, @@ -434,9 +456,6 @@ public class TaskAttemptImpl implements TaskAttempt, .installTopology(); - private TaskAttemptState recoveredState = TaskAttemptState.NEW; - private boolean recoveryStartEventSeen = false; - @SuppressWarnings("rawtypes") public TaskAttemptImpl(TezTaskID taskId, int attemptNumber, EventHandler eventHandler, TaskCommunicatorManagerInterface taskCommunicatorManagerInterface, Configuration conf, Clock clock, @@ -493,6 +512,8 @@ public class TaskAttemptImpl implements TaskAttempt, TezConfiguration.TEZ_TASK_PROGRESS_STUCK_INTERVAL_MS, TezConfiguration.TEZ_TASK_PROGRESS_STUCK_INTERVAL_MS_DEFAULT); + this.recoveryData = appContext.getDAGRecoveryData() == null ? + null : appContext.getDAGRecoveryData().getTaskAttemptRecoveryData(attemptId); } @Override @@ -521,14 +542,6 @@ public class TaskAttemptImpl implements TaskAttempt, TaskSpec createRemoteTaskSpec() throws AMUserCodeException { TaskSpec baseTaskSpec = task.getBaseTaskSpec(); - if (baseTaskSpec == null) { - // since recovery does not follow normal transitions, TaskEventScheduleTask - // is not being honored by the recovery code path. Using this to workaround - // until recovery is fixed. Calling the non-locking internal method of the vertex - // to get the taskSpec directly. Since everything happens on the central dispatcher - // during recovery this is deadlock free for now. TEZ-1019 should remove the need for this. - baseTaskSpec = ((VertexImpl) vertex).createRemoteTaskSpec(getID().getTaskID().getId()); - } return new TaskSpec(getID(), baseTaskSpec.getDAGName(), baseTaskSpec.getVertexName(), baseTaskSpec.getVertexParallelism(), baseTaskSpec.getProcessorDescriptor(), @@ -839,52 +852,6 @@ public class TaskAttemptImpl implements TaskAttempt, } } - @Override - public TaskAttemptState restoreFromEvent(HistoryEvent historyEvent) { - writeLock.lock(); - try { - switch (historyEvent.getEventType()) { - case TASK_ATTEMPT_STARTED: - { - TaskAttemptStartedEvent tEvent = (TaskAttemptStartedEvent) historyEvent; - this.launchTime = tEvent.getStartTime(); - recoveryStartEventSeen = true; - recoveredState = TaskAttemptState.RUNNING; - this.containerId = tEvent.getContainerId(); - sendEvent(createDAGCounterUpdateEventTALaunched(this)); - return recoveredState; - } - case TASK_ATTEMPT_FINISHED: - { - TaskAttemptFinishedEvent tEvent = (TaskAttemptFinishedEvent) historyEvent; - this.creationTime = tEvent.getCreationTime(); - this.allocationTime = tEvent.getAllocationTime(); - this.launchTime = tEvent.getStartTime(); - this.finishTime = tEvent.getFinishTime(); - this.creationCausalTA = tEvent.getCreationCausalTA(); - this.reportedStatus.counters = tEvent.getCounters(); - this.reportedStatus.progress = 1f; - this.reportedStatus.state = tEvent.getState(); - this.terminationCause = tEvent.getTaskAttemptError() != null ? tEvent.getTaskAttemptError() - : TaskAttemptTerminationCause.UNKNOWN_ERROR; - this.diagnostics.add(tEvent.getDiagnostics()); - this.recoveredState = tEvent.getState(); - if (tEvent.getDataEvents() != null) { - this.lastDataEvents.addAll(tEvent.getDataEvents()); - } - sendEvent(createDAGCounterUpdateEventTAFinished(this, tEvent.getState())); - return recoveredState; - } - default: - throw new RuntimeException("Unexpected event received for restoring" - + " state, eventType=" + historyEvent.getEventType()); - - } - } finally { - writeLock.unlock(); - } - } - @SuppressWarnings("unchecked") private void sendEvent(Event<?> event) { this.eventHandler.handle(event); @@ -1055,6 +1022,7 @@ public class TaskAttemptImpl implements TaskAttempt, } protected void logJobHistoryAttemptStarted() { + Preconditions.checkArgument(recoveryData == null); final String containerIdStr = containerId.toString(); String inProgressLogsUrl = nodeHttpAddress + "/" + "node/containerlogs" @@ -1081,13 +1049,16 @@ public class TaskAttemptImpl implements TaskAttempt, } protected void logJobHistoryAttemptFinishedEvent(TaskAttemptStateInternal state) { - //Log finished events only if an attempt started. + Preconditions.checkArgument(recoveryData == null + || recoveryData.getTaskAttemptFinishedEvent() == null, + "log TaskAttemptFinishedEvent again in recovery when there's already another TaskAtttemptFinishedEvent"); if (getLaunchTime() == 0) return; TaskAttemptFinishedEvent finishEvt = new TaskAttemptFinishedEvent( attemptId, getVertex().getName(), getLaunchTime(), getFinishTime(), TaskAttemptState.SUCCEEDED, null, - "", getCounters(), lastDataEvents, creationTime, creationCausalTA, allocationTime); + "", getCounters(), lastDataEvents, taGeneratedEvents, + creationTime, creationCausalTA, allocationTime); // FIXME how do we store information regd completion events this.appContext.getHistoryHandler().handle( new DAGHistoryEvent(getDAGID(), finishEvt)); @@ -1095,6 +1066,9 @@ public class TaskAttemptImpl implements TaskAttempt, protected void logJobHistoryAttemptUnsuccesfulCompletion( TaskAttemptState state) { + Preconditions.checkArgument(recoveryData == null + || recoveryData.getTaskAttemptFinishedEvent() == null, + "log TaskAttemptFinishedEvent again in recovery when there's already another TaskAtttemptFinishedEvent"); long finishTime = getFinishTime(); if (finishTime <= 0) { finishTime = clock.getTime(); // comes here in case it was terminated before launch @@ -1104,8 +1078,8 @@ public class TaskAttemptImpl implements TaskAttempt, finishTime, state, terminationCause, StringUtils.join( - getDiagnostics(), LINE_SEPARATOR), getCounters(), lastDataEvents, - creationTime, creationCausalTA, allocationTime); + getDiagnostics(), LINE_SEPARATOR), getCounters(), lastDataEvents, + taGeneratedEvents, creationTime, creationCausalTA, allocationTime); // FIXME how do we store information regd completion events this.appContext.getHistoryHandler().handle( new DAGHistoryEvent(getDAGID(), finishEvt)); @@ -1116,12 +1090,69 @@ public class TaskAttemptImpl implements TaskAttempt, ////////////////////////////////////////////////////////////////////////////// protected static class ScheduleTaskattemptTransition implements - MultipleArcTransition<TaskAttemptImpl, TaskAttemptEvent, TaskAttemptStateInternal> { + MultipleArcTransition<TaskAttemptImpl, TaskAttemptEvent, TaskAttemptStateInternal> { @Override public TaskAttemptStateInternal transition(TaskAttemptImpl ta, TaskAttemptEvent event) { - TaskAttemptEventSchedule scheduleEvent = (TaskAttemptEventSchedule) event; + if (ta.recoveryData != null) { + TaskAttemptStartedEvent taStartedEvent = + ta.recoveryData.getTaskAttemptStartedEvent(); + if (taStartedEvent != null) { + ta.launchTime = taStartedEvent.getStartTime(); + TaskAttemptFinishedEvent taFinishedEvent = + ta.recoveryData.getTaskAttemptFinishedEvent(); + if (taFinishedEvent == null) { + if (LOG.isDebugEnabled()) { + LOG.debug("Only TaskAttemptStartedEvent but no TaskAttemptFinishedEvent, " + + "send out TaskAttemptEventAttemptKilled to move it to KILLED"); + } + ta.sendEvent(new TaskAttemptEventAttemptKilled(ta.getID(), + "Task Attempt killed in recovery due to can't recover the running task attempt", + TaskAttemptTerminationCause.TERMINATED_AT_RECOVERY, true)); + return TaskAttemptStateInternal.NEW; + } + } + // No matter whether TaskAttemptStartedEvent is seen, send corresponding event to move + // TA to the state of TaskAttemptFinishedEvent + TaskAttemptFinishedEvent taFinishedEvent = + ta.recoveryData.getTaskAttemptFinishedEvent(); + Preconditions.checkArgument(taFinishedEvent != null, "Both of TaskAttemptStartedEvent and TaskFinishedEvent is null," + + "taskAttemptId=" + ta.getID()); + switch (taFinishedEvent.getState()) { + case FAILED: + if (LOG.isDebugEnabled()) { + LOG.debug("TaskAttemptFinishedEvent is seen with state of FAILED" + + ", send TA_FAILED to itself" + + ", attemptId=" + ta.attemptId); + } + ta.sendEvent(new TaskAttemptEventAttemptFailed(ta.getID(), TaskAttemptEventType.TA_FAILED, + taFinishedEvent.getDiagnostics(), taFinishedEvent.getTaskAttemptError(), true)); + break; + case KILLED: + if (LOG.isDebugEnabled()) { + LOG.debug("TaskAttemptFinishedEvent is seen with state of KILLED" + + ", send TA_KILLED to itself" + + ", attemptId=" + ta.attemptId); + } + ta.sendEvent(new TaskAttemptEventAttemptKilled(ta.getID(), + taFinishedEvent.getDiagnostics(), taFinishedEvent.getTaskAttemptError(), true)); + break; + case SUCCEEDED: + if (LOG.isDebugEnabled()) { + LOG.debug("TaskAttemptFinishedEvent is seen with state of SUCCEEDED" + + ", send TA_DONE to itself" + + ", attemptId=" + ta.attemptId); + } + ta.sendEvent(new TaskAttemptEvent(ta.getID(), TaskAttemptEventType.TA_DONE)); + break; + default: + throw new TezUncheckedException("Invalid state in TaskAttemptFinishedEvent, state=" + + taFinishedEvent.getState() + ", taId=" + ta.getID()); + } + return TaskAttemptStateInternal.NEW; + } + TaskAttemptEventSchedule scheduleEvent = (TaskAttemptEventSchedule) event; ta.scheduledTime = ta.clock.getTime(); // TODO Creating the remote task here may not be required in case of // recovery. @@ -1212,7 +1243,14 @@ public class TaskAttemptImpl implements TaskAttempt, public void transition(TaskAttemptImpl ta, TaskAttemptEvent event) { // This transition should not be invoked directly, if a scheduler event has already been sent out. // Sub-classes should be used if a scheduler request has been sent. - ta.setFinishTime(); + if (ta.recoveryData == null || + ta.recoveryData.getTaskAttemptFinishedEvent() == null) { + ta.setFinishTime(); + ta.logJobHistoryAttemptUnsuccesfulCompletion(helper + .getTaskAttemptState()); + } else { + ta.finishTime = ta.recoveryData.getTaskAttemptFinishedEvent().getFinishTime(); + } if (event instanceof DiagnosableEvent) { ta.addDiagnosticInfo(((DiagnosableEvent) event).getDiagnosticInfo()); @@ -1225,11 +1263,16 @@ public class TaskAttemptImpl implements TaskAttempt, + ", requiredClass=TaskAttemptEventTerminationCauseEvent" + ", eventClass=" + event.getClass().getName()); } - + if (event instanceof RecoveryEvent) { + RecoveryEvent rEvent = (RecoveryEvent)event; + if (rEvent.isFromRecovery()) { + if (LOG.isDebugEnabled()) { + LOG.debug("Faked TerminateEvent from recovery, taskAttemptId=" + ta.getID()); + } + } + } ta.sendEvent(createDAGCounterUpdateEventTAFinished(ta, helper.getTaskAttemptState())); - ta.logJobHistoryAttemptUnsuccesfulCompletion(helper - .getTaskAttemptState()); // Send out events to the Task - indicating TaskAttemptTermination(F/K) ta.sendEvent(new TaskEventTAUpdate(ta.attemptId, helper .getTaskEventType(), event)); @@ -1413,14 +1456,42 @@ public class TaskAttemptImpl implements TaskAttempt, } } + protected static class TezEventUpdaterTransition implements + SingleArcTransition<TaskAttemptImpl, TaskAttemptEvent> { + + @Override + public void transition(TaskAttemptImpl ta, TaskAttemptEvent event) { + TaskAttemptEventTezEventUpdate tezEventUpdate = (TaskAttemptEventTezEventUpdate)event; + ta.taGeneratedEvents.addAll(tezEventUpdate.getTezEvents()); + } + } + protected static class SucceededTransition implements SingleArcTransition<TaskAttemptImpl, TaskAttemptEvent> { @Override public void transition(TaskAttemptImpl ta, TaskAttemptEvent event) { - ta.setFinishTime(); - // Send out history event. - ta.logJobHistoryAttemptFinishedEvent(TaskAttemptStateInternal.SUCCEEDED); + // If TaskAttempt is recovered to SUCCEEDED, send events generated by this TaskAttempt to vertex + // for its downstream consumers. For normal dag execution, the events are sent by TaskAttmeptListener + // for performance consideration. + if (ta.recoveryData != null && ta.recoveryData.isTaskAttemptSucceeded()) { + TaskAttemptFinishedEvent taFinishedEvent = ta.recoveryData + .getTaskAttemptFinishedEvent(); + if (LOG.isDebugEnabled()) { + LOG.debug("TaskAttempt is recovered to SUCCEEDED, attemptId=" + ta.attemptId); + } + ta.reportedStatus.counters = taFinishedEvent.getCounters(); + List<TezEvent> tezEvents = taFinishedEvent.getTAGeneratedEvents(); + if (tezEvents != null && !tezEvents.isEmpty()) { + ta.sendEvent(new VertexEventRouteEvent(ta.getVertexID(), tezEvents)); + } + ta.finishTime = taFinishedEvent.getFinishTime(); + } else { + ta.setFinishTime(); + // Send out history event. + ta.logJobHistoryAttemptFinishedEvent(TaskAttemptStateInternal.SUCCEEDED); + } + ta.sendEvent(createDAGCounterUpdateEventTAFinished(ta, TaskAttemptState.SUCCEEDED)); @@ -1520,48 +1591,6 @@ public class TaskAttemptImpl implements TaskAttempt, } - protected static class RecoverTransition implements - MultipleArcTransition<TaskAttemptImpl, TaskAttemptEvent, TaskAttemptStateInternal> { - - @Override - public TaskAttemptStateInternal transition(TaskAttemptImpl taskAttempt, TaskAttemptEvent taskAttemptEvent) { - TaskAttemptStateInternal endState = TaskAttemptStateInternal.FAILED; - switch(taskAttempt.recoveredState) { - case NEW: - case RUNNING: - // FIXME once running containers can be recovered, this - // should be handled differently - // TODO abort taskattempt - taskAttempt.sendEvent(new TaskEventTAUpdate(taskAttempt.attemptId, - TaskEventType.T_ATTEMPT_KILLED)); - taskAttempt.sendEvent(createDAGCounterUpdateEventTAFinished(taskAttempt, - getExternalState(TaskAttemptStateInternal.KILLED))); - taskAttempt.logJobHistoryAttemptUnsuccesfulCompletion(TaskAttemptState.KILLED); - endState = TaskAttemptStateInternal.KILLED; - break; - case SUCCEEDED: - // Do not inform Task as it already knows about completed attempts - endState = TaskAttemptStateInternal.SUCCEEDED; - break; - case FAILED: - // Do not inform Task as it already knows about completed attempts - endState = TaskAttemptStateInternal.FAILED; - break; - case KILLED: - // Do not inform Task as it already knows about completed attempts - endState = TaskAttemptStateInternal.KILLED; - break; - default: - throw new RuntimeException("Failed to recover from non-handled state" - + ", taskAttemptId=" + taskAttempt.getID() - + ", state=" + taskAttempt.recoveredState); - } - - return endState; - } - - } - protected static class TerminatedAfterSuccessTransition implements MultipleArcTransition<TaskAttemptImpl, TaskAttemptEvent, TaskAttemptStateInternal> { @Override http://git-wip-us.apache.org/repos/asf/tez/blob/28f30b0e/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java index 2f304c8..55dd518 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java @@ -57,6 +57,7 @@ import org.apache.tez.dag.api.oldrecords.TaskState; import org.apache.tez.dag.app.AppContext; import org.apache.tez.dag.app.ContainerContext; import org.apache.tez.dag.app.TaskCommunicatorManagerInterface; +import org.apache.tez.dag.app.RecoveryParser.TaskRecoveryData; import org.apache.tez.dag.app.TaskHeartbeatHandler; import org.apache.tez.dag.app.dag.StateChangeNotifier; import org.apache.tez.dag.app.dag.Task; @@ -68,12 +69,9 @@ import org.apache.tez.dag.app.dag.event.DAGEvent; import org.apache.tez.dag.app.dag.event.DAGEventDiagnosticsUpdate; import org.apache.tez.dag.app.dag.event.DAGEventSchedulerUpdate; import org.apache.tez.dag.app.dag.event.DAGEventType; -import org.apache.tez.dag.app.dag.event.TaskAttemptEvent; import org.apache.tez.dag.app.dag.event.TaskAttemptEventKillRequest; import org.apache.tez.dag.app.dag.event.TaskAttemptEventOutputFailed; -import org.apache.tez.dag.app.dag.event.TaskAttemptEventType; import org.apache.tez.dag.app.dag.event.TaskEvent; -import org.apache.tez.dag.app.dag.event.TaskEventRecoverTask; import org.apache.tez.dag.app.dag.event.TaskEventScheduleTask; import org.apache.tez.dag.app.dag.event.TaskEventTAUpdate; import org.apache.tez.dag.app.dag.event.TaskEventTermination; @@ -84,9 +82,6 @@ import org.apache.tez.dag.app.dag.event.VertexEventTaskReschedule; import org.apache.tez.dag.app.rm.container.AMContainer; import org.apache.tez.dag.app.rm.node.AMNodeEventTaskAttemptEnded; import org.apache.tez.dag.history.DAGHistoryEvent; -import org.apache.tez.dag.history.HistoryEvent; -import org.apache.tez.dag.history.events.TaskAttemptFinishedEvent; -import org.apache.tez.dag.history.events.TaskAttemptStartedEvent; import org.apache.tez.dag.history.events.TaskFinishedEvent; import org.apache.tez.dag.history.events.TaskStartedEvent; import org.apache.tez.dag.records.TaskAttemptTerminationCause; @@ -138,6 +133,8 @@ public class TaskImpl implements Task, EventHandler<TaskEvent> { long scheduledTime; final StateChangeNotifier stateChangeNotifier; + private final TaskRecoveryData recoveryData; + private final List<TezEvent> tezEventsForTaskAttempts = new ArrayList<TezEvent>(); static final ArrayList<TezEvent> EMPTY_TASK_ATTEMPT_TEZ_EVENTS = new ArrayList(0); @@ -150,8 +147,6 @@ public class TaskImpl implements Task, EventHandler<TaskEvent> { private static final SingleArcTransition<TaskImpl, TaskEvent> KILL_TRANSITION = new KillTransition(); - // Recovery related flags - boolean recoveryStartEventSeen = false; private static final TaskStateChangedCallback STATE_CHANGED_CALLBACK = new TaskStateChangedCallback(); @@ -164,20 +159,14 @@ public class TaskImpl implements Task, EventHandler<TaskEvent> { // define the state machine of Task // Transitions from NEW state - .addTransition(TaskStateInternal.NEW, TaskStateInternal.SCHEDULED, + // Stay in NEW in recovery when Task is killed in the previous AM + .addTransition(TaskStateInternal.NEW, + EnumSet.of(TaskStateInternal.NEW, TaskStateInternal.SCHEDULED), TaskEventType.T_SCHEDULE, new InitialScheduleTransition()) .addTransition(TaskStateInternal.NEW, TaskStateInternal.KILLED, TaskEventType.T_TERMINATE, new KillNewTransition()) - // Recover transition - .addTransition(TaskStateInternal.NEW, - EnumSet.of(TaskStateInternal.NEW, - TaskStateInternal.SCHEDULED, - TaskStateInternal.RUNNING, TaskStateInternal.SUCCEEDED, - TaskStateInternal.FAILED, TaskStateInternal.KILLED), - TaskEventType.T_RECOVER, new RecoverTransition()) - // Transitions from SCHEDULED state //when the first attempt is launched, the task state is set to RUNNING .addTransition(TaskStateInternal.SCHEDULED, TaskStateInternal.RUNNING, @@ -191,6 +180,11 @@ public class TaskImpl implements Task, EventHandler<TaskEvent> { EnumSet.of(TaskStateInternal.SCHEDULED, TaskStateInternal.FAILED), TaskEventType.T_ATTEMPT_FAILED, new AttemptFailedTransition()) + // Happens in recovery + .addTransition(TaskStateInternal.SCHEDULED, + EnumSet.of(TaskStateInternal.RUNNING, TaskStateInternal.SUCCEEDED), + TaskEventType.T_ATTEMPT_SUCCEEDED, + new AttemptSucceededTransition()) // When current attempt fails/killed and new attempt launched then // TODO Task should go back to SCHEDULED state TEZ-495 @@ -199,7 +193,8 @@ public class TaskImpl implements Task, EventHandler<TaskEvent> { TaskEventType.T_ATTEMPT_LAUNCHED) //more attempts may start later .addTransition(TaskStateInternal.RUNNING, TaskStateInternal.RUNNING, TaskEventType.T_ADD_SPEC_ATTEMPT, new RedundantScheduleTransition()) - .addTransition(TaskStateInternal.RUNNING, TaskStateInternal.SUCCEEDED, + .addTransition(TaskStateInternal.RUNNING, + EnumSet.of(TaskStateInternal.SUCCEEDED), TaskEventType.T_ATTEMPT_SUCCEEDED, new AttemptSucceededTransition()) .addTransition(TaskStateInternal.RUNNING, TaskStateInternal.RUNNING, @@ -327,7 +322,6 @@ public class TaskImpl implements Task, EventHandler<TaskEvent> { int failedAttempts; private final boolean leafVertex; - private TaskState recoveredState = TaskState.NEW; @Override public TaskState getState() { @@ -366,6 +360,8 @@ public class TaskImpl implements Task, EventHandler<TaskEvent> { this.leafVertex = leafVertex; this.taskResource = resource; this.containerContext = containerContext; + this.recoveryData = appContext.getDAGRecoveryData() == null ? + null : appContext.getDAGRecoveryData().getTaskRecoveryData(taskId); stateMachine = new StateMachineTez<TaskStateInternal, TaskEventType, TaskEvent, TaskImpl>( stateMachineFactory.make(this), this); augmentStateMachine(); @@ -545,122 +541,6 @@ public class TaskImpl implements Task, EventHandler<TaskEvent> { } } - private TaskAttempt createRecoveredTaskAttempt(TezTaskAttemptID tezTaskAttemptID) { - TaskAttempt taskAttempt = createAttempt(tezTaskAttemptID.getId(), null); - return taskAttempt; - } - - @Override - public TaskState restoreFromEvent(HistoryEvent historyEvent) { - writeLock.lock(); - try { - switch (historyEvent.getEventType()) { - case TASK_STARTED: - { - TaskStartedEvent tEvent = (TaskStartedEvent) historyEvent; - recoveryStartEventSeen = true; - this.scheduledTime = tEvent.getScheduledTime(); - if (this.attempts == null - || this.attempts.isEmpty()) { - this.attempts = new LinkedHashMap<TezTaskAttemptID, TaskAttempt>(); - } - recoveredState = TaskState.SCHEDULED; - taskAttemptStatus.clear(); - return recoveredState; - } - case TASK_FINISHED: - { - TaskFinishedEvent tEvent = (TaskFinishedEvent) historyEvent; - if (!recoveryStartEventSeen - && !tEvent.getState().equals(TaskState.KILLED)) { - throw new TezUncheckedException("Finished Event seen but" - + " no Started Event was encountered earlier" - + ", taskId=" + taskId - + ", finishState=" + tEvent.getState()); - } - recoveredState = tEvent.getState(); - if (tEvent.getState() == TaskState.SUCCEEDED - && tEvent.getSuccessfulAttemptID() != null) { - successfulAttempt = tEvent.getSuccessfulAttemptID(); - } - return recoveredState; - } - case TASK_ATTEMPT_STARTED: - { - TaskAttemptStartedEvent taskAttemptStartedEvent = - (TaskAttemptStartedEvent) historyEvent; - TaskAttempt recoveredAttempt = createRecoveredTaskAttempt( - taskAttemptStartedEvent.getTaskAttemptID()); - recoveredAttempt.restoreFromEvent(taskAttemptStartedEvent); - if (LOG.isDebugEnabled()) { - LOG.debug("Adding restored attempt into known attempts map" - + ", taskAttemptId=" + taskAttemptStartedEvent.getTaskAttemptID()); - } - Preconditions.checkArgument(this.attempts.put(taskAttemptStartedEvent.getTaskAttemptID(), - recoveredAttempt) == null, taskAttemptStartedEvent.getTaskAttemptID() + " already existed."); - this.taskAttemptStatus.put(taskAttemptStartedEvent.getTaskAttemptID().getId(), false); - this.recoveredState = TaskState.RUNNING; - return recoveredState; - } - case TASK_ATTEMPT_FINISHED: - { - TaskAttemptFinishedEvent taskAttemptFinishedEvent = - (TaskAttemptFinishedEvent) historyEvent; - TaskAttempt taskAttempt = this.attempts.get( - taskAttemptFinishedEvent.getTaskAttemptID()); - this.taskAttemptStatus.put(taskAttemptFinishedEvent.getTaskAttemptID().getId(), true); - if (taskAttempt == null) { - LOG.warn("Received an attempt finished event for an attempt that " - + " never started or does not exist" - + ", taskAttemptId=" + taskAttemptFinishedEvent.getTaskAttemptID() - + ", taskAttemptFinishState=" + taskAttemptFinishedEvent.getState()); - TaskAttempt recoveredAttempt = createRecoveredTaskAttempt( - taskAttemptFinishedEvent.getTaskAttemptID()); - this.attempts.put(taskAttemptFinishedEvent.getTaskAttemptID(), - recoveredAttempt); - // Allow TaskAttemptFinishedEvent without TaskAttemptStartedEvent when it is KILLED/FAILED - if (!taskAttemptFinishedEvent.getState().equals(TaskAttemptState.KILLED) - && !taskAttemptFinishedEvent.getState().equals(TaskAttemptState.FAILED)) { - throw new TezUncheckedException("Could not find task attempt" - + " when trying to recover" - + ", taskAttemptId=" + taskAttemptFinishedEvent.getTaskAttemptID() - + ", taskAttemptFinishState" + taskAttemptFinishedEvent.getState()); - } - taskAttempt = recoveredAttempt; - } - if (getUncompletedAttemptsCount() < 0) { - throw new TezUncheckedException("Invalid recovery event for attempt finished" - + ", more completions than starts encountered" - + ", taskId=" + taskId - + ", finishedAttempts=" + getFinishedAttemptsCount() - + ", incompleteAttempts=" + getUncompletedAttemptsCount()); - } - TaskAttemptState taskAttemptState = taskAttempt.restoreFromEvent( - taskAttemptFinishedEvent); - if (taskAttemptState.equals(TaskAttemptState.SUCCEEDED)) { - recoveredState = TaskState.SUCCEEDED; - successfulAttempt = taskAttempt.getID(); - } else if (taskAttemptState.equals(TaskAttemptState.FAILED)){ - failedAttempts++; - getVertex().incrementFailedTaskAttemptCount(); - successfulAttempt = null; - recoveredState = TaskState.RUNNING; // reset to RUNNING, may fail after SUCCEEDED - } else if (taskAttemptState.equals(TaskAttemptState.KILLED)) { - successfulAttempt = null; - getVertex().incrementKilledTaskAttemptCount(); - recoveredState = TaskState.RUNNING; // reset to RUNNING, may been killed after SUCCEEDED - } - return recoveredState; - } - default: - throw new RuntimeException("Unexpected event received for restoring" - + " state, eventType=" + historyEvent.getEventType()); - } - } finally { - writeLock.unlock(); - } - } - @VisibleForTesting public TaskStateInternal getInternalState() { readLock.lock(); @@ -1046,17 +926,39 @@ public class TaskImpl implements Task, EventHandler<TaskEvent> { } private static class InitialScheduleTransition - implements SingleArcTransition<TaskImpl, TaskEvent> { + implements MultipleArcTransition<TaskImpl, TaskEvent, TaskStateInternal> { @Override - public void transition(TaskImpl task, TaskEvent event) { + public TaskStateInternal transition(TaskImpl task, TaskEvent event) { + if (task.recoveryData != null) { + TaskStartedEvent tStartedEvent = task.recoveryData.getTaskStartedEvent(); + TaskFinishedEvent tFinishedEvent = task.recoveryData.getTaskFinishedEvent(); + // If TaskStartedEvent is not seen but TaskFinishedEvent is seen, that means + // Task is killed before it is started. Just send T_TERMINATE to itself to move to KILLED + if (tStartedEvent == null + && tFinishedEvent != null) { + Preconditions.checkArgument(tFinishedEvent.getState() == TaskState.KILLED, + "TaskStartedEvent is not seen, but TaskFinishedEvent is seen and with invalid state=" + + tFinishedEvent.getState() + ", taskId=" + task.getTaskId()); + // TODO (TEZ-2938) + // use tFinishedEvent.getTerminationCause after adding TaskTerminationCause to TaskFinishedEvent + task.eventHandler.handle(new TaskEventTermination(task.taskId, + TaskAttemptTerminationCause.UNKNOWN_ERROR, tFinishedEvent.getDiagnostics(), true)); + return TaskStateInternal.NEW; + } + } else { + task.scheduledTime = task.clock.getTime(); + task.logJobHistoryTaskStartedEvent(); + } + // No matter whether it is in recovery or normal execution, always schedule new task attempt. + // TaskAttempt will continue the recovery if necessary and send task attempt status + // to this Task. TaskEventScheduleTask scheduleEvent = (TaskEventScheduleTask) event; task.locationHint = scheduleEvent.getTaskLocationHint(); task.baseTaskSpec = scheduleEvent.getBaseTaskSpec(); // For now, initial scheduling dependency is due to vertex manager scheduling task.addAndScheduleAttempt(null); - task.scheduledTime = task.clock.getTime(); - task.logJobHistoryTaskStartedEvent(); + return TaskStateInternal.SCHEDULED; } } @@ -1085,10 +987,67 @@ public class TaskImpl implements Task, EventHandler<TaskEvent> { private static class AttemptSucceededTransition - implements SingleArcTransition<TaskImpl, TaskEvent> { + implements MultipleArcTransition<TaskImpl, TaskEvent, TaskStateInternal> { + + private boolean recoverSuccessTaskAttempt(TaskImpl task) { + // Found successful attempt + // Recover data + boolean recoveredData = true; + if (task.getVertex().getOutputCommitters() != null + && !task.getVertex().getOutputCommitters().isEmpty()) { + for (Entry<String, OutputCommitter> entry + : task.getVertex().getOutputCommitters().entrySet()) { + LOG.info("Recovering data for task from previous DAG attempt" + + ", taskId=" + task.getTaskId() + + ", output=" + entry.getKey()); + OutputCommitter committer = entry.getValue(); + if (!committer.isTaskRecoverySupported()) { + LOG.info("Task recovery not supported by committer" + + ", failing task attempt" + + ", taskId=" + task.getTaskId() + + ", attemptId=" + task.successfulAttempt + + ", output=" + entry.getKey()); + recoveredData = false; + break; + } + try { + committer.recoverTask(task.getTaskId().getId(), + task.appContext.getApplicationAttemptId().getAttemptId()-1); + } catch (Exception e) { + LOG.warn("Task recovery failed by committer" + + ", taskId=" + task.getTaskId() + + ", attemptId=" + task.successfulAttempt + + ", output=" + entry.getKey(), e); + recoveredData = false; + break; + } + } + } + return recoveredData; + } + @Override - public void transition(TaskImpl task, TaskEvent event) { + public TaskStateInternal transition(TaskImpl task, TaskEvent event) { TezTaskAttemptID successTaId = ((TaskEventTAUpdate) event).getTaskAttemptID(); + // Try to recover the succeeded TaskAttempt. It may be not recoverable if has committer which don't support + // recovery. In that case just reschedule new attempt if numFailedAttempts does not exceeded maxFailedAttempts. + if (task.recoveryData!= null + && task.recoveryData.isTaskAttemptSucceeded(successTaId)) { + boolean recoveredData = recoverSuccessTaskAttempt(task); + if (!recoveredData) { + // Move this TA to KILLED (TEZ-2958) + LOG.info("Can not recovery the successful task attempt, schedule new task attempt," + + "taskId=" + task.getTaskId()); + task.successfulAttempt = null; + task.addAndScheduleAttempt(successTaId); + return TaskStateInternal.RUNNING; + } else { + task.successfulAttempt = successTaId; + LOG.info("Recovered a successful attempt" + + ", taskAttemptId=" + task.successfulAttempt.toString()); + } + } + // both recovery to succeeded and normal dag succeeded go here. if (task.commitAttempt != null && !task.commitAttempt.equals(successTaId)) { // The succeeded attempt is not the one that was selected to commit @@ -1136,7 +1095,7 @@ public class TaskImpl implements Task, EventHandler<TaskEvent> { task.eventHandler.handle(new DAGEventSchedulerUpdate( DAGEventSchedulerUpdate.UpdateType.TA_SUCCEEDED, task.attempts .get(task.successfulAttempt))); - task.finished(TaskStateInternal.SUCCEEDED); + return task.finished(TaskStateInternal.SUCCEEDED); } } @@ -1162,139 +1121,6 @@ public class TaskImpl implements Task, EventHandler<TaskEvent> { } } - private static class RecoverTransition implements - MultipleArcTransition<TaskImpl, TaskEvent, TaskStateInternal> { - - @Override - public TaskStateInternal transition(TaskImpl task, TaskEvent taskEvent) { - if (taskEvent instanceof TaskEventRecoverTask) { - TaskEventRecoverTask taskEventRecoverTask = - (TaskEventRecoverTask) taskEvent; - if (taskEventRecoverTask.getDesiredState() != null - && !taskEventRecoverTask.recoverData()) { - // TODO recover attempts if desired state is given? - // History may not have all data. - switch (taskEventRecoverTask.getDesiredState()) { - case SUCCEEDED: - return TaskStateInternal.SUCCEEDED; - case FAILED: - return TaskStateInternal.FAILED; - case KILLED: - return TaskStateInternal.KILLED; - } - } - } - - TaskStateInternal endState = TaskStateInternal.NEW; - if (task.attempts != null) { - for (TaskAttempt taskAttempt : task.attempts.values()) { - task.eventHandler.handle(new TaskAttemptEvent( - taskAttempt.getID(), TaskAttemptEventType.TA_RECOVER)); - } - } - LOG.info("Trying to recover task" - + ", taskId=" + task.getTaskId() - + ", recoveredState=" + task.recoveredState); - switch(task.recoveredState) { - case NEW: - // Nothing to do until the vertex schedules this task - endState = TaskStateInternal.NEW; - break; - case SCHEDULED: - case RUNNING: - case SUCCEEDED: - if (task.successfulAttempt != null) { - //Found successful attempt - //Recover data - boolean recoveredData = true; - if (task.getVertex().getOutputCommitters() != null - && !task.getVertex().getOutputCommitters().isEmpty()) { - for (Entry<String, OutputCommitter> entry - : task.getVertex().getOutputCommitters().entrySet()) { - LOG.info("Recovering data for task from previous DAG attempt" - + ", taskId=" + task.getTaskId() - + ", output=" + entry.getKey()); - OutputCommitter committer = entry.getValue(); - if (!committer.isTaskRecoverySupported()) { - LOG.info("Task recovery not supported by committer" - + ", failing task attempt" - + ", taskId=" + task.getTaskId() - + ", attemptId=" + task.successfulAttempt - + ", output=" + entry.getKey()); - recoveredData = false; - break; - } - try { - committer.recoverTask(task.getTaskId().getId(), - task.appContext.getApplicationAttemptId().getAttemptId()-1); - } catch (Exception e) { - LOG.warn("Task recovery failed by committer" - + ", taskId=" + task.getTaskId() - + ", attemptId=" + task.successfulAttempt - + ", output=" + entry.getKey(), e); - recoveredData = false; - break; - } - } - } - if (!recoveredData) { - task.successfulAttempt = null; - } else { - LOG.info("Recovered a successful attempt" - + ", taskAttemptId=" + task.successfulAttempt.toString()); - task.logJobHistoryTaskFinishedEvent(); - task.eventHandler.handle( - new VertexEventTaskCompleted(task.taskId, - getExternalState(TaskStateInternal.SUCCEEDED))); - task.eventHandler.handle( - new VertexEventTaskAttemptCompleted( - task.successfulAttempt, TaskAttemptStateInternal.SUCCEEDED)); - endState = TaskStateInternal.SUCCEEDED; - break; - } - } - - if (endState != TaskStateInternal.SUCCEEDED && - task.failedAttempts >= task.maxFailedAttempts) { - // Exceeded max attempts - task.finished(TaskStateInternal.FAILED); - endState = TaskStateInternal.FAILED; - break; - } - - // no successful attempt and all attempts completed - // schedule a new one - // If any incomplete, the running attempt will moved to failed and its - // update will trigger a new attempt if possible - if (task.attempts.size() == task.getFinishedAttemptsCount()) { - task.addAndScheduleAttempt(null); - } - endState = TaskStateInternal.RUNNING; - break; - case KILLED: - // Nothing to do - // Inform vertex - task.eventHandler.handle( - new VertexEventTaskCompleted(task.taskId, - getExternalState(TaskStateInternal.KILLED))); - endState = TaskStateInternal.KILLED; - break; - case FAILED: - // Nothing to do - // Inform vertex - task.eventHandler.handle( - new VertexEventTaskCompleted(task.taskId, - getExternalState(TaskStateInternal.FAILED))); - - endState = TaskStateInternal.FAILED; - break; - } - - return endState; - } - } - - private static class KillWaitAttemptCompletedTransition implements MultipleArcTransition<TaskImpl, TaskEvent, TaskStateInternal> { @@ -1486,7 +1312,13 @@ public class TaskImpl implements Task, EventHandler<TaskEvent> { public void transition(TaskImpl task, TaskEvent event) { TaskEventTermination terminateEvent = (TaskEventTermination)event; task.addDiagnosticInfo(terminateEvent.getDiagnosticInfo()); - task.logJobHistoryTaskFailedEvent(TaskState.KILLED); + if (terminateEvent.isFromRecovery()) { + if (LOG.isDebugEnabled()) { + LOG.debug("Recovered to KILLED, taskId=" + task.getTaskId()); + } + } else { + task.logJobHistoryTaskFailedEvent(TaskState.KILLED); + } task.eventHandler.handle( new VertexEventTaskCompleted(task.taskId, TaskState.KILLED)); // TODO Metrics
