http://git-wip-us.apache.org/repos/asf/tez/blob/28f30b0e/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java index 5b75179..758c637 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java @@ -17,7 +17,10 @@ package org.apache.tez.dag.app.dag.impl; +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; import java.io.IOException; +import java.nio.ByteBuffer; import java.security.PrivilegedExceptionAction; import java.util.ArrayList; import java.util.BitSet; @@ -41,8 +44,6 @@ import java.util.concurrent.locks.ReentrantReadWriteLock; import javax.annotation.Nullable; -import com.google.common.base.Strings; - import org.apache.commons.lang.StringUtils; import org.apache.commons.lang.exception.ExceptionUtils; import org.apache.hadoop.classification.InterfaceAudience.Private; @@ -64,8 +65,8 @@ import org.apache.tez.common.ReflectionUtils; import org.apache.tez.common.counters.TezCounters; import org.apache.tez.dag.api.DagTypeConverters; import org.apache.tez.dag.api.EdgeManagerPluginDescriptor; -import org.apache.tez.dag.api.EdgeProperty.DataMovementType; import org.apache.tez.dag.api.EdgeProperty; +import org.apache.tez.dag.api.EdgeProperty.DataMovementType; import org.apache.tez.dag.api.InputDescriptor; import org.apache.tez.dag.api.InputInitializerDescriptor; import org.apache.tez.dag.api.OutputCommitterDescriptor; @@ -73,12 +74,15 @@ import org.apache.tez.dag.api.OutputDescriptor; import org.apache.tez.dag.api.ProcessorDescriptor; import org.apache.tez.dag.api.RootInputLeafOutput; import org.apache.tez.dag.api.Scope; +import org.apache.tez.dag.api.TaskLocationHint; import 
org.apache.tez.dag.api.TezConfiguration; import org.apache.tez.dag.api.TezConstants; import org.apache.tez.dag.api.TezException; import org.apache.tez.dag.api.TezUncheckedException; +import org.apache.tez.dag.api.UserPayload; import org.apache.tez.dag.api.VertexLocationHint; -import org.apache.tez.dag.api.TaskLocationHint; +import org.apache.tez.dag.api.VertexManagerPlugin; +import org.apache.tez.dag.api.VertexManagerPluginContext; import org.apache.tez.dag.api.VertexManagerPluginContext.ScheduleTaskRequest; import org.apache.tez.dag.api.VertexManagerPluginDescriptor; import org.apache.tez.dag.api.client.ProgressBuilder; @@ -94,6 +98,7 @@ import org.apache.tez.dag.api.records.DAGProtos.RootInputLeafOutputProto; import org.apache.tez.dag.api.records.DAGProtos.VertexPlan; import org.apache.tez.dag.app.AppContext; import org.apache.tez.dag.app.ContainerContext; +import org.apache.tez.dag.app.RecoveryParser.VertexRecoveryData; import org.apache.tez.dag.app.TaskAttemptEventInfo; import org.apache.tez.dag.app.TaskCommunicatorManagerInterface; import org.apache.tez.dag.app.TaskHeartbeatHandler; @@ -113,10 +118,7 @@ import org.apache.tez.dag.app.dag.event.DAGEventType; import org.apache.tez.dag.app.dag.event.DAGEventVertexCompleted; import org.apache.tez.dag.app.dag.event.DAGEventVertexReRunning; import org.apache.tez.dag.app.dag.event.SpeculatorEvent; -import org.apache.tez.dag.app.dag.event.TaskAttemptEventAttemptFailed; -import org.apache.tez.dag.app.dag.event.TaskAttemptEventType; import org.apache.tez.dag.app.dag.event.TaskEvent; -import org.apache.tez.dag.app.dag.event.TaskEventRecoverTask; import org.apache.tez.dag.app.dag.event.TaskEventScheduleTask; import org.apache.tez.dag.app.dag.event.TaskEventTermination; import org.apache.tez.dag.app.dag.event.TaskEventType; @@ -130,24 +132,20 @@ import org.apache.tez.dag.app.dag.event.VertexEventRootInputFailed; import org.apache.tez.dag.app.dag.event.VertexEventRootInputInitialized; import 
org.apache.tez.dag.app.dag.event.VertexEventRouteEvent; import org.apache.tez.dag.app.dag.event.VertexEventSourceTaskAttemptCompleted; -import org.apache.tez.dag.app.dag.event.VertexEventSourceVertexRecovered; import org.apache.tez.dag.app.dag.event.VertexEventSourceVertexStarted; import org.apache.tez.dag.app.dag.event.VertexEventTaskAttemptCompleted; import org.apache.tez.dag.app.dag.event.VertexEventTaskCompleted; import org.apache.tez.dag.app.dag.event.VertexEventTaskReschedule; import org.apache.tez.dag.app.dag.event.VertexEventTermination; import org.apache.tez.dag.app.dag.event.VertexEventType; -import org.apache.tez.dag.app.dag.event.TaskAttemptEvent; import org.apache.tez.dag.app.dag.impl.DAGImpl.VertexGroupInfo; import org.apache.tez.dag.app.dag.impl.Edge.PendingEventRouteMetadata; import org.apache.tez.dag.app.dag.speculation.legacy.LegacySpeculator; import org.apache.tez.dag.history.DAGHistoryEvent; -import org.apache.tez.dag.history.HistoryEvent; import org.apache.tez.dag.history.events.VertexCommitStartedEvent; -import org.apache.tez.dag.history.events.VertexRecoverableEventsGeneratedEvent; import org.apache.tez.dag.history.events.VertexFinishedEvent; import org.apache.tez.dag.history.events.VertexInitializedEvent; -import org.apache.tez.dag.history.events.VertexParallelismUpdatedEvent; +import org.apache.tez.dag.history.events.VertexConfigurationDoneEvent; import org.apache.tez.dag.history.events.VertexStartedEvent; import org.apache.tez.dag.library.vertexmanager.InputReadyVertexManager; import org.apache.tez.dag.library.vertexmanager.ShuffleVertexManager; @@ -158,19 +156,19 @@ import org.apache.tez.dag.records.TezTaskAttemptID; import org.apache.tez.dag.records.TezTaskID; import org.apache.tez.dag.records.TezVertexID; import org.apache.tez.dag.utils.TaskSpecificLaunchCmdOption; +import org.apache.tez.runtime.api.Event; +import org.apache.tez.runtime.api.InputSpecUpdate; import org.apache.tez.runtime.api.InputStatistics; import 
org.apache.tez.runtime.api.OutputCommitter; import org.apache.tez.runtime.api.OutputCommitterContext; -import org.apache.tez.runtime.api.InputSpecUpdate; import org.apache.tez.runtime.api.OutputStatistics; import org.apache.tez.runtime.api.TaskAttemptIdentifier; import org.apache.tez.runtime.api.VertexStatistics; import org.apache.tez.runtime.api.events.CompositeDataMovementEvent; import org.apache.tez.runtime.api.events.DataMovementEvent; -import org.apache.tez.runtime.api.events.InputFailedEvent; import org.apache.tez.runtime.api.events.InputDataInformationEvent; +import org.apache.tez.runtime.api.events.InputFailedEvent; import org.apache.tez.runtime.api.events.InputInitializerEvent; -import org.apache.tez.runtime.api.events.TaskAttemptFailedEvent; import org.apache.tez.runtime.api.events.VertexManagerEvent; import org.apache.tez.runtime.api.impl.EventMetaData; import org.apache.tez.runtime.api.impl.EventType; @@ -180,9 +178,14 @@ import org.apache.tez.runtime.api.impl.OutputSpec; import org.apache.tez.runtime.api.impl.TaskSpec; import org.apache.tez.runtime.api.impl.TaskStatistics; import org.apache.tez.runtime.api.impl.TezEvent; +import org.apache.tez.state.OnStateChangedCallback; +import org.apache.tez.state.StateMachineTez; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions; +import com.google.common.base.Strings; import com.google.common.collect.HashMultiset; import com.google.common.collect.Lists; import com.google.common.collect.Maps; @@ -192,11 +195,6 @@ import com.google.common.util.concurrent.FutureCallback; import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.ListenableFuture; -import org.apache.tez.state.OnStateChangedCallback; -import org.apache.tez.state.StateMachineTez; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - /** Implementation of Vertex interface. 
Maintains the state machines of Vertex. * The read and write calls use ReadWriteLock for concurrency. */ @@ -222,6 +220,10 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex, EventHandl //private final MRAppMetrics metrics; private final AppContext appContext; private final DAG dag; + private final VertexRecoveryData recoveryData; + private List<TezEvent> initGeneratedEvents = new ArrayList<TezEvent>(); + // set it to be true when setParallelism is called(used for recovery) + private boolean setParallelismCalledFlag = false; private boolean lazyTasksCopyNeeded = false; // must be a linked map for ordering @@ -281,11 +283,6 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex, EventHandl private static final VertexStateChangedCallback STATE_CHANGED_CALLBACK = new VertexStateChangedCallback(); - private VertexState recoveredState = VertexState.NEW; - @VisibleForTesting - List<TezEvent> recoveredEvents = new ArrayList<TezEvent>(); - private boolean vertexAlreadyInitialized = false; - @VisibleForTesting final List<TezEvent> pendingInitializerEvents = new LinkedList<TezEvent>(); @@ -307,7 +304,7 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex, EventHandl .addTransition (VertexState.NEW, EnumSet.of(VertexState.NEW, VertexState.INITED, - VertexState.INITIALIZING, VertexState.FAILED), + VertexState.INITIALIZING, VertexState.FAILED, VertexState.KILLED), VertexEventType.V_INIT, new InitTransition()) .addTransition(VertexState.NEW, @@ -324,21 +321,10 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex, EventHandl SOURCE_TASK_ATTEMPT_COMPLETED_EVENT_TRANSITION) .addTransition (VertexState.NEW, - EnumSet.of(VertexState.NEW, VertexState.INITED, - VertexState.INITIALIZING, VertexState.RUNNING, + EnumSet.of(VertexState.NEW, VertexState.SUCCEEDED, VertexState.FAILED, - VertexState.KILLED, VertexState.ERROR, - VertexState.RECOVERING), + VertexState.KILLED, VertexState.ERROR), 
VertexEventType.V_RECOVER, - new StartRecoverTransition()) - .addTransition - (VertexState.NEW, - EnumSet.of(VertexState.NEW, VertexState.INITED, - VertexState.INITIALIZING, VertexState.RUNNING, - VertexState.SUCCEEDED, VertexState.FAILED, - VertexState.KILLED, VertexState.ERROR, - VertexState.RECOVERING), - VertexEventType.V_SOURCE_VERTEX_RECOVERED, new RecoverTransition()) .addTransition(VertexState.NEW, VertexState.NEW, VertexEventType.V_SOURCE_VERTEX_STARTED, @@ -349,31 +335,7 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex, EventHandl .addTransition(VertexState.NEW, VertexState.ERROR, VertexEventType.V_INTERNAL_ERROR, INTERNAL_ERROR_TRANSITION) - .addTransition - (VertexState.RECOVERING, - EnumSet.of(VertexState.NEW, VertexState.INITED, - VertexState.INITIALIZING, VertexState.RUNNING, - VertexState.SUCCEEDED, VertexState.FAILED, - VertexState.KILLED, VertexState.ERROR, - VertexState.RECOVERING), - VertexEventType.V_SOURCE_VERTEX_RECOVERED, - new RecoverTransition()) - .addTransition - (VertexState.RECOVERING, VertexState.RECOVERING, - EnumSet.of(VertexEventType.V_INIT, - VertexEventType.V_ROUTE_EVENT, - VertexEventType.V_SOURCE_VERTEX_STARTED, - VertexEventType.V_SOURCE_TASK_ATTEMPT_COMPLETED), - new BufferDataRecoverTransition()) - .addTransition - (VertexState.RECOVERING, VertexState.RECOVERING, - VertexEventType.V_TERMINATE, - new TerminateDuringRecoverTransition()) - .addTransition - (VertexState.RECOVERING, EnumSet.of(VertexState.RECOVERING), - VertexEventType.V_MANAGER_USER_CODE_ERROR, - new VertexManagerUserCodeErrorTransition()) - + // Transitions from INITIALIZING state .addTransition(VertexState.INITIALIZING, EnumSet.of(VertexState.INITIALIZING, VertexState.INITED, @@ -429,11 +391,6 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex, EventHandl EnumSet.of(VertexState.FAILED), VertexEventType.V_ROOT_INPUT_FAILED, new RootInputInitFailedTransition()) - .addTransition - (VertexState.INITED, - 
EnumSet.of(VertexState.INITED, VertexState.ERROR), - VertexEventType.V_INIT, - new IgnoreInitInInitedTransition()) .addTransition(VertexState.INITED, VertexState.INITED, VertexEventType.V_SOURCE_VERTEX_STARTED, new SourceVertexStartedTransition()) @@ -618,8 +575,7 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex, EventHandl VertexEventType.V_ROOT_INPUT_INITIALIZED, VertexEventType.V_SOURCE_TASK_ATTEMPT_COMPLETED, VertexEventType.V_NULL_EDGE_INITIALIZED, - VertexEventType.V_INPUT_DATA_INFORMATION, - VertexEventType.V_SOURCE_VERTEX_RECOVERED)) + VertexEventType.V_INPUT_DATA_INFORMATION)) // Transitions from KILLED state .addTransition( @@ -641,8 +597,7 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex, EventHandl VertexEventType.V_TASK_COMPLETED, VertexEventType.V_ROOT_INPUT_INITIALIZED, VertexEventType.V_NULL_EDGE_INITIALIZED, - VertexEventType.V_INPUT_DATA_INFORMATION, - VertexEventType.V_SOURCE_VERTEX_RECOVERED)) + VertexEventType.V_INPUT_DATA_INFORMATION)) // No transitions from INTERNAL_ERROR state. Ignore all. 
.addTransition( @@ -662,8 +617,7 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex, EventHandl VertexEventType.V_INTERNAL_ERROR, VertexEventType.V_ROOT_INPUT_INITIALIZED, VertexEventType.V_NULL_EDGE_INITIALIZED, - VertexEventType.V_INPUT_DATA_INFORMATION, - VertexEventType.V_SOURCE_VERTEX_RECOVERED)) + VertexEventType.V_INPUT_DATA_INFORMATION)) // create the topology tables .installTopology(); @@ -676,7 +630,9 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex, EventHandl .registerStateEnteredCallback(VertexState.KILLED, STATE_CHANGED_CALLBACK) .registerStateEnteredCallback(VertexState.RUNNING, - STATE_CHANGED_CALLBACK); + STATE_CHANGED_CALLBACK) + .registerStateEnteredCallback(VertexState.INITIALIZING, + STATE_CHANGED_CALLBACK);; } private final StateMachineTez<VertexState, VertexEventType, VertexEvent, VertexImpl> stateMachine; @@ -785,18 +741,7 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex, EventHandl private VertexTerminationCause terminationCause; private String logIdentifier; - @VisibleForTesting - boolean recoveryCommitInProgress = false; - private boolean summaryCompleteSeen = false; - @VisibleForTesting - boolean hasCommitter = false; - private boolean vertexCompleteSeen = false; - private Map<String,EdgeProperty> recoveredSourceEdgeProperties = null; - private Map<String, InputSpecUpdate> recoveredRootInputSpecUpdates = null; - - // Recovery related flags - boolean recoveryInitEventSeen = false; - boolean recoveryStartEventSeen = false; + private VertexStats vertexStats = null; private final TaskSpecificLaunchCmdOption taskSpecificLaunchCmdOpts; @@ -945,7 +890,8 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex, EventHandl .createEnvironmentMapFromDAGPlan(vertexPlan.getTaskConfig() .getEnvironmentSettingList()); this.taskSpecificLaunchCmdOpts = taskSpecificLaunchCmdOption; - + this.recoveryData = appContext.getDAGRecoveryData() == null ? 
+ null : appContext.getDAGRecoveryData().getVertexRecoveryData(vertexId); // Set up log properties, including task specific log properties. String javaOptsWithoutLoggerMods = vertexPlan.getTaskConfig().hasJavaOpts() ? vertexPlan.getTaskConfig().getJavaOpts() : null; @@ -1440,110 +1386,6 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex, EventHandl return this.appContext; } - private void handleParallelismUpdate(int newParallelism, - Map<String, EdgeProperty> sourceEdgeProperties, - Map<String, InputSpecUpdate> rootInputSpecUpdates, int oldParallelism) { - // initial parallelism must have been set by this time - // parallelism update is recorded in history only for change from an initialized value - Preconditions.checkArgument(oldParallelism != -1, getLogIdentifier()); - if (oldParallelism < newParallelism) { - addTasks(newParallelism); - } else if (oldParallelism > newParallelism) { - removeTasks(newParallelism); - } - Preconditions.checkState(this.numTasks == newParallelism, getLogIdentifier()); - this.recoveredSourceEdgeProperties = sourceEdgeProperties; - this.recoveredRootInputSpecUpdates = rootInputSpecUpdates; - } - - @Override - public VertexState restoreFromEvent(HistoryEvent historyEvent) { - writeLock.lock(); - try { - switch (historyEvent.getEventType()) { - case VERTEX_INITIALIZED: - recoveryInitEventSeen = true; - recoveredState = setupVertex((VertexInitializedEvent) historyEvent); - createTasks(); - if (LOG.isDebugEnabled()) { - LOG.debug("Recovered state for vertex after Init event" - + ", vertex=" + logIdentifier - + ", recoveredState=" + recoveredState); - } - return recoveredState; - case VERTEX_STARTED: - if (!recoveryInitEventSeen) { - throw new RuntimeException("Started Event seen but" - + " no Init Event was encountered earlier"); - } - recoveryStartEventSeen = true; - VertexStartedEvent startedEvent = (VertexStartedEvent) historyEvent; - startTimeRequested = startedEvent.getStartRequestedTime(); - startedTime = 
startedEvent.getStartTime(); - recoveredState = VertexState.RUNNING; - if (LOG.isDebugEnabled()) { - LOG.debug("Recovered state for vertex after Started event" - + ", vertex=" + logIdentifier - + ", recoveredState=" + recoveredState); - } - return recoveredState; - case VERTEX_PARALLELISM_UPDATED: - // TODO TEZ-1019 this should flow through setParallelism method - VertexParallelismUpdatedEvent updatedEvent = - (VertexParallelismUpdatedEvent) historyEvent; - int oldNumTasks = numTasks; - int newNumTasks = updatedEvent.getNumTasks(); - handleParallelismUpdate(newNumTasks, updatedEvent.getSourceEdgeProperties(), - updatedEvent.getRootInputSpecUpdates(), oldNumTasks); - Preconditions.checkState(this.numTasks == newNumTasks, getLogIdentifier()); - if (updatedEvent.getVertexLocationHint() != null) { - setVertexLocationHint(updatedEvent.getVertexLocationHint()); - } - stateChangeNotifier.stateChanged(vertexId, - new VertexStateUpdateParallelismUpdated(vertexName, numTasks, oldNumTasks)); - if (LOG.isDebugEnabled()) { - LOG.debug("Recovered state for vertex after parallelism updated event" - + ", vertex=" + logIdentifier - + ", recoveredState=" + recoveredState); - } - return recoveredState; - case VERTEX_COMMIT_STARTED: - recoveryCommitInProgress = true; - hasCommitter = true; - return recoveredState; - case VERTEX_FINISHED: - VertexFinishedEvent finishedEvent = (VertexFinishedEvent) historyEvent; - if (finishedEvent.isFromSummary()) { - summaryCompleteSeen = true; - } else { - vertexCompleteSeen = true; - } - numTasks = finishedEvent.getNumTasks(); - recoveryCommitInProgress = false; - recoveredState = finishedEvent.getState(); - diagnostics.add(finishedEvent.getDiagnostics()); - finishTime = finishedEvent.getFinishTime(); - // TODO counters ?? 
- if (LOG.isDebugEnabled()) { - LOG.debug("Recovered state for vertex after finished event" - + ", vertex=" + logIdentifier - + ", recoveredState=" + recoveredState); - } - return recoveredState; - case VERTEX_DATA_MOVEMENT_EVENTS_GENERATED: - VertexRecoverableEventsGeneratedEvent vEvent = - (VertexRecoverableEventsGeneratedEvent) historyEvent; - this.recoveredEvents.addAll(vEvent.getTezEvents()); - return recoveredState; - default: - throw new RuntimeException("Unexpected event received for restoring" - + " state, eventType=" + historyEvent.getEventType()); - } - } finally { - writeLock.unlock(); - } - } - @Override public String getLogIdentifier() { return this.logIdentifier; @@ -1613,7 +1455,7 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex, EventHandl if (!pendingTaskEvents.isEmpty()) { LOG.info("Routing pending task events for vertex: " + logIdentifier); try { - handleRoutedTezEvents(pendingTaskEvents, false, true); + handleRoutedTezEvents(pendingTaskEvents, true); } catch (AMUserCodeException e) { String msg = "Exception in " + e.getSource() + ", vertex=" + logIdentifier; LOG.error(msg, e); @@ -1667,8 +1509,9 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex, EventHandl for (ScheduleTaskRequest task : tasksToSchedule) { TezTaskID taskId = TezTaskID.getInstance(vertexId, task.getTaskIndex()); TaskSpec baseTaskSpec = createRemoteTaskSpec(taskId.getId()); + boolean fromRecovery = recoveryData == null ? 
false : recoveryData.getTaskRecoveryData(taskId) != null; eventHandler.handle(new TaskEventScheduleTask(taskId, baseTaskSpec, - getTaskLocationHint(taskId))); + getTaskLocationHint(taskId), fromRecovery)); } } finally { readLock.unlock(); @@ -1687,22 +1530,22 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex, EventHandl public void reconfigureVertex(int parallelism, @Nullable VertexLocationHint locationHint, @Nullable Map<String, EdgeProperty> sourceEdgeProperties) throws AMUserCodeException { - setParallelism(parallelism, locationHint, sourceEdgeProperties, null, false, true); + setParallelismWrapper(parallelism, locationHint, sourceEdgeProperties, null, true); } @Override public void reconfigureVertex(@Nullable Map<String, InputSpecUpdate> rootInputSpecUpdate, int parallelism, @Nullable VertexLocationHint locationHint) throws AMUserCodeException { - setParallelism(parallelism, locationHint, null, rootInputSpecUpdate, false, true); + setParallelism(parallelism, locationHint, null, rootInputSpecUpdate, true); } - + @Override public void reconfigureVertex(int parallelism, @Nullable VertexLocationHint locationHint, @Nullable Map<String, EdgeProperty> sourceEdgeProperties, @Nullable Map<String, InputSpecUpdate> rootInputSpecUpdate) throws AMUserCodeException { - setParallelism(parallelism, locationHint, sourceEdgeProperties, rootInputSpecUpdate, false, true); + setParallelismWrapper(parallelism, locationHint, sourceEdgeProperties, rootInputSpecUpdate, true); } @Override @@ -1730,49 +1573,18 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex, EventHandl } finally { readLock.unlock(); } - setParallelism(parallelism, vertexLocationHint, sourceEdgeProperties, rootInputSpecUpdates, - false, fromVertexManager); + setParallelismWrapper(parallelism, vertexLocationHint, sourceEdgeProperties, rootInputSpecUpdates, + fromVertexManager); } - private void setParallelism(int parallelism, VertexLocationHint vertexLocationHint, + private 
void setParallelismWrapper(int parallelism, VertexLocationHint vertexLocationHint, Map<String, EdgeProperty> sourceEdgeProperties, Map<String, InputSpecUpdate> rootInputSpecUpdates, - boolean recovering, boolean fromVertexManager) throws AMUserCodeException { - if (recovering) { - writeLock.lock(); - try { - if (sourceEdgeProperties != null) { - for(Map.Entry<String, EdgeProperty> entry : - sourceEdgeProperties.entrySet()) { - LOG.info("Recovering edge manager for source:" - + entry.getKey() + " destination: " + getLogIdentifier()); - Vertex sourceVertex = appContext.getCurrentDAG().getVertex(entry.getKey()); - Edge edge = sourceVertices.get(sourceVertex); - try { - edge.setEdgeProperty(entry.getValue()); - } catch (Exception e) { - throw new TezUncheckedException("Fail to setCustomEdgeManage for Edge," - + "sourceVertex:" + edge.getSourceVertexName() - + "destinationVertex:" + edge.getDestinationVertexName(), e); - } - } - } - - // Restore any rootInputSpecUpdates which may have been registered during a parallelism - // update. - if (rootInputSpecUpdates != null) { - LOG.info("Got updated RootInputsSpecs during recovery: " + rootInputSpecUpdates.toString()); - this.rootInputSpecs.putAll(rootInputSpecUpdates); - } - return; - } finally { - writeLock.unlock(); - } - } + boolean fromVertexManager) throws AMUserCodeException { Preconditions.checkArgument(parallelism >= 0, "Parallelism must be >=0. Value: " + parallelism + " for vertex: " + logIdentifier); writeLock.lock(); - + this.setParallelismCalledFlag = true; try { // disallow changing things after a vertex has started if (!tasksNotYetScheduled) { @@ -1791,7 +1603,8 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex, EventHandl vertexToBeReconfiguredByManager, "Vertex is fully configured but still" + " the reconfiguration API has been called. 
VertexManager must notify the framework using " - + " context.vertexReconfigurationPlanned() before re-configuring the vertex."); + + " context.vertexReconfigurationPlanned() before re-configuring the vertex." + + " vertexId=" + logIdentifier); } // Input initializer/Vertex Manager/1-1 split expected to set parallelism. @@ -1875,7 +1688,6 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex, EventHandl removeTasks(parallelism); } } - Preconditions.checkState(this.numTasks == parallelism, getLogIdentifier()); // set new vertex location hints @@ -1903,13 +1715,6 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex, EventHandl } } - // update history - VertexParallelismUpdatedEvent parallelismUpdatedEvent = new VertexParallelismUpdatedEvent( - vertexId, numTasks, vertexLocationHint, sourceEdgeProperties, rootInputSpecUpdates, - oldNumTasks); - appContext.getHistoryHandler().handle( - new DAGHistoryEvent(getDAGId(), parallelismUpdatedEvent)); - // stop buffering events for (Edge edge : sourceVertices.values()) { edge.stopEventBuffering(); @@ -1961,6 +1766,7 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex, EventHandl Preconditions.checkState(getInternalState() == VertexState.INITIALIZING, "Vertex: " + getLogIdentifier()); } + } finally { writeLock.unlock(); } @@ -2034,28 +1840,52 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex, EventHandl void logJobHistoryVertexInitializedEvent() { + // TODO Vertex init may happen multiple times, so it is possible to have multiple VertexInitializedEvent VertexInitializedEvent initEvt = new VertexInitializedEvent(vertexId, vertexName, initTimeRequested, initedTime, numTasks, - getProcessorName(), getAdditionalInputs()); + getProcessorName(), getAdditionalInputs(), initGeneratedEvents); this.appContext.getHistoryHandler().handle( new DAGHistoryEvent(getDAGId(), initEvt)); } void logJobHistoryVertexStartedEvent() { - VertexStartedEvent startEvt = new 
VertexStartedEvent(vertexId, - startTimeRequested, startedTime); - this.appContext.getHistoryHandler().handle( - new DAGHistoryEvent(getDAGId(), startEvt)); + if (recoveryData == null + || !recoveryData.isVertexStarted()) { + VertexStartedEvent startEvt = new VertexStartedEvent(vertexId, + startTimeRequested, startedTime); + this.appContext.getHistoryHandler().handle( + new DAGHistoryEvent(getDAGId(), startEvt)); + } + } + + void logVertexConfigurationDoneEvent() { + if (recoveryData == null || !recoveryData.shouldSkipInit()) { + Map<String, EdgeProperty> sourceEdgeProperties = new HashMap<String, EdgeProperty>(); + for (Map.Entry<Vertex, Edge> entry : this.sourceVertices.entrySet()) { + sourceEdgeProperties.put(entry.getKey().getName(), entry.getValue().getEdgeProperty()); + } + VertexConfigurationDoneEvent reconfigureDoneEvent = + new VertexConfigurationDoneEvent(vertexId, clock.getTime(), + numTasks, taskLocationHints == null ? null : VertexLocationHint.create(Lists.newArrayList(taskLocationHints)), + sourceEdgeProperties, rootInputSpecs, setParallelismCalledFlag); + this.appContext.getHistoryHandler().handle( + new DAGHistoryEvent(getDAGId(), reconfigureDoneEvent)); + } } void logJobHistoryVertexFinishedEvent() throws IOException { - this.setFinishTime(); - logJobHistoryVertexCompletedHelper(VertexState.SUCCEEDED, finishTime, ""); + if (recoveryData == null + || !recoveryData.isVertexSucceeded()) { + logJobHistoryVertexCompletedHelper(VertexState.SUCCEEDED, finishTime, ""); + } } void logJobHistoryVertexFailedEvent(VertexState state) throws IOException { - logJobHistoryVertexCompletedHelper(state, clock.getTime(), - StringUtils.join(getDiagnostics(), LINE_SEPARATOR)); + if (recoveryData == null + || !recoveryData.isVertexFinished()) { + logJobHistoryVertexCompletedHelper(state, clock.getTime(), + StringUtils.join(getDiagnostics(), LINE_SEPARATOR)); + } } private void logJobHistoryVertexCompletedHelper(VertexState finalState, long finishTime, @@ -2079,6 +1909,12 
@@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex, EventHandl // commit only once. Dont commit shared outputs if (vertex.outputCommitters != null && !vertex.outputCommitters.isEmpty()) { + if (vertex.recoveryData != null + && vertex.recoveryData.isVertexCommitted()) { + LOG.info("Vertex was already committed as per recovery" + + " data, vertex=" + vertex.logIdentifier); + return vertex.finished(VertexState.SUCCEEDED); + } boolean firstCommit = true; for (Entry<String, OutputCommitter> entry : vertex.outputCommitters.entrySet()) { final OutputCommitter committer = entry.getValue(); @@ -2354,19 +2190,31 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex, EventHandl } private boolean initializeVertex() { - try { - initializeCommitters(); - } catch (Exception e) { - LOG.warn("Vertex Committer init failed, vertex=" + logIdentifier, e); - addDiagnostic("Vertex init failed : " - + ExceptionUtils.getStackTrace(e)); - trySetTerminationCause(VertexTerminationCause.INIT_FAILURE); - finished(VertexState.FAILED); - return false; + // Don't need to initialize committer if vertex is fully completed + if (recoveryData != null && recoveryData.shouldSkipInit()) { + // Do other necessary recovery here + initedTime = recoveryData.getVertexInitedEvent().getInitedTime(); + List<TezEvent> initGeneratedEvents = recoveryData.getVertexInitedEvent().getInitGeneratedEvents(); + if (initGeneratedEvents != null && !initGeneratedEvents.isEmpty()) { + eventHandler.handle(new VertexEventRouteEvent(getVertexId(), initGeneratedEvents)); + } + } else { + initedTime = clock.getTime(); + } + // Only initialize committer when it is in non-recovery mode or vertex is not recovered to completed + // state in recovery mode + if (recoveryData == null || recoveryData.getVertexFinishedEvent() == null) { + try { + initializeCommitters(); + } catch (Exception e) { + LOG.warn("Vertex Committer init failed, vertex=" + logIdentifier, e); + addDiagnostic("Vertex init failed 
: " + + ExceptionUtils.getStackTrace(e)); + trySetTerminationCause(VertexTerminationCause.INIT_FAILURE); + finished(VertexState.FAILED); + return false; + } } - - // TODO: Metrics - initedTime = clock.getTime(); logJobHistoryVertexInitializedEvent(); return true; @@ -2470,23 +2318,14 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex, EventHandl } } - private VertexState setupVertex() { - return setupVertex(null); - } - private VertexState setupVertex(VertexInitializedEvent event) { + private VertexState setupVertex() { - if (event == null) { - initTimeRequested = clock.getTime(); - } else { - initTimeRequested = event.getInitRequestedTime(); - initedTime = event.getInitedTime(); - } + this.initTimeRequested = clock.getTime(); // VertexManager needs to be setup before attempting to Initialize any // Inputs - since events generated by them will be routed to the // VertexManager for handling. - if (dagVertexGroups != null && !dagVertexGroups.isEmpty()) { List<GroupInputSpec> groupSpecList = Lists.newLinkedList(); for (VertexGroupInfo groupInfo : dagVertexGroups.values()) { @@ -2502,24 +2341,20 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex, EventHandl } // Check if any inputs need initializers - if (event != null) { - this.rootInputDescriptors = event.getAdditionalInputs(); - } else { - if (rootInputDescriptors != null) { - LOG.info("Root Inputs exist for Vertex: " + getName() + " : " - + rootInputDescriptors); - for (RootInputLeafOutput<InputDescriptor, InputInitializerDescriptor> input - : rootInputDescriptors.values()) { - if (input.getControllerDescriptor() != null && - input.getControllerDescriptor().getClassName() != null) { - if (inputsWithInitializers == null) { - inputsWithInitializers = Sets.newHashSet(); - } - inputsWithInitializers.add(input.getName()); - LOG.info("Starting root input initializer for input: " - + input.getName() + ", with class: [" - + input.getControllerDescriptor().getClassName() + "]"); + 
if (rootInputDescriptors != null) { + LOG.info("Root Inputs exist for Vertex: " + getName() + " : " + + rootInputDescriptors); + for (RootInputLeafOutput<InputDescriptor, InputInitializerDescriptor> input + : rootInputDescriptors.values()) { + if (input.getControllerDescriptor() != null && + input.getControllerDescriptor().getClassName() != null) { + if (inputsWithInitializers == null) { + inputsWithInitializers = Sets.newHashSet(); } + inputsWithInitializers.add(input.getName()); + LOG.info("Starting root input initializer for input: " + + input.getName() + ", with class: [" + + input.getControllerDescriptor().getClassName() + "]"); } } } @@ -2536,13 +2371,21 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex, EventHandl if (hasBipartite && inputsWithInitializers != null) { LOG.error("A vertex with an Initial Input and a Shuffle Input are not supported at the moment"); - if (event != null) { - return VertexState.FAILED; - } else { - return finished(VertexState.FAILED); - } + return finished(VertexState.FAILED); + } + + numTasks = getVertexPlan().getTaskConfig().getNumTasks(); + if (!(numTasks == -1 || numTasks >= 0)) { + addDiagnostic("Invalid task count for vertex" + + ", numTasks=" + numTasks); + trySetTerminationCause(VertexTerminationCause.INVALID_NUM_OF_TASKS); + return VertexState.FAILED; } + checkTaskLimits(); + // set VertexManager as the last step. Because in recovery case, we may need to restore + // some info from the last AM attempt and skip the initialization step. Otherwise numTasks may be + // reset to -1 after the restore. try { assignVertexManager(); } catch (TezException e1) { @@ -2571,38 +2414,43 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex, EventHandl msg + ", " + e.getMessage() + ", " + ExceptionUtils.getStackTrace(e.getCause())); return VertexState.FAILED; } - - // Setup tasks early if possible.
If the VertexManager is not being used - // to set parallelism, sending events to Tasks is safe (and less confusing - // then relying on tasks to be created after TaskEvents are generated). - // For VertexManagers setting parallelism, the setParallelism call needs - // to be inline. - if (event != null) { - int oldNumTasks = numTasks; - numTasks = event.getNumTasks(); - stateChangeNotifier.stateChanged(vertexId, - new VertexStateUpdateParallelismUpdated(vertexName, numTasks, oldNumTasks)); - } else { - numTasks = getVertexPlan().getTaskConfig().getNumTasks(); - // Not sending a parallelism update notification since this is from the original plan - } - - if (!(numTasks == -1 || numTasks >= 0)) { - addDiagnostic("Invalid task count for vertex" - + ", numTasks=" + numTasks); - trySetTerminationCause(VertexTerminationCause.INVALID_NUM_OF_TASKS); - if (event != null) { - return finished(VertexState.FAILED); - } else { - return VertexState.FAILED; - } - } - - checkTaskLimits(); return VertexState.INITED; } private void assignVertexManager() throws TezException { + // Conditions for skipping the initializing stage: + // - VertexInputInitializerEvent is seen + // - VertexReconfigureDoneEvent is seen + // - Reason to check whether VertexManager has completed its responsibility: + // VertexManager is actually involved in the InputInitializer (InputInitializer generates events + // and sends them to VertexManager, which does some processing and sends back to Vertex), so that means + // the InputInitializer affects the VertexManager and we cannot skip the initializing step if + // VertexManager has not completed its responsibility.
+ // - Why VertexReconfigureDoneEvent is used + // - VertexReconfigureDoneEvent represents the case where the user uses the reconfigureVertex API; + // VertexReconfigureDoneEvent will be logged + if (recoveryData != null + && recoveryData.shouldSkipInit()) { + // Replace the original VertexManager with NoOpVertexManager if the reconfiguration is done in the last AM attempt + VertexConfigurationDoneEvent reconfigureDoneEvent = recoveryData.getVertexConfigurationDoneEvent(); + if (LOG.isDebugEnabled()) { + LOG.debug("VertexManager reconfiguration is done in the last AM Attempt" + + ", use NoOpVertexManager to replace it, vertexId=" + logIdentifier); + LOG.debug("VertexReconfigureDoneEvent=" + reconfigureDoneEvent); + } + ByteArrayOutputStream out = new ByteArrayOutputStream(); + try { + reconfigureDoneEvent.toProtoStream(out); + } catch (IOException e) { + throw new TezUncheckedException("Unable to deserilize VertexReconfigureDoneEvent"); + } + this.vertexManager = new VertexManager( + VertexManagerPluginDescriptor.create(NoOpVertexManager.class.getName()) + .setUserPayload(UserPayload.create(ByteBuffer.wrap(out.toByteArray()))), + dagUgi, this, appContext, stateChangeNotifier); + return; + } + boolean hasBipartite = false; boolean hasOneToOne = false; boolean hasCustom = false; @@ -2671,258 +2519,6 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex, EventHandl } } } - - public static class StartRecoverTransition implements - MultipleArcTransition<VertexImpl, VertexEvent, VertexState> { - - @Override - public VertexState transition(VertexImpl vertex, VertexEvent vertexEvent) { - VertexEventRecoverVertex recoverEvent = (VertexEventRecoverVertex) vertexEvent; - VertexState desiredState = recoverEvent.getDesiredState(); - - switch (desiredState) { - case RUNNING: - break; - case SUCCEEDED: - case KILLED: - case FAILED: - case ERROR: - if (desiredState == VertexState.SUCCEEDED) { - vertex.succeededTaskCount = vertex.numTasks; - vertex.completedTaskCount = 
vertex.numTasks; - } else if (desiredState == VertexState.KILLED) { - vertex.killedTaskCount = vertex.numTasks; - } else if (desiredState == VertexState.FAILED || desiredState == VertexState.ERROR) { - vertex.failedTaskCount = vertex.numTasks; - } - if (vertex.tasks != null) { - TaskState taskState = TaskState.KILLED; - if (desiredState == VertexState.SUCCEEDED) { - taskState = TaskState.SUCCEEDED; - } else if (desiredState == VertexState.KILLED) { - taskState = TaskState.KILLED; - } else if (desiredState == VertexState.FAILED || desiredState == VertexState.ERROR) { - taskState = TaskState.FAILED; - } - for (Task task : vertex.tasks.values()) { - vertex.eventHandler.handle( - new TaskEventRecoverTask(task.getTaskId(), - taskState, false)); - } - } - LOG.info("DAG informed Vertex of its final completed state" - + ", vertex=" + vertex.logIdentifier - + ", state=" + desiredState); - return desiredState; - default: - LOG.info("Unhandled desired state provided by DAG" - + ", vertex=" + vertex.logIdentifier - + ", state=" + desiredState); - vertex.finished(VertexState.ERROR); - } - - // recover from recover log, should recover to running - // desiredState must be RUNNING based on above code - VertexState endState; - switch (vertex.recoveredState) { - case NEW: - // Trigger init and start as desired state is RUNNING - // Drop all root events - Iterator<TezEvent> iterator = vertex.recoveredEvents.iterator(); - while (iterator.hasNext()) { - if (iterator.next().getEventType().equals( - EventType.ROOT_INPUT_DATA_INFORMATION_EVENT)) { - iterator.remove(); - } - } - vertex.eventHandler.handle(new VertexEvent(vertex.vertexId, - VertexEventType.V_INIT)); - vertex.eventHandler.handle(new VertexEvent(vertex.vertexId, - VertexEventType.V_START)); - endState = VertexState.NEW; - break; - case INITED: - try { - vertex.initializeCommitters(); - } catch (Exception e) { - String msg = "Failed to initialize committers" - + ", vertex=" + vertex.logIdentifier + "," - + 
ExceptionUtils.getStackTrace(e); - LOG.error(msg); - vertex.finished(VertexState.FAILED, - VertexTerminationCause.INIT_FAILURE, msg); - endState = VertexState.FAILED; - break; - } - - // Recover tasks - if (vertex.tasks != null) { - for (Task task : vertex.tasks.values()) { - vertex.eventHandler.handle( - new TaskEventRecoverTask(task.getTaskId())); - } - } - // Update tasks with their input payloads as needed - - vertex.eventHandler.handle(new VertexEvent(vertex.vertexId, - VertexEventType.V_START)); - if (vertex.getInputVertices().isEmpty()) { - endState = VertexState.INITED; - } else { - endState = VertexState.RECOVERING; - } - break; - case RUNNING: - try { - vertex.initializeCommitters(); - } catch (Exception e) { - String msg = "Failed to initialize committers" - + ", vertex=" + vertex.logIdentifier + "," - + ExceptionUtils.getStackTrace(e); - LOG.error(msg); - vertex.finished(VertexState.FAILED, - VertexTerminationCause.INIT_FAILURE, msg); - endState = VertexState.FAILED; - break; - } - - // if commit in progress and desired state is not a succeeded one, - // move to failed - if (vertex.recoveryCommitInProgress) { - String msg = "Recovered vertex was in the middle of a commit" - + ", failing Vertex=" + vertex.logIdentifier; - LOG.warn(msg); - vertex.finished(VertexState.FAILED, - VertexTerminationCause.COMMIT_FAILURE, msg); - endState = VertexState.FAILED; - break; - } - assert vertex.tasks.size() == vertex.numTasks; - if (vertex.tasks != null && vertex.numTasks != 0) { - for (Task task : vertex.tasks.values()) { - vertex.eventHandler.handle( - new TaskEventRecoverTask(task.getTaskId())); - } - try { - vertex.recoveryCodeSimulatingStart(); - vertex.unsetTasksNotYetScheduled(); - endState = VertexState.RUNNING; - } catch (AMUserCodeException e) { - String msg = "Exception in " + e.getSource() + ", vertex:" + vertex.getLogIdentifier(); - LOG.error(msg, e); - vertex.finished(VertexState.FAILED, VertexTerminationCause.AM_USERCODE_FAILURE, - msg + ", " + 
ExceptionUtils.getStackTrace(e.getCause())); - endState = VertexState.FAILED; - } - } else { - // why succeeded here - endState = VertexState.SUCCEEDED; - vertex.finished(endState); - } - break; - case SUCCEEDED: - if (vertex.hasCommitter - && vertex.summaryCompleteSeen && !vertex.vertexCompleteSeen) { - String msg = "Cannot recover vertex as all recovery events not" - + " found, vertex=" + vertex.logIdentifier - + ", hasCommitters=" + vertex.hasCommitter - + ", summaryCompletionSeen=" + vertex.summaryCompleteSeen - + ", finalCompletionSeen=" + vertex.vertexCompleteSeen; - LOG.warn(msg); - vertex.finished(VertexState.FAILED, - VertexTerminationCause.COMMIT_FAILURE, msg); - endState = VertexState.FAILED; - } else { - // recover tasks - if (vertex.tasks != null && vertex.numTasks != 0) { - TaskState taskState = TaskState.SUCCEEDED; - for (Task task : vertex.tasks.values()) { - vertex.eventHandler.handle( - new TaskEventRecoverTask(task.getTaskId(), - taskState)); - } - try { - vertex.recoveryCodeSimulatingStart(); - vertex.unsetTasksNotYetScheduled(); - endState = VertexState.RUNNING; - } catch (AMUserCodeException e) { - String msg = "Exception in " + e.getSource() +", vertex:" + vertex.getLogIdentifier(); - LOG.error(msg, e); - vertex.finished(VertexState.FAILED, VertexTerminationCause.AM_USERCODE_FAILURE, - msg + "," + ExceptionUtils.getStackTrace(e.getCause())); - endState = VertexState.FAILED; - } - } else { - endState = vertex.recoveredState; - vertex.finished(endState); - } - } - break; - case FAILED: - case KILLED: - // vertex may be killed/failed before its tasks are scheduled. so here just recover vertex - // to the recovered state without waiting for its tasks' feedback and recover tasks to - // the corresponding state without recover its data. 
- if (vertex.tasks != null && vertex.numTasks != 0) { - TaskState taskState = TaskState.FAILED; - if (vertex.recoveredState == VertexState.KILLED) { - taskState = TaskState.KILLED; - } - for (Task task : vertex.tasks.values()) { - vertex.eventHandler.handle( - new TaskEventRecoverTask(task.getTaskId(), - taskState, false)); - } - } - endState = vertex.recoveredState; - vertex.finished(endState); - break; - default: - LOG.warn("Invalid recoveredState found when trying to recover" - + " vertex" - + ", vertex=" + vertex.logIdentifier - + ", recoveredState=" + vertex.recoveredState); - vertex.finished(VertexState.ERROR); - endState = VertexState.ERROR; - break; - } - if (!endState.equals(VertexState.RECOVERING)) { - LOG.info("Recovered Vertex State" - + ", vertexId=" + vertex.logIdentifier - + ", state=" + endState - + ", numInitedSourceVertices=" + vertex.numInitedSourceVertices - + ", numStartedSourceVertices=" + vertex.numStartedSourceVertices - + ", numRecoveredSourceVertices=" + vertex.numRecoveredSourceVertices - + ", recoveredEvents=" - + ( vertex.recoveredEvents == null ? "null" : vertex.recoveredEvents.size()) - + ", tasksIsNull=" + (vertex.tasks == null) - + ", numTasks=" + ( vertex.tasks == null ? 
"null" : vertex.tasks.size())); - for (Entry<Vertex, Edge> entry : vertex.getOutputVertices().entrySet()) { - vertex.eventHandler.handle(new VertexEventSourceVertexRecovered( - entry.getKey().getVertexId(), - vertex.vertexId, endState, null, - vertex.getDistanceFromRoot())); - } - } - if (EnumSet.of(VertexState.RUNNING, VertexState.SUCCEEDED, VertexState.INITED) - .contains(endState)) { - // Send events downstream - vertex.routeRecoveredEvents(endState, vertex.recoveredEvents); - vertex.recoveredEvents.clear(); - } else { - // Ensure no recovered events - if (!vertex.recoveredEvents.isEmpty()) { - throw new RuntimeException("Invalid Vertex state" - + ", found non-zero recovered events in invalid state" - + ", vertex=" + vertex.logIdentifier - + ", recoveredState=" + endState - + ", recoveredEvents=" + vertex.recoveredEvents.size()); - } - } - return endState; - } - - } private static List<TaskAttemptIdentifier> getTaskAttemptIdentifiers(DAG dag, List<TezTaskAttemptID> taIds) { @@ -2939,73 +2535,6 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex, EventHandl TezTaskAttemptID taId) { return new TaskAttemptIdentifierImpl(dagName, vertexName, taId); } - - private void recoveryCodeSimulatingStart() throws AMUserCodeException { - vertexManager.onVertexStarted(getTaskAttemptIdentifiers(dag, pendingReportedSrcCompletions)); - // This code is duplicated from startVertex() because recovery does not follow normal - // transitions. To be removed after recovery code is fixed. 
- maybeSendConfiguredEvent(); - } - - private void routeRecoveredEvents(VertexState vertexState, - List<TezEvent> tezEvents) { - for (TezEvent tezEvent : tezEvents) { - EventMetaData sourceMeta = tezEvent.getSourceInfo(); - TezTaskAttemptID srcTaId = sourceMeta.getTaskAttemptID(); - if (tezEvent.getEventType() == EventType.DATA_MOVEMENT_EVENT) { - ((DataMovementEvent) tezEvent.getEvent()).setVersion(srcTaId.getId()); - } else if (tezEvent.getEventType() == EventType.COMPOSITE_DATA_MOVEMENT_EVENT) { - ((CompositeDataMovementEvent) tezEvent.getEvent()).setVersion(srcTaId.getId()); - } else if (tezEvent.getEventType() == EventType.INPUT_FAILED_EVENT) { - ((InputFailedEvent) tezEvent.getEvent()).setVersion(srcTaId.getId()); - } else if (tezEvent.getEventType() == EventType.ROOT_INPUT_DATA_INFORMATION_EVENT) { - if (vertexState == VertexState.RUNNING - || vertexState == VertexState.INITED) { - // Only routed if vertex is still running - eventHandler.handle(new VertexEventRouteEvent( - this.getVertexId(), Collections.singletonList(tezEvent), true)); - } - continue; - } else if (tezEvent.getEventType() == EventType.ROOT_INPUT_INITIALIZER_EVENT) { - // The event has the relevant target information - InputInitializerEvent iiEvent = (InputInitializerEvent) tezEvent.getEvent(); - iiEvent.setSourceVertexName(vertexName); - eventHandler.handle(new VertexEventRouteEvent( - getDAG().getVertex(iiEvent.getTargetVertexName()).getVertexId(), - Collections.singletonList(tezEvent), true)); - continue; - } - - Vertex destVertex = getDAG().getVertex(sourceMeta.getEdgeVertexName()); - Edge destEdge = targetVertices.get(destVertex); - if (destEdge == null) { - throw new TezUncheckedException("Bad destination vertex: " + - sourceMeta.getEdgeVertexName() + " for event vertex: " + - getLogIdentifier()); - } - if (LOG.isDebugEnabled()) { - LOG.debug("Routing recovered event" - + ", vertex=" + logIdentifier - + ", eventType=" + tezEvent.getEventType() - + ", sourceInfo=" + sourceMeta - + ", 
destinationVertex=" + destVertex.getLogIdentifier()); - } - eventHandler.handle(new VertexEventRouteEvent(destVertex - .getVertexId(), Collections.singletonList(tezEvent), true)); - } - } - - public static class TerminateDuringRecoverTransition implements - SingleArcTransition<VertexImpl, VertexEvent> { - - @Override - public void transition(VertexImpl vertex, VertexEvent vertexEvent) { - LOG.info("Received a terminate during recovering, setting recovered" - + " state to KILLED"); - vertex.recoveredState = VertexState.KILLED; - } - - } public static class NullEdgeInitializedTransition implements MultipleArcTransition<VertexImpl, VertexEvent, VertexState> { @@ -3036,375 +2565,64 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex, EventHandl } } - public static class BufferDataRecoverTransition implements - SingleArcTransition<VertexImpl, VertexEvent> { - - @Override - public void transition(VertexImpl vertex, VertexEvent vertexEvent) { - LOG.info("Received upstream event while still recovering" - + ", vertexId=" + vertex.logIdentifier - + ", vertexEventType=" + vertexEvent.getType()); - if (vertexEvent.getType().equals(VertexEventType.V_ROUTE_EVENT)) { - VertexEventRouteEvent evt = (VertexEventRouteEvent) vertexEvent; - vertex.pendingRouteEvents.addAll(evt.getEvents()); - } else if (vertexEvent.getType().equals( - VertexEventType.V_SOURCE_TASK_ATTEMPT_COMPLETED)) { - VertexEventSourceTaskAttemptCompleted evt = - (VertexEventSourceTaskAttemptCompleted) vertexEvent; - vertex.pendingReportedSrcCompletions.add( - evt.getCompletionEvent().getTaskAttemptId()); - } else if (vertexEvent.getType().equals( - VertexEventType.V_SOURCE_VERTEX_STARTED)) { - VertexEventSourceVertexStarted startEvent = - (VertexEventSourceVertexStarted) vertexEvent; - int distanceFromRoot = startEvent.getSourceDistanceFromRoot() + 1; - if(vertex.distanceFromRoot < distanceFromRoot) { - vertex.distanceFromRoot = distanceFromRoot; - } - ++vertex.numStartedSourceVertices; - } else 
if (vertexEvent.getType().equals(VertexEventType.V_INIT)) { - ++vertex.numInitedSourceVertices; - } - } - } - - public static class RecoverTransition implements MultipleArcTransition<VertexImpl, VertexEvent, VertexState> { @Override public VertexState transition(VertexImpl vertex, VertexEvent vertexEvent) { - VertexEventSourceVertexRecovered sourceRecoveredEvent = - (VertexEventSourceVertexRecovered) vertexEvent; - // Use distance from root from Recovery events as upstream vertices may not - // send source vertex started event that is used to compute distance - int distanceFromRoot = sourceRecoveredEvent.getSourceDistanceFromRoot() + 1; - if(vertex.distanceFromRoot < distanceFromRoot) { - vertex.distanceFromRoot = distanceFromRoot; - } - - ++vertex.numRecoveredSourceVertices; - - switch (sourceRecoveredEvent.getSourceVertexState()) { - case NEW: - // Nothing to do - break; - case INITED: - ++vertex.numInitedSourceVertices; - break; - case RUNNING: - case SUCCEEDED: - ++vertex.numInitedSourceVertices; - ++vertex.numStartedSourceVertices; - if (sourceRecoveredEvent.getCompletedTaskAttempts() != null) { - vertex.pendingReportedSrcCompletions.addAll( - sourceRecoveredEvent.getCompletedTaskAttempts()); - } - break; - case FAILED: - case KILLED: - case ERROR: - // Nothing to do - // Recover as if source vertices have not inited/started - break; - default: - LOG.warn("Received invalid SourceVertexRecovered event" - + ", vertex=" + vertex.logIdentifier - + ", sourceVertex=" + sourceRecoveredEvent.getSourceVertexID() - + ", sourceVertexState=" + sourceRecoveredEvent.getSourceVertexState()); - return vertex.finished(VertexState.ERROR); - } - - if (vertex.numRecoveredSourceVertices != - vertex.getInputVerticesCount()) { - if (LOG.isDebugEnabled()) { - LOG.debug("Waiting for source vertices to recover" - + ", vertex=" + vertex.logIdentifier - + ", numRecoveredSourceVertices=" + vertex.numRecoveredSourceVertices - + ", totalSourceVertices=" + vertex.getInputVerticesCount()); - 
} - return VertexState.RECOVERING; - } - - - // Complete recovery - VertexState endState = VertexState.NEW; - List<TezTaskAttemptID> completedTaskAttempts = Lists.newLinkedList(); - switch (vertex.recoveredState) { - case NEW: - // Drop all root events if not inited properly - Iterator<TezEvent> iterator = vertex.recoveredEvents.iterator(); - while (iterator.hasNext()) { - if (iterator.next().getEventType().equals( - EventType.ROOT_INPUT_DATA_INFORMATION_EVENT)) { - iterator.remove(); - } - } - // Trigger init if all sources initialized - if (vertex.numInitedSourceVertices == vertex.getInputVerticesCount()) { - vertex.eventHandler.handle(new VertexEvent(vertex.vertexId, - VertexEventType.V_INIT)); - } - if (vertex.numStartedSourceVertices == vertex.getInputVerticesCount()) { - vertex.eventHandler.handle(new VertexEvent(vertex.vertexId, - VertexEventType.V_START)); - } - endState = VertexState.NEW; - break; - case INITED: - vertex.vertexAlreadyInitialized = true; - try { - vertex.initializeCommitters(); - } catch (Exception e) { - String msg = "Failed to initialize committers, vertex=" - + vertex.logIdentifier + "," + ExceptionUtils.getStackTrace(e); - LOG.error(msg); - vertex.finished(VertexState.FAILED, - VertexTerminationCause.INIT_FAILURE, msg); - endState = VertexState.FAILED; - break; - } - boolean successSetParallelism ; - try { - // recovering only edge manager - vertex.setParallelism(0, - null, vertex.recoveredSourceEdgeProperties, vertex.recoveredRootInputSpecUpdates, true, false); - successSetParallelism = true; - } catch (Exception e) { - successSetParallelism = false; - } - if (!successSetParallelism) { - String msg = "Failed to recover edge managers, vertex=" + vertex.logIdentifier; - LOG.error(msg); - vertex.finished(VertexState.FAILED, - VertexTerminationCause.INIT_FAILURE, msg); - endState = VertexState.FAILED; - break; - } - // Recover tasks - if (vertex.tasks != null) { - for (Task task : vertex.tasks.values()) { - vertex.eventHandler.handle( - 
new TaskEventRecoverTask(task.getTaskId())); - } - } - if (vertex.numInitedSourceVertices != vertex.getInputVerticesCount()) { - LOG.info("Vertex already initialized but source vertices have not" - + " initialized" - + ", vertexId=" + vertex.logIdentifier - + ", numInitedSourceVertices=" + vertex.numInitedSourceVertices); - } else { - if (vertex.numStartedSourceVertices == vertex.getInputVerticesCount()) { - vertex.eventHandler.handle(new VertexEvent(vertex.vertexId, - VertexEventType.V_START)); - } - } - endState = VertexState.INITED; - break; - case RUNNING: - // if commit in progress and desired state is not a succeeded one, - // move to failed - if (vertex.recoveryCommitInProgress) { - LOG.info("Recovered vertex was in the middle of a commit" - + ", failing Vertex=" + vertex.logIdentifier); - vertex.finished(VertexState.FAILED, - VertexTerminationCause.COMMIT_FAILURE, null); - endState = VertexState.FAILED; - break; - } - try { - vertex.initializeCommitters(); - } catch (Exception e) { - String msg = "Failed to initialize committers, vertex=" - + vertex.logIdentifier + "," + ExceptionUtils.getStackTrace(e); - LOG.error(msg); - vertex.finished(VertexState.FAILED, - VertexTerminationCause.INIT_FAILURE, msg); - endState = VertexState.FAILED; - break; - } - try { - vertex.setParallelism(vertex.numTasks, null, vertex.recoveredSourceEdgeProperties, - vertex.recoveredRootInputSpecUpdates, true, false); - successSetParallelism = true; - } catch (Exception e) { - successSetParallelism = false; - } - if (!successSetParallelism) { - String msg = "Failed to recover edge managers for vertex:" + vertex.logIdentifier; - LOG.error(msg); - vertex.finished(VertexState.FAILED, - VertexTerminationCause.INIT_FAILURE, msg); - endState = VertexState.FAILED; - break; - } - assert vertex.tasks.size() == vertex.numTasks; - if (vertex.tasks != null && vertex.numTasks != 0) { - for (Task task : vertex.tasks.values()) { - vertex.eventHandler.handle( - new 
TaskEventRecoverTask(task.getTaskId())); - } - try { - vertex.recoveryCodeSimulatingStart(); - vertex.unsetTasksNotYetScheduled(); - endState = VertexState.RUNNING; - } catch (AMUserCodeException e) { - String msg = "Exception in " + e.getSource() + ", vertex=" + vertex.getLogIdentifier(); - LOG.error(msg, e); - vertex.finished(VertexState.FAILED, VertexTerminationCause.AM_USERCODE_FAILURE, - msg + "," + ExceptionUtils.getStackTrace(e.getCause())); - endState = VertexState.FAILED; - } - } else { - endState = VertexState.SUCCEEDED; - vertex.finished(endState); - } - break; - case SUCCEEDED: - // recover tasks - assert vertex.tasks.size() == vertex.numTasks; - if (vertex.tasks != null && vertex.numTasks != 0) { - TaskState taskState = TaskState.SUCCEEDED; - for (Task task : vertex.tasks.values()) { - vertex.eventHandler.handle( - new TaskEventRecoverTask(task.getTaskId(), - taskState)); - } - // Wait for all tasks to recover and report back - try { - vertex.recoveryCodeSimulatingStart(); - vertex.unsetTasksNotYetScheduled(); - endState = VertexState.RUNNING; - } catch (AMUserCodeException e) { - String msg = "Exception in " + e.getSource() + ", vertex:" + vertex.getLogIdentifier(); - LOG.error(msg, e); - vertex.finished(VertexState.FAILED, VertexTerminationCause.AM_USERCODE_FAILURE, - msg + "," + ExceptionUtils.getStackTrace(e.getCause())); - endState = VertexState.FAILED; - } - } else { - endState = vertex.recoveredState; - vertex.finished(endState); - } - break; - case FAILED: - case KILLED: - // vertex may be killed/failed before its tasks are scheduled. so here just recover vertex - // to the recovered state without waiting for its tasks' feedback and recover tasks to - // the corresponding state without recover its data. 
- if (vertex.tasks != null && vertex.numTasks != 0) { - TaskState taskState = TaskState.FAILED; - if (vertex.recoveredState == VertexState.KILLED) { - taskState = TaskState.KILLED; - } - for (Task task : vertex.tasks.values()) { - vertex.eventHandler.handle( - new TaskEventRecoverTask(task.getTaskId(), - taskState, false)); - } - } - endState = vertex.recoveredState; - vertex.finished(endState); - break; - default: - LOG.warn("Invalid recoveredState found when trying to recover" - + " vertex, recoveredState=" + vertex.recoveredState); - vertex.finished(VertexState.ERROR); - endState = VertexState.ERROR; - break; - } - - LOG.info("Recovered Vertex State" - + ", vertexId=" + vertex.logIdentifier - + ", state=" + endState - + ", numInitedSourceVertices" + vertex.numInitedSourceVertices - + ", numStartedSourceVertices=" + vertex.numStartedSourceVertices - + ", numRecoveredSourceVertices=" + vertex.numRecoveredSourceVertices - + ", tasksIsNull=" + (vertex.tasks == null) - + ", numTasks=" + ( vertex.tasks == null ? 
0 : vertex.tasks.size())); - - for (Entry<Vertex, Edge> entry : vertex.getOutputVertices().entrySet()) { - vertex.eventHandler.handle(new VertexEventSourceVertexRecovered( - entry.getKey().getVertexId(), - vertex.vertexId, endState, completedTaskAttempts, - vertex.getDistanceFromRoot())); - } - if (EnumSet.of(VertexState.RUNNING, VertexState.SUCCEEDED, VertexState.INITED) - .contains(endState)) { - // Send events downstream - vertex.routeRecoveredEvents(endState, vertex.recoveredEvents); - vertex.recoveredEvents.clear(); - if (!vertex.pendingRouteEvents.isEmpty()) { - try { - vertex.handleRoutedTezEvents(vertex.pendingRouteEvents, false, true); - vertex.pendingRouteEvents.clear(); - } catch (AMUserCodeException e) { - String msg = "Exception in " + e.getSource() + ", vertex=" + vertex.getLogIdentifier(); - LOG.error(msg, e); - vertex.finished(VertexState.FAILED, VertexTerminationCause.AM_USERCODE_FAILURE, - msg + ", " + e.getMessage() + ", " + ExceptionUtils.getStackTrace(e.getCause())); - endState = VertexState.FAILED; - } - } - } else { - // Ensure no recovered events - if (!vertex.recoveredEvents.isEmpty()) { - throw new RuntimeException("Invalid Vertex state" - + ", found non-zero recovered events in invalid state" - + ", recoveredState=" + endState - + ", recoveredEvents=" + vertex.recoveredEvents.size()); - } + VertexEventRecoverVertex recoverEvent = (VertexEventRecoverVertex) vertexEvent; + // with desired state, for the cases that DAG is completed + VertexState desiredState = recoverEvent.getDesiredState(); + switch (desiredState) { + case SUCCEEDED: + vertex.succeededTaskCount = vertex.numTasks; + vertex.completedTaskCount = vertex.numTasks; + break; + case KILLED: + vertex.killedTaskCount = vertex.numTasks; + break; + case FAILED: + case ERROR: + vertex.failedTaskCount = vertex.numTasks; + break; + default: + LOG.info("Unhandled desired state provided by DAG" + + ", vertex=" + vertex.logIdentifier + + ", state=" + desiredState); + return 
vertex.finished(VertexState.ERROR); } - return endState; - } - - } - public static class IgnoreInitInInitedTransition implements - MultipleArcTransition<VertexImpl, VertexEvent, VertexState> { - - @Override - public VertexState transition(VertexImpl vertex, VertexEvent event) { - LOG.info("Received event during INITED state" + LOG.info("DAG informed vertices of its final completed state" + ", vertex=" + vertex.logIdentifier - + ", eventType=" + event.getType()); - if (!vertex.vertexAlreadyInitialized) { - LOG.error("Vertex not initialized but in INITED state" - + ", vertexId=" + vertex.logIdentifier); - return vertex.finished(VertexState.ERROR); - } else { - return VertexState.INITED; - } + + ", desiredState=" + desiredState); + return vertex.finished(recoverEvent.getDesiredState()); } } - - - + public static class InitTransition implements MultipleArcTransition<VertexImpl, VertexEvent, VertexState> { @Override public VertexState transition(VertexImpl vertex, VertexEvent event) { + // recover from recovery data (NEW->FAILED/KILLED) + if (vertex.recoveryData != null + && !vertex.recoveryData.isVertexInited() + && vertex.recoveryData.isVertexFinished()) { + VertexFinishedEvent finishedEvent = vertex.recoveryData.getVertexFinishedEvent(); + vertex.diagnostics.add(finishedEvent.getDiagnostics()); + return vertex.finished(finishedEvent.getState()); + } + VertexState vertexState = VertexState.NEW; vertex.numInitedSourceVertices++; - // TODO fix this as part of TEZ-1008 - // Should have a different way to infer source vertices INITED - // as compared to a recovery triggered INIT - // In normal flow, upstream vertices send a V_INIT downstream to - // trigger an init of the downstream vertex. In case of recovery, - // upstream vertices may not send this event if they are already in a - // RUNNING or completed state. Hence, recovering vertices may send - // themselves a V_INIT to trigger a transition. Hence, the count may - // go one over. 
if (vertex.sourceVertices == null || vertex.sourceVertices.isEmpty() || - (vertex.numInitedSourceVertices == vertex.sourceVertices.size() - || vertex.numInitedSourceVertices == (vertex.sourceVertices.size()+1))) { - vertexState = handleInitEvent(vertex, event); + (vertex.numInitedSourceVertices == vertex.sourceVertices.size())) { + vertexState = handleInitEvent(vertex); if (vertexState != VertexState.FAILED) { if (vertex.targetVertices != null && !vertex.targetVertices.isEmpty()) { for (Vertex target : vertex.targetVertices.keySet()) { vertex.getEventHandler().handle(new VertexEvent(target.getVertexId(), - VertexEventType.V_INIT)); + VertexEventType.V_INIT)); } } } @@ -3412,11 +2630,12 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex, EventHandl return vertexState; } - private VertexState handleInitEvent(VertexImpl vertex, VertexEvent event) { + private VertexState handleInitEvent(VertexImpl vertex) { VertexState state = vertex.setupVertex(); if (state.equals(VertexState.FAILED)) { return state; } + // TODO move before to handle NEW state if (vertex.targetVertices != null) { for (Edge e : vertex.targetVertices.values()) { @@ -3448,13 +2667,15 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex, EventHandl + " to set #tasks for the vertex " + vertex.getLogIdentifier()); if (vertex.inputsWithInitializers != null) { - LOG.info("Vertex will initialize from input initializer. " + vertex.logIdentifier); - try { - vertex.setupInputInitializerManager(); - } catch (TezException e) { - String msg = "Fail to create InputInitializerManager, " + ExceptionUtils.getStackTrace(e); - LOG.info(msg); - return vertex.finished(VertexState.FAILED, VertexTerminationCause.INIT_FAILURE, msg); + if (vertex.recoveryData == null || !vertex.recoveryData.shouldSkipInit()) { + LOG.info("Vertex will initialize from input initializer. 
" + vertex.logIdentifier); + try { + vertex.setupInputInitializerManager(); + } catch (TezException e) { + String msg = "Fail to create InputInitializerManager, " + ExceptionUtils.getStackTrace(e); + LOG.info(msg); + return vertex.finished(VertexState.FAILED, VertexTerminationCause.INIT_FAILURE, msg); + } } return VertexState.INITIALIZING; } else { @@ -3484,7 +2705,8 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex, EventHandl LOG.info("Creating " + vertex.numTasks + " tasks for vertex: " + vertex.logIdentifier); vertex.createTasks(); // this block may return VertexState.INITIALIZING - if (vertex.inputsWithInitializers != null) { + if (vertex.inputsWithInitializers != null && + (vertex.recoveryData == null || !vertex.recoveryData.shouldSkipInit())) { LOG.info("Vertex will initialize from input initializer. " + vertex.logIdentifier); try { vertex.setupInputInitializerManager(); @@ -3608,7 +2830,8 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex, EventHandl List<TezEvent> inputInfoEvents = iEvent.getEvents(); try { if (inputInfoEvents != null && !inputInfoEvents.isEmpty()) { - vertex.handleRoutedTezEvents(inputInfoEvents, false, false); + vertex.initGeneratedEvents.addAll(inputInfoEvents); + vertex.handleRoutedTezEvents(inputInfoEvents, false); } } catch (AMUserCodeException e) { String msg = "Exception in " + e.getSource() + ", vertex:" + vertex.getLogIdentifier(); @@ -3718,8 +2941,8 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex, EventHandl public static class StartTransition implements MultipleArcTransition<VertexImpl, VertexEvent, VertexState> { - @Override - public VertexState transition(VertexImpl vertex, VertexEvent event) { + @Override + public VertexState transition(VertexImpl vertex, VertexEvent event) { Preconditions.checkState(vertex.getState() == VertexState.INITED, "Unexpected state " + vertex.getState() + " for " + vertex.logIdentifier); // if the start signal is pending this event 
is a fake start event to trigger this transition @@ -3739,19 +2962,22 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex, EventHandl if (completelyConfiguredSent.compareAndSet(false, true)) { stateChangeNotifier.stateChanged(vertexId, new VertexStateUpdate(vertexName, org.apache.tez.dag.api.event.VertexState.CONFIGURED)); + logVertexConfigurationDoneEvent(); } - } + } } private VertexState startVertex() { - // !!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!! - // IMPORTANT - Until Recovery is fixed to use normal state transitions, if any code is added - // here then please check if it needs to be duplicated in recoveryCodeSimulatingStart(). - // !!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!! Preconditions.checkState(getState() == VertexState.INITED, "Vertex must be inited " + logIdentifier); - startedTime = clock.getTime(); + if (recoveryData != null && recoveryData.isVertexStarted()) { + VertexStartedEvent vertexStartedEvent = recoveryData.getVertexStartedEvent(); + this.startedTime = vertexStartedEvent.getStartTime(); + } else { + this.startedTime = clock.getTime(); + } + try { vertexManager.onVertexStarted(getTaskAttemptIdentifiers(dag, pendingReportedSrcCompletions)); } catch (AMUserCodeException e) { @@ -3967,14 +3193,7 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex, EventHandl String msg = "Exception in " + e.getSource() + ", vertex:" + vertex.getLogIdentifier(); LOG.error(msg, e); - if (vertex.getState() == VertexState.RECOVERING) { - LOG.info("Received a user code error during recovering, setting recovered" - + " state to FAILED"); - vertex.addDiagnostic(msg + "," + ExceptionUtils.getStackTrace(e.getCause())); - vertex.trySetTerminationCause(VertexTerminationCause.AM_USERCODE_FAILURE); - vertex.recoveredState = VertexState.FAILED; - return VertexState.RECOVERING; - } else if (vertex.getState() == VertexState.RUNNING || vertex.getState() == VertexState.COMMITTING) { + if (vertex.getState() == VertexState.RUNNING || 
vertex.getState() == VertexState.COMMITTING) { vertex.addDiagnostic(msg + "," + ExceptionUtils.getStackTrace(e.getCause())); vertex.tryEnactKill(VertexTerminationCause.AM_USERCODE_FAILURE, TaskTerminationCause.AM_USERCODE_FAILURE); @@ -4269,10 +3488,9 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex, EventHandl @Override public VertexState transition(VertexImpl vertex, VertexEvent event) { VertexEventRouteEvent rEvent = (VertexEventRouteEvent) event; - boolean recovered = rEvent.isRecovered(); List<TezEvent> tezEvents = rEvent.getEvents(); try { - vertex.handleRoutedTezEvents(tezEvents, recovered, false); + vertex.handleRoutedTezEvents(tezEvents, false); } catch (AMUserCodeException e) { String msg = "Exception in " + e.getSource() + ", vertex=" + vertex.getLogIdentifier(); LOG.error(msg, e); @@ -4413,37 +3631,11 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex, EventHandl return new TaskAttemptEventInfo(nextFromEventId, events, nextPreRoutedFromEventId); } - private void handleRoutedTezEvents(List<TezEvent> tezEvents, boolean recovered, boolean isPendingEvents) throws AMUserCodeException { - if (getAppContext().isRecoveryEnabled() - && !recovered - && !isPendingEvents - && !tezEvents.isEmpty()) { - List<TezEvent> recoveryEvents = - Lists.newArrayList(); - for (TezEvent tezEvent : tezEvents) { - if (!isEventFromVertex(this, tezEvent.getSourceInfo())) { - continue; - } - if (tezEvent.getEventType().equals(EventType.COMPOSITE_DATA_MOVEMENT_EVENT) - || tezEvent.getEventType().equals(EventType.DATA_MOVEMENT_EVENT) - || tezEvent.getEventType().equals(EventType.ROOT_INPUT_DATA_INFORMATION_EVENT) - || tezEvent.getEventType().equals(EventType.ROOT_INPUT_INITIALIZER_EVENT)) { - recoveryEvents.add(tezEvent); - } - } - if (!recoveryEvents.isEmpty()) { - VertexRecoverableEventsGeneratedEvent historyEvent = - new VertexRecoverableEventsGeneratedEvent(vertexId, - recoveryEvents); - appContext.getHistoryHandler().handle( - new 
DAGHistoryEvent(getDAGId(), historyEvent)); - } - } + private void handleRoutedTezEvents(List<TezEvent> tezEvents, boolean isPendingEvents) throws AMUserCodeException { for(TezEvent tezEvent : tezEvents) { if (LOG.isDebugEnabled()) { LOG.debug("Vertex: " + getLogIdentifier() + " routing event: " - + tezEvent.getEventType() - + " Recovered:" + recovered); + + tezEvent.getEventType()); } EventMetaData sourceMeta = tezEvent.getSourceInfo(); switch(tezEvent.getEventType()) { @@ -4580,44 +3772,6 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex, EventHandl srcEdge.sendTezEventToSourceTasks(tezEvent); } break; - case TASK_ATTEMPT_FAILED_EVENT: - { - checkEventSourceMetadata(this, sourceMeta); - TaskAttemptTerminationCause errCause = null; - switch (sourceMeta.getEventGenerator()) { - case INPUT: - errCause = TaskAttemptTerminationCause.INPUT_READ_ERROR; - break; - case PROCESSOR: - errCause = TaskAttemptTerminationCause.APPLICATION_ERROR; - break; - case OUTPUT: - errCause = TaskAttemptTerminationCause.OUTPUT_WRITE_ERROR; - break; - case SYSTEM: - errCause = TaskAttemptTerminationCause.FRAMEWORK_ERROR; - break; - default: - throw new TezUncheckedException("Unknown EventProducerConsumerType: " + - sourceMeta.getEventGenerator()); - } - TaskAttemptFailedEvent taskFailedEvent = - (TaskAttemptFailedEvent) tezEvent.getEvent(); - getEventHandler().handle( - new TaskAttemptEventAttemptFailed(sourceMeta.getTaskAttemptID(), - TaskAttemptEventType.TA_FAILED, - "Error: " + taskFailedEvent.getDiagnostics(), - errCause) - ); - } - break; - case TASK_ATTEMPT_COMPLETED_EVENT: - { - checkEventSourceMetadata(this, sourceMeta); - getEventHandler().handle( - new TaskAttemptEvent(sourceMeta.getTaskAttemptID(), TaskAttemptEventType.TA_DONE)); - } - break; default: throw new TezUncheckedException("Unhandled tez event type: " + tezEvent.getEventType()); @@ -4708,12 +3862,12 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex, EventHandl return 
org.apache.tez.dag.api.event.VertexState.FAILED; case KILLED: return org.apache.tez.dag.api.event.VertexState.KILLED; - case NEW: case INITIALIZING: + return org.apache.tez.dag.api.event.VertexState.INITIALIZING; + case NEW: case INITED: case ERROR: case TERMINATING: - case RECOVERING: default: throw new TezUncheckedException( "Not expecting state updates for state: " + vertexState + ", VertexID: " + vertexId); @@ -5104,4 +4258,100 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex, EventHandl LOG.debug("Vertex: " + vertexName + ", rack: " + rack.toString()); } } + + /** + * This is for recovery when VertexReconfigureDoneEvent is seen. + */ + public static class NoOpVertexManager extends VertexManagerPlugin { + + private VertexConfigurationDoneEvent configurationDoneEvent; + private boolean setParallelismInInitializing = false; + + public NoOpVertexManager(VertexManagerPluginContext context) { + super(context); + } + + @Override + public void initialize() throws Exception { + if (LOG.isDebugEnabled()) { + LOG.debug("initialize NoOpVertexManager"); + } + configurationDoneEvent = new VertexConfigurationDoneEvent(); + configurationDoneEvent.fromProtoStream(new ByteArrayInputStream(getContext().getUserPayload().deepCopyAsArray())); + String vertexName = getContext().getVertexName(); + if (getContext().getVertexNumTasks(vertexName) == -1) { + Preconditions.checkArgument(configurationDoneEvent.isSetParallelismCalled(), "SetParallelism must be called " + + "when numTasks is -1"); + setParallelismInInitializing = true; + getContext().registerForVertexStateUpdates(vertexName, + Sets.newHashSet(org.apache.tez.dag.api.event.VertexState.INITIALIZING)); + } + getContext().vertexReconfigurationPlanned(); + } + + @Override + public void onVertexStarted(List<TaskAttemptIdentifier> completions) + throws Exception { + // apply the ReconfigureDoneEvent and then schedule all the tasks. 
+ if (LOG.isDebugEnabled()) { + LOG.debug("onVertexStarted is invoked in NoOpVertexManager, vertex=" + getContext().getVertexName()); + } + if (!setParallelismInInitializing && configurationDoneEvent.isSetParallelismCalled()) { + reconfigureVertex(); + } + getContext().doneReconfiguringVertex(); + int numTasks = getContext().getVertexNumTasks(getContext().getVertexName()); + if (LOG.isDebugEnabled()) { + LOG.debug("Schedule all the tasks, numTask=" + numTasks); + } + List<ScheduleTaskRequest> tasks = new ArrayList<ScheduleTaskRequest>(); + for (int i=0;i<numTasks;++i) { + tasks.add(ScheduleTaskRequest.create(i, null)); + } + getContext().scheduleTasks(tasks); + } + + @Override + public void onSourceTaskCompleted(TaskAttemptIdentifier attempt) + throws Exception { + if (LOG.isDebugEnabled()) { + LOG.debug("onSourceTaskCompleted is invoked in NoOpVertexManager, vertex=" + getContext().getVertexName()); + } + } + + @Override + public void onVertexManagerEventReceived(VertexManagerEvent vmEvent) + throws Exception { + if (LOG.isDebugEnabled()) { + LOG.debug("onVertexManagerEventReceived is invoked in NoOpVertexManager, vertex=" + getContext().getVertexName()); + } + } + + @Override + public
<TRUNCATED>
