Repository: tez Updated Branches: refs/heads/master de21f990a -> dd9c517e3
TEZ-3715. Differentiate between TaskAttempt submission and TaskAttempt started. (sseth) Project: http://git-wip-us.apache.org/repos/asf/tez/repo Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/dd9c517e Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/dd9c517e Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/dd9c517e Branch: refs/heads/master Commit: dd9c517e3ae2f811e03242dd71891d1b2c7faf5e Parents: de21f99 Author: Siddharth Seth <[email protected]> Authored: Fri May 12 17:05:47 2017 -0700 Committer: Siddharth Seth <[email protected]> Committed: Fri May 12 17:05:47 2017 -0700 ---------------------------------------------------------------------- .../tez/serviceplugins/api/TaskScheduler.java | 15 ++ .../dag/app/TaskCommunicatorContextImpl.java | 13 +- .../tez/dag/app/TaskCommunicatorManager.java | 9 +- .../tez/dag/app/TezTaskCommunicatorImpl.java | 4 +- .../dag/app/dag/TaskAttemptStateInternal.java | 1 + .../event/TaskAttemptEventStartedRemotely.java | 33 +--- .../dag/event/TaskAttemptEventSubmitted.java | 49 +++++ .../dag/app/dag/event/TaskAttemptEventType.java | 1 + .../tez/dag/app/dag/impl/TaskAttemptImpl.java | 94 +++++++++- .../app/rm/AMSchedulerEventTAStateUpdated.java | 42 +++++ .../tez/dag/app/rm/AMSchedulerEventType.java | 1 + .../tez/dag/app/rm/TaskSchedulerManager.java | 20 ++ .../tez/dag/app/rm/TaskSchedulerWrapper.java | 4 + .../api/TaskCommunicatorContext.java | 22 ++- .../tez/dag/app/dag/impl/TestTaskAttempt.java | 183 +++++++++++++------ .../tez/dag/app/dag/impl/TestVertexImpl.java | 19 +- .../TezTestServiceTaskCommunicatorImpl.java | 5 +- 17 files changed, 406 insertions(+), 109 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/tez/blob/dd9c517e/tez-api/src/main/java/org/apache/tez/serviceplugins/api/TaskScheduler.java ---------------------------------------------------------------------- diff --git a/tez-api/src/main/java/org/apache/tez/serviceplugins/api/TaskScheduler.java b/tez-api/src/main/java/org/apache/tez/serviceplugins/api/TaskScheduler.java index 5875bd2..b28a684 100644 --- a/tez-api/src/main/java/org/apache/tez/serviceplugins/api/TaskScheduler.java +++ b/tez-api/src/main/java/org/apache/tez/serviceplugins/api/TaskScheduler.java @@ -45,6 +45,10 @@ public abstract class TaskScheduler implements ServicePluginLifecycle { private final TaskSchedulerContext taskSchedulerContext; + public enum SchedulerTaskState { + SUBMITTED, STARTED, + } + public TaskScheduler(TaskSchedulerContext taskSchedulerContext) { this.taskSchedulerContext = taskSchedulerContext; } @@ -192,6 +196,17 @@ public abstract class TaskScheduler implements ServicePluginLifecycle { Object clientCookie) throws ServicePluginException; /** + * Information about the state of a previously allocated task. + * + * @param task the task for which an update is being provided + * @param state the updated state + * @throws ServicePluginException + */ + public void taskStateUpdated(Object task, SchedulerTaskState state) throws + ServicePluginException { + } + + /** * A request to deallocate a task. This is typically a result of a task completing - with success * or failure. It could also be the result of a decision to not run the task, before it is * allocated or started. http://git-wip-us.apache.org/repos/asf/tez/blob/dd9c517e/tez-dag/src/main/java/org/apache/tez/dag/app/TaskCommunicatorContextImpl.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/TaskCommunicatorContextImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/TaskCommunicatorContextImpl.java index 2709787..1adbf6e 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/app/TaskCommunicatorContextImpl.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/TaskCommunicatorContextImpl.java @@ -123,8 +123,19 @@ public class TaskCommunicatorContextImpl implements TaskCommunicatorContext, Ver } @Override + public void taskSubmitted(TezTaskAttemptID taskAttemptId, ContainerId containerId) { + taskCommunicatorManager.taskSubmitted(taskAttemptId, containerId); + } + + @Override + public void taskStartedRemotely(TezTaskAttemptID taskAttemptId) { + taskCommunicatorManager.taskStartedRemotely(taskAttemptId); + } + + @Override public void taskStartedRemotely(TezTaskAttemptID taskAttemptId, ContainerId containerId) { - taskCommunicatorManager.taskStartedRemotely(taskAttemptId, containerId); + taskSubmitted(taskAttemptId, containerId); + taskStartedRemotely(taskAttemptId); } @Override http://git-wip-us.apache.org/repos/asf/tez/blob/dd9c517e/tez-dag/src/main/java/org/apache/tez/dag/app/TaskCommunicatorManager.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/TaskCommunicatorManager.java b/tez-dag/src/main/java/org/apache/tez/dag/app/TaskCommunicatorManager.java index c9d1f2e..af82c29 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/app/TaskCommunicatorManager.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/TaskCommunicatorManager.java @@ -33,6 +33,7 @@ import org.apache.commons.collections4.ListUtils; import org.apache.hadoop.yarn.event.Event; import org.apache.tez.Utils; import org.apache.tez.dag.api.NamedEntityDescriptor; +import org.apache.tez.dag.app.dag.event.TaskAttemptEventSubmitted; import org.apache.tez.runtime.api.TaskFailureType; import org.apache.tez.runtime.api.events.TaskAttemptKilledEvent; import org.apache.tez.serviceplugins.api.DagInfo; @@ -374,11 +375,15 @@ public class TaskCommunicatorManager extends AbstractService implements pingContainerHeartbeatHandler(containerId); } - public void taskStartedRemotely(TezTaskAttemptID taskAttemptID, ContainerId containerId) { - sendEvent(new TaskAttemptEventStartedRemotely(taskAttemptID, containerId, null)); + public void taskSubmitted(TezTaskAttemptID taskAttemptId, ContainerId containerId) { + sendEvent(new TaskAttemptEventSubmitted(taskAttemptId, containerId)); pingContainerHeartbeatHandler(containerId); } + public void taskStartedRemotely(TezTaskAttemptID taskAttemptID) { + sendEvent(new TaskAttemptEventStartedRemotely(taskAttemptID)); + } + public void taskKilled(TezTaskAttemptID taskAttemptId, TaskAttemptEndReason taskAttemptEndReason, String diagnostics) { // Regular flow via TaskAttempt will take care of un-registering from the heartbeat handler, http://git-wip-us.apache.org/repos/asf/tez/blob/dd9c517e/tez-dag/src/main/java/org/apache/tez/dag/app/TezTaskCommunicatorImpl.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/TezTaskCommunicatorImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/TezTaskCommunicatorImpl.java index 4563ba6..9b700f8 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/app/TezTaskCommunicatorImpl.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/TezTaskCommunicatorImpl.java @@ -315,8 +315,8 @@ public class TezTaskCommunicatorImpl extends TaskCommunicator { } task = getContainerTask(containerId); if (task != null && !task.shouldDie()) { - getContext() - .taskStartedRemotely(task.getTaskSpec().getTaskAttemptID(), containerId); + getContext().taskSubmitted(task.getTaskSpec().getTaskAttemptID(), containerId); + getContext().taskStartedRemotely(task.getTaskSpec().getTaskAttemptID()); } } if (LOG.isDebugEnabled()) { http://git-wip-us.apache.org/repos/asf/tez/blob/dd9c517e/tez-dag/src/main/java/org/apache/tez/dag/app/dag/TaskAttemptStateInternal.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/TaskAttemptStateInternal.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/TaskAttemptStateInternal.java index 8d0d83e..6ddfabb 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/TaskAttemptStateInternal.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/TaskAttemptStateInternal.java @@ -28,6 +28,7 @@ import org.apache.hadoop.classification.InterfaceAudience.Private; public enum TaskAttemptStateInternal { NEW, START_WAIT, + SUBMITTED, RUNNING, KILL_IN_PROGRESS, FAIL_IN_PROGRESS, http://git-wip-us.apache.org/repos/asf/tez/blob/dd9c517e/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/TaskAttemptEventStartedRemotely.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/TaskAttemptEventStartedRemotely.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/TaskAttemptEventStartedRemotely.java index e700c6c..d83eda9 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/TaskAttemptEventStartedRemotely.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/TaskAttemptEventStartedRemotely.java @@ -18,43 +18,16 @@ package org.apache.tez.dag.app.dag.event; -import java.util.Map; - -import org.apache.hadoop.yarn.api.records.ApplicationAccessType; -import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.tez.dag.records.TezTaskAttemptID; public class TaskAttemptEventStartedRemotely extends TaskAttemptEvent implements RecoveryEvent { - private final ContainerId containerId; - // TODO Can appAcls be handled elsewhere ? - private final Map<ApplicationAccessType, String> applicationACLs; - private boolean fromRecovery = false; - - public TaskAttemptEventStartedRemotely(TezTaskAttemptID id, ContainerId containerId, - Map<ApplicationAccessType, String> appAcls) { + public TaskAttemptEventStartedRemotely(TezTaskAttemptID id) { super(id, TaskAttemptEventType.TA_STARTED_REMOTELY); - this.containerId = containerId; - this.applicationACLs = appAcls; - } - - public TaskAttemptEventStartedRemotely(TezTaskAttemptID id, ContainerId containerId, - Map<ApplicationAccessType, String> appAcls, boolean fromRecovery) { - this(id, containerId, appAcls); - this.fromRecovery = fromRecovery; - } - - public ContainerId getContainerId() { - return containerId; - } - - public Map<ApplicationAccessType, String> getApplicationACLs() { - return applicationACLs; } @Override public boolean isFromRecovery() { - return fromRecovery; + return false; } - -} +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/tez/blob/dd9c517e/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/TaskAttemptEventSubmitted.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/TaskAttemptEventSubmitted.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/TaskAttemptEventSubmitted.java new file mode 100644 index 0000000..9fae772 --- /dev/null +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/TaskAttemptEventSubmitted.java @@ -0,0 +1,49 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.tez.dag.app.dag.event; + + +import org.apache.hadoop.yarn.api.records.ContainerId; +import org.apache.tez.dag.records.TezTaskAttemptID; + +public class TaskAttemptEventSubmitted extends TaskAttemptEvent implements RecoveryEvent { + + private final ContainerId containerId; + private boolean fromRecovery = false; + + public TaskAttemptEventSubmitted(TezTaskAttemptID id, ContainerId containerId) { + super(id, TaskAttemptEventType.TA_SUBMITTED); + this.containerId = containerId; + } + + public TaskAttemptEventSubmitted(TezTaskAttemptID id, ContainerId containerId, + boolean fromRecovery) { + this(id, containerId); + this.fromRecovery = fromRecovery; + } + + public ContainerId getContainerId() { + return containerId; + } + + @Override + public boolean isFromRecovery() { + return fromRecovery; + } +} http://git-wip-us.apache.org/repos/asf/tez/blob/dd9c517e/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/TaskAttemptEventType.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/TaskAttemptEventType.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/TaskAttemptEventType.java index dacb0c2..63779fd 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/TaskAttemptEventType.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/TaskAttemptEventType.java @@ -27,6 +27,7 @@ public enum TaskAttemptEventType { TA_SCHEDULE, //Producer: TaskAttemptListener | Vertex after routing events + TA_SUBMITTED, TA_STARTED_REMOTELY, TA_STATUS_UPDATE, TA_TEZ_EVENT_UPDATE, // for recovery http://git-wip-us.apache.org/repos/asf/tez/blob/dd9c517e/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java index 8a81575..07aed5e 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java @@ -31,12 +31,15 @@ import java.util.concurrent.locks.ReentrantReadWriteLock; import org.apache.commons.lang.StringUtils; import org.apache.tez.dag.api.TezConstants; +import org.apache.tez.dag.app.dag.event.TaskAttemptEventSubmitted; import org.apache.tez.dag.app.dag.event.TaskEvent; import org.apache.tez.dag.app.dag.event.TaskEventTAFailed; import org.apache.tez.dag.app.dag.event.TaskEventTAKilled; import org.apache.tez.dag.app.dag.event.TaskEventTALaunched; import org.apache.tez.dag.app.dag.event.TaskEventTASucceeded; +import org.apache.tez.dag.app.rm.AMSchedulerEventTAStateUpdated; import org.apache.tez.runtime.api.TaskFailureType; +import org.apache.tez.serviceplugins.api.TaskScheduler; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.hadoop.conf.Configuration; @@ -276,8 +279,8 @@ public class TaskAttemptImpl implements TaskAttempt, new SucceededTransition()) .addTransition(TaskAttemptStateInternal.START_WAIT, - TaskAttemptStateInternal.RUNNING, - TaskAttemptEventType.TA_STARTED_REMOTELY, new StartedTransition()) + TaskAttemptStateInternal.SUBMITTED, + TaskAttemptEventType.TA_SUBMITTED, new SubmittedTransition()) .addTransition(TaskAttemptStateInternal.START_WAIT, TaskAttemptStateInternal.KILL_IN_PROGRESS, TaskAttemptEventType.TA_KILL_REQUEST, @@ -303,6 +306,62 @@ public class TaskAttemptImpl implements TaskAttempt, TaskAttemptEventType.TA_CONTAINER_TERMINATED_BY_SYSTEM, new ContainerCompletedBeforeRunningTransition(KILLED_HELPER)) + .addTransition(TaskAttemptStateInternal.SUBMITTED, + TaskAttemptStateInternal.RUNNING, + TaskAttemptEventType.TA_STARTED_REMOTELY, new StartedTransition()) + .addTransition(TaskAttemptStateInternal.SUBMITTED, + TaskAttemptStateInternal.SUBMITTED, + TaskAttemptEventType.TA_STATUS_UPDATE, STATUS_UPDATER) + // Optional, may not come in for all tasks. + .addTransition(TaskAttemptStateInternal.SUBMITTED, + TaskAttemptStateInternal.SUCCEEDED, TaskAttemptEventType.TA_DONE, + new SucceededTransition()) + .addTransition(TaskAttemptStateInternal.SUBMITTED, + TaskAttemptStateInternal.FAIL_IN_PROGRESS, + TaskAttemptEventType.TA_FAILED, + new TerminatedWhileRunningTransition(FAILED_HELPER)) + .addTransition(TaskAttemptStateInternal.SUBMITTED, + TaskAttemptStateInternal.FAIL_IN_PROGRESS, + TaskAttemptEventType.TA_TIMED_OUT, + new TerminatedWhileRunningTransition(FAILED_HELPER)) + .addTransition(TaskAttemptStateInternal.SUBMITTED, + TaskAttemptStateInternal.KILL_IN_PROGRESS, + TaskAttemptEventType.TA_KILL_REQUEST, + new TerminatedWhileRunningTransition(KILLED_HELPER)) + .addTransition(TaskAttemptStateInternal.SUBMITTED, + TaskAttemptStateInternal.KILLED, + TaskAttemptEventType.TA_KILLED, + new TerminatedWhileRunningTransition(KILLED_HELPER)) + .addTransition(TaskAttemptStateInternal.SUBMITTED, + TaskAttemptStateInternal.KILL_IN_PROGRESS, + TaskAttemptEventType.TA_NODE_FAILED, + new TerminatedWhileRunningTransition(KILLED_HELPER)) + .addTransition(TaskAttemptStateInternal.SUBMITTED, + TaskAttemptStateInternal.FAIL_IN_PROGRESS, + TaskAttemptEventType.TA_CONTAINER_TERMINATING, + new TerminatedWhileRunningTransition(FAILED_HELPER)) + .addTransition(TaskAttemptStateInternal.SUBMITTED, + TaskAttemptStateInternal.FAILED, + TaskAttemptEventType.TA_CONTAINER_TERMINATED, + new ContainerCompletedWhileRunningTransition()) + .addTransition(TaskAttemptStateInternal.SUBMITTED, + TaskAttemptStateInternal.KILLED, + TaskAttemptEventType.TA_CONTAINER_TERMINATED_BY_SYSTEM, + new ContainerCompletedWhileRunningTransition(KILLED_HELPER)) + .addTransition( + TaskAttemptStateInternal.SUBMITTED, + EnumSet.of(TaskAttemptStateInternal.FAIL_IN_PROGRESS, + TaskAttemptStateInternal.SUBMITTED), + TaskAttemptEventType.TA_OUTPUT_FAILED, + new OutputReportedFailedTransition()) + // for recovery, needs to log the TA generated events in TaskAttemptFinishedEvent + .addTransition(TaskAttemptStateInternal.SUBMITTED, + TaskAttemptStateInternal.SUBMITTED, + TaskAttemptEventType.TA_TEZ_EVENT_UPDATE, + new TezEventUpdaterTransition()) + + + .addTransition(TaskAttemptStateInternal.RUNNING, TaskAttemptStateInternal.RUNNING, TaskAttemptEventType.TA_STATUS_UPDATE, STATUS_UPDATER) @@ -852,6 +911,7 @@ public class TaskAttemptImpl implements TaskAttempt, switch (smState) { case NEW: case START_WAIT: + case SUBMITTED: return TaskAttemptState.STARTING; case RUNNING: return TaskAttemptState.RUNNING; @@ -1350,13 +1410,14 @@ public class TaskAttemptImpl implements TaskAttempt, } } - protected static class StartedTransition implements + protected static class SubmittedTransition implements SingleArcTransition<TaskAttemptImpl, TaskAttemptEvent> { + @Override public void transition(TaskAttemptImpl ta, TaskAttemptEvent origEvent) { - TaskAttemptEventStartedRemotely event = (TaskAttemptEventStartedRemotely) origEvent; + TaskAttemptEventSubmitted event = (TaskAttemptEventSubmitted) origEvent; - AMContainer amContainer = ta.appContext.getAllContainers().get(event.getContainerId()); + AMContainer amContainer = ta.appContext.getAllContainers().get(event.getContainerId()); Container container = amContainer.getContainer(); ta.allocationTime = amContainer.getCurrentTaskAttemptAllocationTime(); @@ -1376,11 +1437,12 @@ public class TaskAttemptImpl implements TaskAttempt, ta.httpPort = nodeHttpInetAddr.getPort(); ta.sendEvent(createDAGCounterUpdateEventTALaunched(ta)); - LOG.info("TaskAttempt: [" + ta.attemptId + "] started." + LOG.info("TaskAttempt: [" + ta.attemptId + "] submitted." + " Is using containerId: [" + ta.containerId + "]" + " on NM: [" + ta.containerNodeId + "]"); - // JobHistoryEvent + // JobHistoryEvent. + // The started event represents when the attempt was submitted to the executor. ta.logJobHistoryAttemptStarted(); // TODO Remove after HDFS-5098 @@ -1398,15 +1460,31 @@ public class TaskAttemptImpl implements TaskAttempt, // Inform the Task ta.sendEvent(new TaskEventTALaunched(ta.attemptId)); - + if (ta.isSpeculationEnabled()) { ta.sendEvent(new SpeculatorEventTaskAttemptStatusUpdate(ta.attemptId, TaskAttemptState.RUNNING, ta.launchTime, true)); } + ta.sendEvent( + new AMSchedulerEventTAStateUpdated(ta, TaskScheduler.SchedulerTaskState.SUBMITTED, + ta.getVertex().getTaskSchedulerIdentifier())); ta.taskHeartbeatHandler.register(ta.attemptId); } } + + protected static class StartedTransition implements + SingleArcTransition<TaskAttemptImpl, TaskAttemptEvent> { + + @Override + public void transition(TaskAttemptImpl ta, TaskAttemptEvent taskAttemptEvent) { + ta.sendEvent( + new AMSchedulerEventTAStateUpdated(ta, TaskScheduler.SchedulerTaskState.STARTED, + ta.getVertex().getTaskSchedulerIdentifier())); + // Nothing specific required for recovery, since recovery processes the START/END events + // only and moves the attempt to a final state, or an initial state. + } + } private boolean isSpeculationEnabled() { return conf.getBoolean(TezConfiguration.TEZ_AM_SPECULATION_ENABLED, http://git-wip-us.apache.org/repos/asf/tez/blob/dd9c517e/tez-dag/src/main/java/org/apache/tez/dag/app/rm/AMSchedulerEventTAStateUpdated.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/AMSchedulerEventTAStateUpdated.java b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/AMSchedulerEventTAStateUpdated.java new file mode 100644 index 0000000..95a56e5 --- /dev/null +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/AMSchedulerEventTAStateUpdated.java @@ -0,0 +1,42 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ + +package org.apache.tez.dag.app.rm; + +import org.apache.tez.dag.app.dag.TaskAttempt; +import org.apache.tez.serviceplugins.api.TaskScheduler.SchedulerTaskState; + +public class AMSchedulerEventTAStateUpdated extends AMSchedulerEvent { + + private final TaskAttempt taskAttempt; + private final SchedulerTaskState state; + + public AMSchedulerEventTAStateUpdated(TaskAttempt taskAttempt, SchedulerTaskState state, + int schedulerId) { + super(AMSchedulerEventType.S_TA_STATE_UPDATED, schedulerId); + this.taskAttempt = taskAttempt; + this.state = state; + } + + public TaskAttempt getTaskAttempt() { + return taskAttempt; + } + + public SchedulerTaskState getState() { + return state; + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/tez/blob/dd9c517e/tez-dag/src/main/java/org/apache/tez/dag/app/rm/AMSchedulerEventType.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/AMSchedulerEventType.java b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/AMSchedulerEventType.java index 053146d..cc52ef6 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/AMSchedulerEventType.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/AMSchedulerEventType.java @@ -21,6 +21,7 @@ package org.apache.tez.dag.app.rm; public enum AMSchedulerEventType { //Producer: TaskAttempt S_TA_LAUNCH_REQUEST, + S_TA_STATE_UPDATED, S_TA_ENDED, // Annotated with FAILED/KILLED/SUCCEEDED. //Producer: Node http://git-wip-us.apache.org/repos/asf/tez/blob/dd9c517e/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerManager.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerManager.java b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerManager.java index d32261f..57afbfc 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerManager.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerManager.java @@ -259,6 +259,9 @@ public class TaskSchedulerManager extends AbstractService implements case S_TA_LAUNCH_REQUEST: handleTaLaunchRequest((AMSchedulerEventTALaunchRequest) sEvent); break; + case S_TA_STATE_UPDATED: + handleTAStateUpdated((AMSchedulerEventTAStateUpdated) sEvent); + break; case S_TA_ENDED: // TaskAttempt considered complete. AMSchedulerEventTAEnded event = (AMSchedulerEventTAEnded)sEvent; switch(event.getState()) { @@ -521,6 +524,23 @@ public class TaskSchedulerManager extends AbstractService implements } } + private void handleTAStateUpdated(AMSchedulerEventTAStateUpdated event) { + try { + taskSchedulers[event.getSchedulerId()].taskStateUpdated(event.getTaskAttempt(), event.getState()); + } catch (Exception e) { + String msg = "Error in TaskScheduler for handling Task State Update" + + ", eventType=" + event.getType() + + ", scheduler=" + Utils.getTaskSchedulerIdentifierString(event.getSchedulerId(), appContext) + + ", taskAttemptId=" + event.getTaskAttempt().getID() + + ", state=" + event.getState(); + LOG.error(msg, e); + sendEvent( + new DAGAppMasterEventUserServiceFatalError( + DAGAppMasterEventType.TASK_SCHEDULER_SERVICE_FATAL_ERROR, + msg, e)); + } + } + @VisibleForTesting TaskScheduler createTaskScheduler(String host, int port, String trackingUrl, AppContext appContext, http://git-wip-us.apache.org/repos/asf/tez/blob/dd9c517e/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerWrapper.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerWrapper.java b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerWrapper.java index 43cf045..9e0d2ab 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerWrapper.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerWrapper.java @@ -63,6 +63,10 @@ public class TaskSchedulerWrapper { real.allocateTask(task, capability, containerId, priority, containerSignature, clientCookie); } + public void taskStateUpdated(Object task, TaskScheduler.SchedulerTaskState state) throws Exception { + real.taskStateUpdated(task, state); + } + public boolean deallocateTask(Object task, boolean taskSucceeded, TaskAttemptEndReason endReason, @Nullable String diagnostics) throws Exception { return real.deallocateTask(task, taskSucceeded, endReason, diagnostics); http://git-wip-us.apache.org/repos/asf/tez/blob/dd9c517e/tez-dag/src/main/java/org/apache/tez/serviceplugins/api/TaskCommunicatorContext.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/serviceplugins/api/TaskCommunicatorContext.java b/tez-dag/src/main/java/org/apache/tez/serviceplugins/api/TaskCommunicatorContext.java index 4c6e846..b6f3a54 100644 --- a/tez-dag/src/main/java/org/apache/tez/serviceplugins/api/TaskCommunicatorContext.java +++ b/tez-dag/src/main/java/org/apache/tez/serviceplugins/api/TaskCommunicatorContext.java @@ -109,11 +109,31 @@ public interface TaskCommunicatorContext extends ServicePluginContextBase { void containerAlive(ContainerId containerId); /** - * Inform the framework that the task has started execution + * Inform the framework that the task has been submitted for execution. The expectation is that + * the implementing TaskCommunicator will inform the framework about task submission, followed + * by the task starting. * * @param taskAttemptId the relevant task attempt id * @param containerId the containerId in which the task attempt is running */ + void taskSubmitted(TezTaskAttemptID taskAttemptId, ContainerId containerId); + + /** + * Inform the framework that the task has started execution + * + * @param taskAttemptId the relevant task attempt id + */ + void taskStartedRemotely(TezTaskAttemptID taskAttemptId); + + /** + * Inform the framework that the task has started execution + * + * Use {@link #taskSubmitted(TezTaskAttemptID, ContainerId)} + * and {@link #taskStartedRemotely(TezTaskAttemptID)} instead + * + * @param taskAttemptId the relevant task attempt id + */ + @Deprecated void taskStartedRemotely(TezTaskAttemptID taskAttemptId, ContainerId containerId); /** http://git-wip-us.apache.org/repos/asf/tez/blob/dd9c517e/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskAttempt.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskAttempt.java b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskAttempt.java index 44d8213..acf8f23 100644 --- a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskAttempt.java +++ b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskAttempt.java @@ -63,6 +63,7 @@ import org.apache.log4j.Level; import org.apache.log4j.LogManager; import org.apache.tez.common.MockDNSToSwitchMapping; import org.apache.tez.dag.api.TezConstants; +import org.apache.tez.dag.app.dag.event.TaskAttemptEventSubmitted; import org.apache.tez.dag.app.dag.event.TaskEventTAFailed; import org.apache.tez.dag.app.dag.event.TaskEventTAKilled; import org.apache.tez.dag.app.dag.event.TaskEventTASucceeded; @@ -405,14 +406,18 @@ public class TestTaskAttempt { ArgumentCaptor<Event> arg = ArgumentCaptor.forClass(Event.class); taImpl.handle(new TaskAttemptEventSchedule(taskAttemptID, 0, 0)); + taImpl.handle(new TaskAttemptEventSubmitted(taskAttemptID, contId)); + assertEquals("Task attempt is not in the STARTING state", taImpl.getState(), + TaskAttemptState.STARTING); + assertEquals("Task attempt internal state is not at SUBMITTED", taImpl.getInternalState(), + TaskAttemptStateInternal.SUBMITTED); // At state STARTING. - taImpl.handle(new TaskAttemptEventStartedRemotely(taskAttemptID, contId, - null)); + taImpl.handle(new TaskAttemptEventStartedRemotely(taskAttemptID)); assertEquals("Task attempt is not in the RUNNING state", taImpl.getState(), TaskAttemptState.RUNNING); verify(mockHeartbeatHandler).register(taskAttemptID); - int expectedEventsAtRunning = 3; + int expectedEventsAtRunning = 5; verify(eventHandler, times(expectedEventsAtRunning)).handle(arg.capture()); taImpl.handle(new TaskAttemptEventContainerTerminating(taskAttemptID, @@ -505,10 +510,9 @@ public class TestTaskAttempt { TezTaskAttemptID taskAttemptID = taImpl.getID(); ArgumentCaptor<Event> arg = ArgumentCaptor.forClass(Event.class); taImpl.handle(new TaskAttemptEventSchedule(taskAttemptID, 0, 0)); - // At state STARTING. - taImpl.handle(new TaskAttemptEventStartedRemotely(taskAttemptID, contId, - null)); - int expectedEventsAtRunning = 3; + taImpl.handle(new TaskAttemptEventSubmitted(taskAttemptID, contId)); + taImpl.handle(new TaskAttemptEventStartedRemotely(taskAttemptID)); + int expectedEventsAtRunning = 5; verify(eventHandler, times(expectedEventsAtRunning)).handle(arg.capture()); assertEquals("Task attempt is not in running state", taImpl.getState(), TaskAttemptState.RUNNING); @@ -596,14 +600,13 @@ public class TestTaskAttempt { ArgumentCaptor<Event> arg = ArgumentCaptor.forClass(Event.class); taImpl.handle(new TaskAttemptEventSchedule(taskAttemptID, 0, 0)); - // At state STARTING. - taImpl.handle(new TaskAttemptEventStartedRemotely(taskAttemptID, contId, - null)); + taImpl.handle(new TaskAttemptEventSubmitted(taskAttemptID, contId)); + taImpl.handle(new TaskAttemptEventStartedRemotely(taskAttemptID)); assertEquals("Task attempt is not in the RUNNING state", taImpl.getState(), TaskAttemptState.RUNNING); verify(mockHeartbeatHandler).register(taskAttemptID); - int expectedEventsAtRunning = 3; + int expectedEventsAtRunning = 5; verify(eventHandler, times(expectedEventsAtRunning)).handle(arg.capture()); taImpl.handle(new TaskAttemptEvent(taskAttemptID, TaskAttemptEventType.TA_DONE)); @@ -686,9 +689,8 @@ public class TestTaskAttempt { TezTaskAttemptID taskAttemptID = taImpl.getID(); taImpl.handle(new TaskAttemptEventSchedule(taskAttemptID, 0, 0)); - // At state STARTING. - taImpl.handle(new TaskAttemptEventStartedRemotely(taskAttemptID, contId, - null)); + taImpl.handle(new TaskAttemptEventSubmitted(taskAttemptID, contId)); + taImpl.handle(new TaskAttemptEventStartedRemotely(taskAttemptID)); assertEquals("Task attempt is not in the RUNNING state", taImpl.getState(), TaskAttemptState.RUNNING); @@ -773,14 +775,13 @@ public class TestTaskAttempt { ArgumentCaptor<Event> arg = ArgumentCaptor.forClass(Event.class); taImpl.handle(new TaskAttemptEventSchedule(taskAttemptID, 0, 0)); - // At state STARTING. - taImpl.handle(new TaskAttemptEventStartedRemotely(taskAttemptID, contId, - null)); + taImpl.handle(new TaskAttemptEventSubmitted(taskAttemptID, contId)); + taImpl.handle(new TaskAttemptEventStartedRemotely(taskAttemptID)); assertEquals("Task attempt is not in the RUNNING state", taImpl.getState(), TaskAttemptState.RUNNING); verify(mockHeartbeatHandler).register(taskAttemptID); - int expectedEventsAtRunning = 4; + int expectedEventsAtRunning = 6; verify(eventHandler, times(expectedEventsAtRunning)).handle(arg.capture()); verifyEventType( arg.getAllValues().subList(0, @@ -881,14 +882,13 @@ public class TestTaskAttempt { ArgumentCaptor<Event> arg = ArgumentCaptor.forClass(Event.class); taImpl.handle(new TaskAttemptEventSchedule(taskAttemptID, 0, 0)); - // At state STARTING. - taImpl.handle(new TaskAttemptEventStartedRemotely(taskAttemptID, contId, - null)); + taImpl.handle(new TaskAttemptEventSubmitted(taskAttemptID, contId)); + taImpl.handle(new TaskAttemptEventStartedRemotely(taskAttemptID)); assertEquals("Task attempt is not in the RUNNING state", taImpl.getState(), TaskAttemptState.RUNNING); verify(mockHeartbeatHandler).register(taskAttemptID); - int expectedEventsAtRunning = 4; + int expectedEventsAtRunning = 6; verify(eventHandler, times(expectedEventsAtRunning)).handle(arg.capture()); verifyEventType( arg.getAllValues().subList(0, @@ -926,7 +926,6 @@ public class TestTaskAttempt { arg = ArgumentCaptor.forClass(Event.class); verify(eventHandler, times(expectedEvenstAfterTerminating)).handle(arg.capture()); - Event e = verifyEventType( arg.getAllValues().subList(expectedEventsAtRunning, expectedEvenstAfterTerminating), TaskEventTAFailed.class, 1); @@ -991,9 +990,8 @@ public class TestTaskAttempt { ArgumentCaptor<Event> arg = ArgumentCaptor.forClass(Event.class); taImpl.handle(new TaskAttemptEventSchedule(taskAttemptID, 0, 0)); - // At state STARTING. - taImpl.handle(new TaskAttemptEventStartedRemotely(taskAttemptID, contId, - null)); + taImpl.handle(new TaskAttemptEventSubmitted(taskAttemptID, contId)); + taImpl.handle(new TaskAttemptEventStartedRemotely(taskAttemptID)); assertEquals("Task attempt is not in the RUNNING state", taImpl.getState(), TaskAttemptState.RUNNING); verify(mockHeartbeatHandler).register(taskAttemptID); @@ -1066,9 +1064,8 @@ public class TestTaskAttempt { ArgumentCaptor<Event> arg = ArgumentCaptor.forClass(Event.class); taImpl.handle(new TaskAttemptEventSchedule(taskAttemptID, 0, 0)); - // At state STARTING. - taImpl.handle(new TaskAttemptEventStartedRemotely(taskAttemptID, contId, - null)); + taImpl.handle(new TaskAttemptEventSubmitted(taskAttemptID, contId)); + taImpl.handle(new TaskAttemptEventStartedRemotely(taskAttemptID)); assertEquals("Task attempt is not in the RUNNING state", taImpl.getState(), TaskAttemptState.RUNNING); verify(mockHeartbeatHandler).register(taskAttemptID); @@ -1136,7 +1133,89 @@ public class TestTaskAttempt { // events from different tasks may not have the same value assertFalse(tEventFail1.getSerializingHash() == tEventFail2.getSerializingHash()); } - + + @Test(timeout = 5000) + public void testCompletedAtSubmitted() throws ServicePluginException { + ApplicationId appId = ApplicationId.newInstance(1, 2); + ApplicationAttemptId appAttemptId = ApplicationAttemptId.newInstance( + appId, 0); + TezDAGID dagID = TezDAGID.getInstance(appId, 1); + TezVertexID vertexID = TezVertexID.getInstance(dagID, 1); + TezTaskID taskID = TezTaskID.getInstance(vertexID, 1); + + MockEventHandler eventHandler = spy(new MockEventHandler()); + TaskCommunicatorManagerInterface taListener = createMockTaskAttemptListener(); + + Configuration taskConf = new Configuration(); + taskConf.setClass("fs.file.impl", StubbedFS.class, FileSystem.class); + taskConf.setBoolean("fs.file.impl.disable.cache", true); + + locationHint = TaskLocationHint.createTaskLocationHint( + new HashSet<String>(Arrays.asList(new String[]{"127.0.0.1"})), null); + Resource resource = Resource.newInstance(1024, 1); + + NodeId nid = NodeId.newInstance("127.0.0.1", 0); + @SuppressWarnings("deprecation") + ContainerId contId = ContainerId.newInstance(appAttemptId, 3); + Container container = mock(Container.class); + when(container.getId()).thenReturn(contId); + when(container.getNodeId()).thenReturn(nid); + when(container.getNodeHttpAddress()).thenReturn("localhost:0"); + + AMContainerMap containers = new AMContainerMap( + mock(ContainerHeartbeatHandler.class), mock(TaskCommunicatorManagerInterface.class), + new ContainerContextMatcher(), appCtx); + containers.addContainerIfNew(container, 0, 0, 0); + + doReturn(new ClusterInfo()).when(appCtx).getClusterInfo(); + doReturn(containers).when(appCtx).getAllContainers(); + + TaskHeartbeatHandler mockHeartbeatHandler = mock(TaskHeartbeatHandler.class); + TaskAttemptImpl taImpl = new MockTaskAttemptImpl(taskID, 1, eventHandler, + taListener, taskConf, new SystemClock(), + mockHeartbeatHandler, appCtx, false, + resource, createFakeContainerContext(), false); + TezTaskAttemptID taskAttemptID = taImpl.getID(); + ArgumentCaptor<Event> arg = ArgumentCaptor.forClass(Event.class); + + taImpl.handle(new TaskAttemptEventSchedule(taskAttemptID, 0, 0)); + taImpl.handle(new TaskAttemptEventSubmitted(taskAttemptID, contId)); + assertEquals("Task attempt is not in the RUNNING state", taImpl.getState(), + TaskAttemptState.STARTING); + + verify(mockHeartbeatHandler).register(taskAttemptID); + + int expectedEventsAtStarting = 4; + verify(eventHandler, times(expectedEventsAtStarting)).handle(arg.capture()); + + // Ensure status_updates are handled in the submitted state. + taImpl.handle(new TaskAttemptEventStatusUpdate(taskAttemptID, + new TaskStatusUpdateEvent(null, 0.1f, null, false))); + + taImpl.handle(new TaskAttemptEvent(taskAttemptID, TaskAttemptEventType.TA_DONE)); + + assertEquals("Task attempt is not in the SUCCEEDED state", taImpl.getState(), + TaskAttemptState.SUCCEEDED); + verify(mockHeartbeatHandler).unregister(taskAttemptID); + assertEquals(0, taImpl.getDiagnostics().size()); + + int expectedEvenstAfterTerminating = expectedEventsAtStarting + 3; + arg = ArgumentCaptor.forClass(Event.class); + verify(eventHandler, times(expectedEvenstAfterTerminating)).handle(arg.capture()); + + + Event e = verifyEventType( + arg.getAllValues().subList(expectedEventsAtStarting, + expectedEvenstAfterTerminating), TaskEventTASucceeded.class, 1); + assertEquals(TaskEventType.T_ATTEMPT_SUCCEEDED, e.getType()); + verifyEventType( + arg.getAllValues().subList(expectedEventsAtStarting, + expectedEvenstAfterTerminating), AMSchedulerEventTAEnded.class, 1); + verifyEventType( + arg.getAllValues().subList(expectedEventsAtStarting, + expectedEvenstAfterTerminating), DAGEventCounterUpdate.class, 1); + } + @Test(timeout = 5000) public void testSuccess() throws Exception { ApplicationId appId = ApplicationId.newInstance(1, 2); @@ -1183,14 +1262,13 @@ public class TestTaskAttempt { ArgumentCaptor<Event> arg = ArgumentCaptor.forClass(Event.class); taImpl.handle(new TaskAttemptEventSchedule(taskAttemptID, 0, 0)); - // At state STARTING. - taImpl.handle(new TaskAttemptEventStartedRemotely(taskAttemptID, contId, - null)); + taImpl.handle(new TaskAttemptEventSubmitted(taskAttemptID, contId)); + taImpl.handle(new TaskAttemptEventStartedRemotely(taskAttemptID)); assertEquals("Task attempt is not in the RUNNING state", taImpl.getState(), TaskAttemptState.RUNNING); verify(mockHeartbeatHandler).register(taskAttemptID); - int expectedEventsAtRunning = 4; + int expectedEventsAtRunning = 6; verify(eventHandler, times(expectedEventsAtRunning)).handle(arg.capture()); verifyEventType( arg.getAllValues().subList(0, @@ -1210,7 +1288,6 @@ public class TestTaskAttempt { arg = ArgumentCaptor.forClass(Event.class); verify(eventHandler, times(expectedEvenstAfterTerminating)).handle(arg.capture()); - Event e = verifyEventType( arg.getAllValues().subList(expectedEventsAtRunning, expectedEvenstAfterTerminating), TaskEventTASucceeded.class, 1); @@ -1274,14 +1351,13 @@ public class TestTaskAttempt { ArgumentCaptor<Event> arg = ArgumentCaptor.forClass(Event.class); taImpl.handle(new TaskAttemptEventSchedule(taskAttemptID, 0, 0)); - // At state STARTING. - taImpl.handle(new TaskAttemptEventStartedRemotely(taskAttemptID, contId, - null)); + taImpl.handle(new TaskAttemptEventSubmitted(taskAttemptID, contId)); + taImpl.handle(new TaskAttemptEventStartedRemotely(taskAttemptID)); assertEquals("Task attempt is not in the RUNNING state", taImpl.getState(), TaskAttemptState.RUNNING); verify(mockHeartbeatHandler).register(taskAttemptID); - int expectedEventsAtRunning = 3; + int expectedEventsAtRunning = 5; verify(eventHandler, times(expectedEventsAtRunning)).handle(arg.capture()); taImpl.handle(new TaskAttemptEvent(taskAttemptID, TaskAttemptEventType.TA_DONE)); @@ -1367,14 +1443,13 @@ public class TestTaskAttempt { ArgumentCaptor<Event> arg = ArgumentCaptor.forClass(Event.class); taImpl.handle(new TaskAttemptEventSchedule(taskAttemptID, 0, 0)); - // At state STARTING. - taImpl.handle(new TaskAttemptEventStartedRemotely(taskAttemptID, contId, - null)); + taImpl.handle(new TaskAttemptEventSubmitted(taskAttemptID, contId)); + taImpl.handle(new TaskAttemptEventStartedRemotely(taskAttemptID)); assertEquals("Task attempt is not in the RUNNING state", TaskAttemptState.RUNNING, taImpl.getState()); verify(mockHeartbeatHandler).register(taskAttemptID); - int expectedEventsAtRunning = 3; + int expectedEventsAtRunning = 5; verify(eventHandler, times(expectedEventsAtRunning)).handle(arg.capture()); taImpl.handle(new TaskAttemptEvent(taskAttemptID, TaskAttemptEventType.TA_DONE)); @@ -1475,14 +1550,13 @@ public class TestTaskAttempt { ArgumentCaptor<Event> arg = ArgumentCaptor.forClass(Event.class); taImpl.handle(new TaskAttemptEventSchedule(taskAttemptID, 0, 0)); - // At state STARTING. - taImpl.handle(new TaskAttemptEventStartedRemotely(taskAttemptID, contId, - null)); + taImpl.handle(new TaskAttemptEventSubmitted(taskAttemptID, contId)); + taImpl.handle(new TaskAttemptEventStartedRemotely(taskAttemptID)); assertEquals("Task attempt is not in the RUNNING state", TaskAttemptState.RUNNING, taImpl.getState()); verify(mockHeartbeatHandler).register(taskAttemptID); - int expectedEventsAtRunning = 3; + int expectedEventsAtRunning = 5; verify(eventHandler, times(expectedEventsAtRunning)).handle(arg.capture()); taImpl.handle(new TaskAttemptEvent(taskAttemptID, TaskAttemptEventType.TA_DONE)); @@ -1573,9 +1647,8 @@ public class TestTaskAttempt { TezTaskAttemptID taskAttemptID = taImpl.getID(); taImpl.handle(new TaskAttemptEventSchedule(taskAttemptID, 0, 0)); - // At state STARTING. - taImpl.handle(new TaskAttemptEventStartedRemotely(taskAttemptID, contId, - null)); + taImpl.handle(new TaskAttemptEventSubmitted(taskAttemptID, contId)); + taImpl.handle(new TaskAttemptEventStartedRemotely(taskAttemptID)); verify(mockHeartbeatHandler).register(taskAttemptID); taImpl.handle(new TaskAttemptEvent(taskAttemptID, TaskAttemptEventType.TA_DONE)); @@ -1583,7 +1656,7 @@ public class TestTaskAttempt { TaskAttemptState.SUCCEEDED); verify(mockHeartbeatHandler).unregister(taskAttemptID); - int expectedEventsTillSucceeded = 6; + int expectedEventsTillSucceeded = 8; ArgumentCaptor<Event> arg = ArgumentCaptor.forClass(Event.class); ArgumentCaptor<DAGHistoryEvent> histArg = ArgumentCaptor.forClass(DAGHistoryEvent.class); verify(eventHandler, times(expectedEventsTillSucceeded)).handle(arg.capture()); @@ -1657,8 +1730,8 @@ public class TestTaskAttempt { TezTaskAttemptID taskAttemptID2 = taImpl2.getID(); taImpl2.handle(new TaskAttemptEventSchedule(taskAttemptID2, 0, 0)); - // At state STARTING. - taImpl2.handle(new TaskAttemptEventStartedRemotely(taskAttemptID2, contId, null)); + taImpl2.handle(new TaskAttemptEventSubmitted(taskAttemptID2, contId)); + taImpl2.handle(new TaskAttemptEventStartedRemotely(taskAttemptID2)); verify(mockHeartbeatHandler).register(taskAttemptID2); taImpl2.handle(new TaskAttemptEvent(taskAttemptID2, TaskAttemptEventType.TA_DONE)); assertEquals("Task attempt is not in succeeded state", taImpl2.getState(), @@ -1691,8 +1764,8 @@ public class TestTaskAttempt { TezTaskAttemptID taskAttemptID3 = taImpl3.getID(); taImpl3.handle(new TaskAttemptEventSchedule(taskAttemptID3, 0, 0)); - // At state STARTING. - taImpl3.handle(new TaskAttemptEventStartedRemotely(taskAttemptID3, contId, null)); + taImpl3.handle(new TaskAttemptEventSubmitted(taskAttemptID3, contId)); + taImpl3.handle(new TaskAttemptEventStartedRemotely(taskAttemptID3)); verify(mockHeartbeatHandler).register(taskAttemptID3); taImpl3.handle(new TaskAttemptEvent(taskAttemptID3, TaskAttemptEventType.TA_DONE)); assertEquals("Task attempt is not in succeeded state", taImpl3.getState(), http://git-wip-us.apache.org/repos/asf/tez/blob/dd9c517e/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java index 76ccf91..b3dd60a 100644 --- a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java +++ b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java @@ -52,8 +52,6 @@ import com.google.protobuf.ByteString; import org.apache.commons.lang.StringUtils; import org.apache.hadoop.io.DataOutputBuffer; import org.apache.hadoop.yarn.api.records.LocalResource; -import org.apache.hadoop.yarn.api.records.LocalResourceType; -import org.apache.hadoop.yarn.api.records.LocalResourceVisibility; import org.apache.tez.common.DrainDispatcher; import org.apache.tez.common.TezCommonUtils; import org.apache.tez.common.counters.Limits; @@ -62,7 +60,7 @@ import org.apache.tez.dag.api.TezConstants; import org.apache.tez.dag.api.records.DAGProtos.PlanLocalResource; import org.apache.tez.dag.api.records.DAGProtos.PlanLocalResourceType; import org.apache.tez.dag.api.records.DAGProtos.PlanLocalResourceVisibility; -import org.apache.tez.dag.app.ContainerContext; +import org.apache.tez.dag.app.dag.event.TaskAttemptEventSubmitted; import org.apache.tez.dag.app.dag.event.TaskEventTAFailed; import org.apache.tez.dag.app.dag.event.TaskEventTALaunched; import org.apache.tez.dag.app.dag.event.TaskEventTASucceeded; @@ -3653,7 +3651,8 @@ public class TestVertexImpl { containers.addContainerIfNew(container, 0, 0, 0); doReturn(containers).when(appContext).getAllContainers(); - ta.handle(new TaskAttemptEventStartedRemotely(ta.getID(), contId, null)); + ta.handle(new TaskAttemptEventSubmitted(ta.getID(), contId)); + ta.handle(new TaskAttemptEventStartedRemotely(ta.getID())); Assert.assertEquals(TaskAttemptStateInternal.RUNNING, ta.getInternalState()); ta.handle(new TaskAttemptEventAttemptFailed(ta.getID(), TaskAttemptEventType.TA_FAILED, TaskFailureType.NON_FATAL, @@ -3687,7 +3686,8 @@ public class TestVertexImpl { containers.addContainerIfNew(container, 0, 0, 0); doReturn(containers).when(appContext).getAllContainers(); - ta.handle(new TaskAttemptEventStartedRemotely(ta.getID(), contId, null)); + ta.handle(new TaskAttemptEventSubmitted(ta.getID(), contId)); + ta.handle(new TaskAttemptEventStartedRemotely(ta.getID())); Assert.assertEquals(TaskAttemptStateInternal.RUNNING, ta.getInternalState()); ta.handle(new TaskAttemptEventAttemptFailed(ta.getID(), TaskAttemptEventType.TA_FAILED, @@ -3723,7 +3723,8 @@ public class TestVertexImpl { containers.addContainerIfNew(container, 0, 0, 0); doReturn(containers).when(appContext).getAllContainers(); - ta.handle(new TaskAttemptEventStartedRemotely(ta.getID(), contId, null)); + ta.handle(new TaskAttemptEventSubmitted(ta.getID(), contId)); + ta.handle(new TaskAttemptEventStartedRemotely(ta.getID())); Assert.assertEquals(TaskAttemptStateInternal.RUNNING, ta.getInternalState()); ta.handle(new TaskAttemptEventAttemptFailed(ta.getID(), TaskAttemptEventType.TA_FAILED, @@ -7103,14 +7104,16 @@ public class TestVertexImpl { Assert.assertEquals(v.getLastTaskFinishTime(), -1); taskAttempt0.handle(new TaskAttemptEventSchedule(taskAttemptId0, 0, 0)); - taskAttempt0.handle(new TaskAttemptEventStartedRemotely(taskAttemptId0, contId, null)); + taskAttempt0.handle(new TaskAttemptEventSubmitted(taskAttemptId0, contId)); + taskAttempt0.handle(new TaskAttemptEventStartedRemotely(taskAttemptId0)); taskAttempt0.handle(new TaskAttemptEvent(taskAttemptId0, TaskAttemptEventType.TA_DONE)); //task0.handle(new TaskEventTAUpdate(taskAttemptId0, TaskEventType.T_ATTEMPT_SUCCEEDED)); Assert.assertEquals(v.getLastTaskFinishTime(), -1); taskAttempt1.handle(new TaskAttemptEventSchedule(taskAttemptId1, 0, 0)); - taskAttempt1.handle(new TaskAttemptEventStartedRemotely(taskAttemptId1, contId, null)); + taskAttempt1.handle(new TaskAttemptEventSubmitted(taskAttemptId1, contId)); + taskAttempt1.handle(new TaskAttemptEventStartedRemotely(taskAttemptId1)); taskAttempt1.handle(new TaskAttemptEvent(taskAttemptId1, TaskAttemptEventType.TA_DONE)); //task1.handle(new TaskEventTAUpdate(taskAttemptId1, TaskEventType.T_ATTEMPT_SUCCEEDED)); http://git-wip-us.apache.org/repos/asf/tez/blob/dd9c517e/tez-ext-service-tests/src/test/java/org/apache/tez/dag/app/taskcomm/TezTestServiceTaskCommunicatorImpl.java ---------------------------------------------------------------------- diff --git a/tez-ext-service-tests/src/test/java/org/apache/tez/dag/app/taskcomm/TezTestServiceTaskCommunicatorImpl.java b/tez-ext-service-tests/src/test/java/org/apache/tez/dag/app/taskcomm/TezTestServiceTaskCommunicatorImpl.java index 6c07107..732c81a 100644 --- a/tez-ext-service-tests/src/test/java/org/apache/tez/dag/app/taskcomm/TezTestServiceTaskCommunicatorImpl.java +++ b/tez-ext-service-tests/src/test/java/org/apache/tez/dag/app/taskcomm/TezTestServiceTaskCommunicatorImpl.java @@ -132,8 +132,9 @@ public class TezTestServiceTaskCommunicatorImpl extends TezTaskCommunicatorImpl } // Have to register this up front right now. Otherwise, it's possible for the task to start // sending out status/DONE/KILLED/FAILED messages before TAImpl knows how to handle them. - getContext() - .taskStartedRemotely(taskSpec.getTaskAttemptID(), containerId); + + getContext().taskSubmitted(taskSpec.getTaskAttemptID(), containerId); + getContext().taskStartedRemotely(taskSpec.getTaskAttemptID()); communicator.submitWork(requestProto, host, port, new TezTestServiceCommunicator.ExecuteRequestCallback<SubmitWorkResponseProto>() { @Override
