TEZ-2669. Propagation of errors from plugins to the AM for error reporting. Contributed by Hitesh Shah and Siddharth Seth.
Project: http://git-wip-us.apache.org/repos/asf/tez/repo Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/1d765431 Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/1d765431 Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/1d765431 Branch: refs/heads/TEZ-2980 Commit: 1d765431601fb8ab7cca248baa973684d828afaa Parents: 0c08577 Author: Siddharth Seth <[email protected]> Authored: Tue Jan 12 15:19:22 2016 -0800 Committer: Siddharth Seth <[email protected]> Committed: Tue Jan 12 15:19:22 2016 -0800 ---------------------------------------------------------------------- CHANGES.txt | 1 + .../serviceplugins/api/ContainerLauncher.java | 12 +- .../api/ContainerLauncherContext.java | 3 +- .../api/ServicePluginException.java | 36 +++ .../tez/serviceplugins/api/TaskScheduler.java | 48 +++- tez-dag/src/main/java/org/apache/tez/Utils.java | 66 +++++ .../apache/tez/dag/api/TaskCommunicator.java | 39 ++- .../dag/app/ContainerLauncherContextImpl.java | 18 +- .../org/apache/tez/dag/app/DAGAppMaster.java | 68 +++-- .../dag/app/TaskCommunicatorContextImpl.java | 6 +- .../tez/dag/app/TaskCommunicatorManager.java | 151 +++++++++-- .../app/TaskCommunicatorManagerInterface.java | 3 +- .../tez/dag/app/TaskCommunicatorWrapper.java | 83 ++++++ .../tez/dag/app/TezTaskCommunicatorImpl.java | 2 +- .../tez/dag/app/dag/StateChangeNotifier.java | 7 +- .../app/dag/event/DAGAppMasterEventType.java | 3 + .../DAGAppMasterEventUserServiceFatalError.java | 46 ++++ .../app/dag/event/DAGEventInternalError.java | 32 +++ .../apache/tez/dag/app/dag/impl/DAGImpl.java | 22 +- .../tez/dag/app/dag/impl/VertexManager.java | 8 +- .../app/launcher/ContainerLauncherManager.java | 49 +++- .../app/launcher/ContainerLauncherWrapper.java | 40 +++ .../app/launcher/LocalContainerLauncher.java | 2 +- .../tez/dag/app/rm/TaskSchedulerManager.java | 271 ++++++++++++++++--- .../tez/dag/app/rm/TaskSchedulerWrapper.java | 90 ++++++ .../dag/app/rm/container/AMContainerImpl.java | 30 +- 
.../apache/tez/dag/app/MockDAGAppMaster.java | 2 +- .../tez/dag/app/PluginWrapperTestHelpers.java | 149 ++++++++++ .../dag/app/TestTaskCommunicatorManager.java | 99 ++++++- .../dag/app/TestTaskCommunicatorManager1.java | 6 +- .../dag/app/TestTaskCommunicatorWrapper.java | 43 +++ .../tez/dag/app/dag/impl/TestTaskAttempt.java | 9 +- .../launcher/TestContainerLauncherManager.java | 83 +++++- .../launcher/TestContainerLauncherWrapper.java | 30 ++ .../tez/dag/app/rm/TestContainerReuse.java | 8 +- .../dag/app/rm/TestTaskSchedulerHelpers.java | 14 +- .../dag/app/rm/TestTaskSchedulerManager.java | 106 +++++++- .../dag/app/rm/TestTaskSchedulerWrapper.java | 29 ++ .../dag/app/rm/container/TestAMContainer.java | 10 +- .../app/rm/container/TestAMContainerMap.java | 3 +- .../org/apache/tez/examples/JoinValidate.java | 4 +- ...zTestServiceContainerLauncherWithErrors.java | 37 +++ ...stServiceTaskSchedulerServiceWithErrors.java | 93 +++++++ ...ezTestServiceTaskCommunicatorWithErrors.java | 83 ++++++ .../tez/examples/JoinValidateConfigured.java | 10 + .../tez/tests/ExternalTezServiceTestHelper.java | 194 +++++++++++++ .../tez/tests/TestExternalTezServices.java | 125 +-------- .../tests/TestExternalTezServicesErrors.java | 235 ++++++++++++++++ 48 files changed, 2231 insertions(+), 277 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/tez/blob/1d765431/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index 6cdc037..d39cbb8 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -11,6 +11,7 @@ INCOMPATIBLE CHANGES TEZ-2972. Avoid task rescheduling when a node turns unhealthy ALL CHANGES: + TEZ-2669. Propagation of errors from plugins to the AM for error reporting. TEZ-2978. Add an option to allow the SplitGrouper to generate node local only groups. TEZ-2129. Task and Attempt views should contain links to the logs TEZ-3025. 
InputInitializer creation should use the dag ugi. http://git-wip-us.apache.org/repos/asf/tez/blob/1d765431/tez-api/src/main/java/org/apache/tez/serviceplugins/api/ContainerLauncher.java ---------------------------------------------------------------------- diff --git a/tez-api/src/main/java/org/apache/tez/serviceplugins/api/ContainerLauncher.java b/tez-api/src/main/java/org/apache/tez/serviceplugins/api/ContainerLauncher.java index 5a77b69..8792fd7 100644 --- a/tez-api/src/main/java/org/apache/tez/serviceplugins/api/ContainerLauncher.java +++ b/tez-api/src/main/java/org/apache/tez/serviceplugins/api/ContainerLauncher.java @@ -74,16 +74,22 @@ public abstract class ContainerLauncher implements ServicePluginLifecycle { } /** - * A request to launch the specified container + * A request to launch the specified container, as described by the supplied + * {@link ContainerLaunchRequest} * * @param launchRequest the actual launch request + * @throws ServicePluginException when the service runs into a fatal error which it cannot handle.
*/ - public abstract void stopContainer(ContainerStopRequest stopRequest); + public abstract void stopContainer(ContainerStopRequest stopRequest) throws ServicePluginException; } http://git-wip-us.apache.org/repos/asf/tez/blob/1d765431/tez-api/src/main/java/org/apache/tez/serviceplugins/api/ContainerLauncherContext.java ---------------------------------------------------------------------- diff --git a/tez-api/src/main/java/org/apache/tez/serviceplugins/api/ContainerLauncherContext.java b/tez-api/src/main/java/org/apache/tez/serviceplugins/api/ContainerLauncherContext.java index dcd9e80..70a3498 100644 --- a/tez-api/src/main/java/org/apache/tez/serviceplugins/api/ContainerLauncherContext.java +++ b/tez-api/src/main/java/org/apache/tez/serviceplugins/api/ContainerLauncherContext.java @@ -99,12 +99,13 @@ public interface ContainerLauncherContext { ApplicationAttemptId getApplicationAttemptId(); /** - * Get meta info from the specified TaskCommunicator. This assumes that the launched has been + * Get meta info from the specified TaskCommunicator. 
This assumes that the launcher has been * setup * along with a compatible TaskCommunicator, and the launcher knows how to read this meta-info * * @param taskCommName the name of the task communicator * @return meta info for the requested task communicator + * */ Object getTaskCommunicatorMetaInfo(String taskCommName); } http://git-wip-us.apache.org/repos/asf/tez/blob/1d765431/tez-api/src/main/java/org/apache/tez/serviceplugins/api/ServicePluginException.java ---------------------------------------------------------------------- diff --git a/tez-api/src/main/java/org/apache/tez/serviceplugins/api/ServicePluginException.java b/tez-api/src/main/java/org/apache/tez/serviceplugins/api/ServicePluginException.java new file mode 100644 index 0000000..737329a --- /dev/null +++ b/tez-api/src/main/java/org/apache/tez/serviceplugins/api/ServicePluginException.java @@ -0,0 +1,36 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.tez.serviceplugins.api; + +/** + * Indicates an error from pluggable Tez Services. 
+ */ +public class ServicePluginException extends Exception { + + public ServicePluginException() { + } + + public ServicePluginException(String message) { + super(message); + } + + public ServicePluginException(String message, Throwable cause) { + super(message, cause); + } + + public ServicePluginException(Throwable cause) { + super(cause); + } +} http://git-wip-us.apache.org/repos/asf/tez/blob/1d765431/tez-api/src/main/java/org/apache/tez/serviceplugins/api/TaskScheduler.java ---------------------------------------------------------------------- diff --git a/tez-api/src/main/java/org/apache/tez/serviceplugins/api/TaskScheduler.java b/tez-api/src/main/java/org/apache/tez/serviceplugins/api/TaskScheduler.java index de76029..5875bd2 100644 --- a/tez-api/src/main/java/org/apache/tez/serviceplugins/api/TaskScheduler.java +++ b/tez-api/src/main/java/org/apache/tez/serviceplugins/api/TaskScheduler.java @@ -101,38 +101,48 @@ public abstract class TaskScheduler implements ServicePluginLifecycle { * Get the currently available resources from this source * * @return the resources available at the time of invocation + * @throws ServicePluginException when the service runs into a fatal error which it cannot handle. + * This will cause the app to shutdown. */ - public abstract Resource getAvailableResources(); + public abstract Resource getAvailableResources() throws ServicePluginException; /** * Get the total available resources from this source * * @return the total available resources from the source + * @throws ServicePluginException when the service runs into a fatal error which it cannot handle. + * This will cause the app to shutdown. */ - public abstract Resource getTotalResources(); + public abstract Resource getTotalResources() throws ServicePluginException; /** * Get the number of nodes available from the source * * @return the number of nodes + * @throws ServicePluginException when the service runs into a fatal error which it cannot handle. 
+ * This will cause the app to shutdown. */ - public abstract int getClusterNodeCount(); + public abstract int getClusterNodeCount() throws ServicePluginException; /** * Indication to a source that a node has been blacklisted, and should not be used for subsequent * allocations. * * @param nodeId te nodeId to be blacklisted + * @throws ServicePluginException when the service runs into a fatal error which it cannot handle. + * This will cause the app to shutdown. */ - public abstract void blacklistNode(NodeId nodeId); + public abstract void blacklistNode(NodeId nodeId) throws ServicePluginException; /** * Indication to a source that a node has been un-blacklisted, and can be used from subsequent * allocations * * @param nodeId the nodeId to be unblacklisted + * @throws ServicePluginException when the service runs into a fatal error which it cannot handle. + * This will cause the app to shutdown. */ - public abstract void unblacklistNode(NodeId nodeId); + public abstract void unblacklistNode(NodeId nodeId) throws ServicePluginException; /** * A request to the source to allocate resources for a requesting task, with location information @@ -150,10 +160,12 @@ public abstract class TaskScheduler implements ServicePluginLifecycle { * @param clientCookie a cookie associated with this request. This should be returned back * via the {@link TaskSchedulerContext#taskAllocated(Object, Object, * Container)} method when a task is assigned to a resource + * @throws ServicePluginException when the service runs into a fatal error which it cannot handle. + * This will cause the app to shutdown. 
*/ public abstract void allocateTask(Object task, Resource capability, String[] hosts, String[] racks, Priority priority, - Object containerSignature, Object clientCookie); + Object containerSignature, Object clientCookie) throws ServicePluginException; /** * A request to the source to allocate resources for a requesting task, based on a previously used @@ -171,11 +183,13 @@ public abstract class TaskScheduler implements ServicePluginLifecycle { * @param clientCookie a cookie associated with this request. This should be returned back * via the {@link TaskSchedulerContext#taskAllocated(Object, Object, * Container)} method when a task is assigned to a resource + * @throws ServicePluginException when the service runs into a fatal error which it cannot handle. + * This will cause the app to shutdown. */ public abstract void allocateTask(Object task, Resource capability, ContainerId containerId, Priority priority, Object containerSignature, - Object clientCookie); + Object clientCookie) throws ServicePluginException; /** * A request to deallocate a task. This is typically a result of a task completing - with success @@ -190,38 +204,48 @@ public abstract class TaskScheduler implements ServicePluginLifecycle { * @param endReason the reason for the task failure * @param diagnostics additional diagnostics information which may be relevant * @return true if the task was associated with a container, false if the task was not associated * with a container + * @throws ServicePluginException when the service runs into a fatal error which it cannot handle. + * This will cause the app to shutdown. */ public abstract boolean deallocateTask(Object task, boolean taskSucceeded, TaskAttemptEndReason endReason, - @Nullable String diagnostics); + @Nullable String diagnostics) throws ServicePluginException; /** * A request to de-allocate a previously allocated container.
* * @param containerId the containerId to de-allocate * @return the task which was previously associated with this container, null otherwise + * @throws ServicePluginException when the service runs into a fatal error which it cannot handle. + * This will cause the app to shutdown. */ - public abstract Object deallocateContainer(ContainerId containerId); + public abstract Object deallocateContainer(ContainerId containerId) throws ServicePluginException; /** * Inform the scheduler that it should unregister. This is primarily valid for schedulers which * require registration (YARN a.t.m) + * @throws ServicePluginException when the service runs into a fatal error which it cannot handle. + * This will cause the app to shutdown. */ - public abstract void setShouldUnregister(); + public abstract void setShouldUnregister() throws ServicePluginException; /** * Checks with the scheduler whether it has unregistered. * * @return true if the scheduler has unregistered. False otherwise. + * @throws ServicePluginException when the service runs into a fatal error which it cannot handle. + * This will cause the app to shutdown. */ - public abstract boolean hasUnregistered(); + public abstract boolean hasUnregistered() throws ServicePluginException; /** * Indicates to the scheduler that the currently running dag has completed. * This can be used to reset dag specific statistics, potentially release resources and prepare * for a new DAG. + * @throws ServicePluginException when the service runs into a fatal error which it cannot handle. + * This will cause the app to shutdown. 
*/ - public abstract void dagComplete(); + public abstract void dagComplete() throws ServicePluginException; } http://git-wip-us.apache.org/repos/asf/tez/blob/1d765431/tez-dag/src/main/java/org/apache/tez/Utils.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/Utils.java b/tez-dag/src/main/java/org/apache/tez/Utils.java new file mode 100644 index 0000000..959b536 --- /dev/null +++ b/tez-dag/src/main/java/org/apache/tez/Utils.java @@ -0,0 +1,66 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.tez; + +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.tez.dag.app.AppContext; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +@InterfaceAudience.Private +/** + * Utility class within the tez-dag module + */ +public class Utils { + + private static final Logger LOG = LoggerFactory.getLogger(Utils.class); + + public static String getContainerLauncherIdentifierString(int launcherIndex, AppContext appContext) { + String name; + try { + name = appContext.getContainerLauncherName(launcherIndex); + } catch (Exception e) { + LOG.error("Unable to get launcher name for index: " + launcherIndex + + ", falling back to reporting the index"); + return "[" + String.valueOf(launcherIndex) + "]"; + } + return "[" + launcherIndex + ":" + name + "]"; + } + + public static String getTaskCommIdentifierString(int taskCommIndex, AppContext appContext) { + String name; + try { + name = appContext.getTaskCommunicatorName(taskCommIndex); + } catch (Exception e) { + LOG.error("Unable to get taskcomm name for index: " + taskCommIndex + + ", falling back to reporting the index"); + return "[" + String.valueOf(taskCommIndex) + "]"; + } + return "[" + taskCommIndex + ":" + name + "]"; + } + + public static String getTaskSchedulerIdentifierString(int schedulerIndex, AppContext appContext) { + String name; + try { + name = appContext.getTaskSchedulerName(schedulerIndex); + } catch (Exception e) { + LOG.error("Unable to get scheduler name for index: " + schedulerIndex + + ", falling back to reporting the index"); + return "[" + String.valueOf(schedulerIndex) + "]"; + } + return "[" + schedulerIndex + ":" + name + "]"; + } + +} http://git-wip-us.apache.org/repos/asf/tez/blob/1d765431/tez-dag/src/main/java/org/apache/tez/dag/api/TaskCommunicator.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/api/TaskCommunicator.java
b/tez-dag/src/main/java/org/apache/tez/dag/api/TaskCommunicator.java index 38742de..1b6ad07 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/api/TaskCommunicator.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/api/TaskCommunicator.java @@ -24,6 +24,7 @@ import org.apache.hadoop.yarn.api.records.LocalResource; import org.apache.tez.common.ServicePluginLifecycle; import org.apache.tez.dag.api.event.VertexStateUpdate; import org.apache.tez.serviceplugins.api.ContainerEndReason; +import org.apache.tez.serviceplugins.api.ServicePluginException; import org.apache.tez.serviceplugins.api.TaskAttemptEndReason; import org.apache.tez.dag.records.TezTaskAttemptID; import org.apache.tez.runtime.api.impl.TaskSpec; @@ -107,8 +108,11 @@ public abstract class TaskCommunicator implements ServicePluginLifecycle { * @param containerId the associated containerId * @param hostname the hostname on which the container runs * @param port the port for the service which is running the container + * @throws ServicePluginException when the service runs into a fatal error which it cannot handle. + * This will cause the app to shutdown. */ - public abstract void registerRunningContainer(ContainerId containerId, String hostname, int port); + public abstract void registerRunningContainer(ContainerId containerId, String hostname, + int port) throws ServicePluginException; /** * Register the end of a container. This can be caused by preemption, the container completing @@ -117,9 +121,12 @@ public abstract class TaskCommunicator implements ServicePluginLifecycle { * @param containerId the associated containerId * @param endReason the end reason for the container completing * @param diagnostics diagnostics associated with the container end + * @throws ServicePluginException when the service runs into a fatal error which it cannot handle. + * This will cause the app to shutdown. 
*/ public abstract void registerContainerEnd(ContainerId containerId, ContainerEndReason endReason, - @Nullable String diagnostics); + @Nullable String diagnostics) throws + ServicePluginException; /** * Register a task attempt to execute on a container @@ -133,11 +140,14 @@ public abstract class TaskCommunicator implements ServicePluginLifecycle { * @param credentialsChanged whether the credentials are different from the original credentials * associated with this container * @param priority the priority of the task being executed + * @throws ServicePluginException when the service runs into a fatal error which it cannot handle. + * This will cause the app to shutdown. */ public abstract void registerRunningTaskAttempt(ContainerId containerId, TaskSpec taskSpec, Map<String, LocalResource> additionalResources, Credentials credentials, - boolean credentialsChanged, int priority); + boolean credentialsChanged, int priority) throws + ServicePluginException; /** * Register the completion of a task. This may be a result of preemption, the container dying, @@ -146,17 +156,22 @@ public abstract class TaskCommunicator implements ServicePluginLifecycle { * @param taskAttemptID the task attempt which has completed / needs to be completed * @param endReason the endReason for the task attempt. * @param diagnostics diagnostics associated with the task end + * @throws ServicePluginException when the service runs into a fatal error which it cannot handle. + * This will cause the app to shutdown. */ public abstract void unregisterRunningTaskAttempt(TezTaskAttemptID taskAttemptID, TaskAttemptEndReason endReason, - @Nullable String diagnostics); + @Nullable String diagnostics) throws + ServicePluginException; /** * Return the address, if any, that the service listens on * * @return the address + * @throws ServicePluginException when the service runs into a fatal error which it cannot handle. + * This will cause the app to shutdown. 
*/ - public abstract InetSocketAddress getAddress(); + public abstract InetSocketAddress getAddress() throws ServicePluginException; /** * Receive notifications on vertex state changes. @@ -175,9 +190,10 @@ public abstract class TaskCommunicator implements ServicePluginLifecycle { * @param stateUpdate an event indicating the name of the vertex, and it's updated state. * Additional information may be available for specific events, Look at the * type hierarchy for {@link org.apache.tez.dag.api.event.VertexStateUpdate} - * @throws Exception + * @throws ServicePluginException when the service runs into a fatal error which it cannot handle. + * This will cause the app to shutdown. */ - public abstract void onVertexStateUpdated(VertexStateUpdate stateUpdate) throws Exception; + public abstract void onVertexStateUpdated(VertexStateUpdate stateUpdate) throws ServicePluginException; /** * Indicates the current running dag is complete. The TaskCommunicatorContext can be used to @@ -187,9 +203,10 @@ public abstract class TaskCommunicator implements ServicePluginLifecycle { * the next dag being submitted. * * @param dagIdentifier the unique numerical identifier for the DAG in the specified execution context. - * + * @throws ServicePluginException when the service runs into a fatal error which it cannot handle. + * This will cause the app to shutdown. */ - public abstract void dagComplete(int dagIdentifier); + public abstract void dagComplete(int dagIdentifier) throws ServicePluginException; /** * Share meta-information such as host:port information where the Task Communicator may be @@ -197,6 +214,8 @@ public abstract class TaskCommunicator implements ServicePluginLifecycle { * Primarily for use by compatible launchers to learn this information. * * @return meta info for the task communicator + * @throws ServicePluginException when the service runs into a fatal error which it cannot handle. + * This will cause the app to shutdown. 
*/ - public abstract Object getMetaInfo(); + public abstract Object getMetaInfo() throws ServicePluginException; } http://git-wip-us.apache.org/repos/asf/tez/blob/1d765431/tez-dag/src/main/java/org/apache/tez/dag/app/ContainerLauncherContextImpl.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/ContainerLauncherContextImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/ContainerLauncherContextImpl.java index a2e0dd6..9434256 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/app/ContainerLauncherContextImpl.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/ContainerLauncherContextImpl.java @@ -19,6 +19,8 @@ import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.tez.common.TezUtilsInternal; import org.apache.tez.dag.api.UserPayload; +import org.apache.tez.dag.app.dag.event.DAGAppMasterEventType; +import org.apache.tez.dag.app.dag.event.DAGAppMasterEventUserServiceFatalError; import org.apache.tez.serviceplugins.api.ContainerLauncherContext; import org.apache.tez.serviceplugins.api.TaskAttemptEndReason; import org.apache.tez.dag.app.rm.container.AMContainerEvent; @@ -29,10 +31,13 @@ import org.apache.tez.dag.app.rm.container.AMContainerEventStopFailed; import org.apache.tez.dag.app.rm.container.AMContainerEventType; import org.apache.tez.dag.history.DAGHistoryEvent; import org.apache.tez.dag.history.events.ContainerLaunchedEvent; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; @SuppressWarnings("unchecked") public class ContainerLauncherContextImpl implements ContainerLauncherContext { + private static final Logger LOG = LoggerFactory.getLogger(ContainerLauncherContextImpl.class); private final AppContext context; private final TaskCommunicatorManagerInterface tal; private final UserPayload initialUserPayload; @@ -101,7 +106,18 @@ public class ContainerLauncherContextImpl 
implements ContainerLauncherContext { @Override public Object getTaskCommunicatorMetaInfo(String taskCommName) { int taskCommId = context.getTaskCommunicatorIdentifier(taskCommName); - return tal.getTaskCommunicator(taskCommId).getMetaInfo(); + try { + return tal.getTaskCommunicator(taskCommId).getMetaInfo(); + } catch (Exception e) { + String msg = "Error in retrieving meta-info from TaskCommunicator" + + ", communicatorName=" + context.getTaskCommunicatorName(taskCommId); + LOG.error(msg, e); + context.getEventHandler().handle( + new DAGAppMasterEventUserServiceFatalError( + DAGAppMasterEventType.TASK_COMMUNICATOR_SERVICE_FATAL_ERROR, + msg, e)); + } + return null; } } http://git-wip-us.apache.org/repos/asf/tez/blob/1d765431/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java b/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java index c0b86a5..609a018 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java @@ -63,6 +63,7 @@ import com.google.common.collect.Lists; import org.apache.commons.cli.CommandLine; import org.apache.commons.cli.GnuParser; import org.apache.commons.cli.Options; +import org.apache.commons.lang.exception.ExceptionUtils; import org.apache.tez.client.CallerContext; import org.apache.tez.common.TezUtils; import org.apache.tez.dag.api.NamedEntityDescriptor; @@ -72,6 +73,8 @@ import org.apache.tez.dag.api.records.DAGProtos.AMPluginDescriptorProto; import org.apache.tez.dag.api.records.DAGProtos.ConfigurationProto; import org.apache.tez.dag.api.records.DAGProtos.TezNamedEntityDescriptorProto; import org.apache.tez.dag.app.dag.event.DAGAppMasterEventDagCleanup; +import org.apache.tez.dag.app.dag.event.DAGAppMasterEventUserServiceFatalError; +import 
org.apache.tez.dag.app.dag.event.DAGEventInternalError; import org.apache.tez.dag.history.events.DAGRecoveredEvent; import org.apache.tez.dag.records.TezTaskAttemptID; import org.apache.tez.dag.records.TezTaskID; @@ -151,10 +154,6 @@ import org.apache.tez.dag.app.dag.event.VertexEvent; import org.apache.tez.dag.app.dag.event.VertexEventType; import org.apache.tez.dag.app.dag.impl.DAGImpl; import org.apache.tez.dag.app.launcher.ContainerLauncherManager; -import org.apache.tez.dag.app.dag.impl.TaskAttemptImpl; -import org.apache.tez.dag.app.dag.impl.TaskImpl; -import org.apache.tez.dag.app.dag.impl.VertexImpl; -import org.apache.tez.dag.app.launcher.LocalContainerLauncher; import org.apache.tez.dag.app.rm.AMSchedulerEventType; import org.apache.tez.dag.app.rm.ContainerLauncherEventType; import org.apache.tez.dag.app.rm.TaskSchedulerManager; @@ -671,8 +670,34 @@ public class DAGAppMaster extends AbstractService { return taskSchedulerManager; } + private void handleInternalError(String errDiagnosticsPrefix, String errDiagDagEvent) { + state = DAGAppMasterState.ERROR; + if (currentDAG != null) { + _updateLoggers(currentDAG, "_post"); + String errDiagnostics = errDiagnosticsPrefix + ". Aborting dag: " + currentDAG.getID(); + LOG.info(errDiagnostics); + // Inform the current DAG about the error + sendEvent(new DAGEventInternalError(currentDAG.getID(), errDiagDagEvent)); + } else { + LOG.info(errDiagnosticsPrefix + ". AppMaster will exit as no dag is active"); + // This could be problematic if the scheduler generated the error, + // since un-registration may not be possible. + // For now - try setting this flag, but call the shutdownHandler irrespective of + // how the flag is handled by user code. 
+ try { + this.taskSchedulerManager.setShouldUnregisterFlag(); + } catch (Exception e) { + // Ignore exception for now + LOG.error("Error when trying to set unregister flag for TaskScheduler", e); + } finally { + shutdownHandler.shutdown(); + } + } + } + @VisibleForTesting protected synchronized void handle(DAGAppMasterEvent event) { + String errDiagnostics; switch (event.getType()) { case SCHEDULING_SERVICE_ERROR: // Scheduling error - probably an issue with the communication with the RM @@ -683,22 +708,30 @@ public class DAGAppMaster extends AbstractService { DAGAppMasterEventSchedulingServiceError schedulingServiceErrorEvent = (DAGAppMasterEventSchedulingServiceError) event; state = DAGAppMasterState.ERROR; - LOG.info("Error in the TaskScheduler. Shutting down.", - schedulingServiceErrorEvent.getThrowable()); + errDiagnostics = "Error in the TaskScheduler. Shutting down. "; + addDiagnostic(errDiagnostics + + "Error=" + ExceptionUtils.getStackTrace(schedulingServiceErrorEvent.getThrowable())); + LOG.error(errDiagnostics, schedulingServiceErrorEvent.getThrowable()); shutdownHandler.shutdown(); break; + case TASK_COMMUNICATOR_SERVICE_FATAL_ERROR: + case CONTAINER_LAUNCHER_SERVICE_FATAL_ERROR: + case TASK_SCHEDULER_SERVICE_FATAL_ERROR: + // A fatal error from the pluggable services. The AM cannot continue operation, and should + // be shutdown. The AM should not be restarted for recovery. 
+ DAGAppMasterEventUserServiceFatalError usfe = (DAGAppMasterEventUserServiceFatalError) event; + Throwable error = usfe.getError(); + errDiagnostics = "Service Error: " + usfe.getDiagnosticInfo() + + ", eventType=" + event.getType() + + ", exception=" + ExceptionUtils.getStackTrace(usfe.getError()); + LOG.error(errDiagnostics, error); + addDiagnostic(errDiagnostics); + + handleInternalError("Service error: " + event.getType(), errDiagnostics); + break; case INTERNAL_ERROR: - state = DAGAppMasterState.ERROR; - if(currentDAG != null) { - _updateLoggers(currentDAG, "_post"); - // notify dag to finish which will send the DAG_FINISHED event - LOG.info("Internal Error. Notifying dags to finish."); - sendEvent(new DAGEvent(currentDAG.getID(), DAGEventType.INTERNAL_ERROR)); - } else { - LOG.info("Internal Error. Finishing directly as no dag is active."); - this.taskSchedulerManager.setShouldUnregisterFlag(); - shutdownHandler.shutdown(); - } + handleInternalError("DAGAppMaster Internal Error occurred", + "DAGAppMaster Internal Error occurred"); break; case DAG_FINISHED: DAGAppMasterEventDAGFinished finishEvt = @@ -756,6 +789,7 @@ public class DAGAppMaster extends AbstractService { LOG.error("Received a DAG Finished Event with state=" + finishEvt.getDAGState() + ". Error. Shutting down."); + addDiagnostic("DAG completed with an ERROR state. 
Shutting down AM"); state = DAGAppMasterState.ERROR; this.taskSchedulerManager.setShouldUnregisterFlag(); shutdownHandler.shutdown(); http://git-wip-us.apache.org/repos/asf/tez/blob/1d765431/tez-dag/src/main/java/org/apache/tez/dag/app/TaskCommunicatorContextImpl.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/TaskCommunicatorContextImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/TaskCommunicatorContextImpl.java index 6ae6dad..2b7234c 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/app/TaskCommunicatorContextImpl.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/TaskCommunicatorContextImpl.java @@ -205,11 +205,7 @@ public class TaskCommunicatorContextImpl implements TaskCommunicatorContext, Ver @Override public void onStateUpdated(VertexStateUpdate event) { - try { - taskCommunicatorManager.vertexStateUpdateNotificationReceived(event, taskCommunicatorIndex); - } catch (Exception e) { - throw new TezUncheckedException(e); - } + taskCommunicatorManager.vertexStateUpdateNotificationReceived(event, taskCommunicatorIndex); } private DAG getDag() { http://git-wip-us.apache.org/repos/asf/tez/blob/1d765431/tez-dag/src/main/java/org/apache/tez/dag/app/TaskCommunicatorManager.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/TaskCommunicatorManager.java b/tez-dag/src/main/java/org/apache/tez/dag/app/TaskCommunicatorManager.java index 92bf3c4..64a964b 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/app/TaskCommunicatorManager.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/TaskCommunicatorManager.java @@ -30,9 +30,14 @@ import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions; import org.apache.commons.collections4.ListUtils; +import org.apache.hadoop.yarn.event.Event; +import org.apache.tez.Utils; import 
org.apache.tez.dag.api.NamedEntityDescriptor; +import org.apache.tez.dag.api.TaskCommunicator; import org.apache.tez.dag.api.TezConstants; import org.apache.tez.dag.api.UserPayload; +import org.apache.tez.dag.app.dag.event.DAGAppMasterEventType; +import org.apache.tez.dag.app.dag.event.DAGAppMasterEventUserServiceFatalError; import org.apache.tez.dag.app.dag.event.TaskAttemptEventType; import org.apache.tez.serviceplugins.api.ContainerEndReason; import org.apache.tez.dag.app.dag.event.TaskAttemptEventStatusUpdate; @@ -48,7 +53,6 @@ import org.apache.hadoop.service.AbstractService; import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.tez.common.ReflectionUtils; import org.apache.tez.common.TezUtilsInternal; -import org.apache.tez.dag.api.TaskCommunicator; import org.apache.tez.dag.api.TaskCommunicatorContext; import org.apache.tez.serviceplugins.api.TaskAttemptEndReason; import org.apache.tez.dag.api.TaskHeartbeatResponse; @@ -81,7 +85,7 @@ public class TaskCommunicatorManager extends AbstractService implements .getLogger(TaskCommunicatorManager.class); private final AppContext context; - private final TaskCommunicator[] taskCommunicators; + private final TaskCommunicatorWrapper[] taskCommunicators; private final TaskCommunicatorContext[] taskCommunicatorContexts; protected final ServicePluginLifecycleAbstractService []taskCommunicatorServiceWrappers; @@ -106,6 +110,24 @@ public class TaskCommunicatorManager extends AbstractService implements private static final ContainerInfo NULL_CONTAINER_INFO = new ContainerInfo(null); + @VisibleForTesting + @InterfaceAudience.Private + /** + * Only for testing. 
+ */ + public TaskCommunicatorManager(TaskCommunicator taskCommunicator, AppContext appContext, + TaskHeartbeatHandler thh, ContainerHeartbeatHandler chh) { + super(TaskCommunicatorManager.class.getName()); + this.context = appContext; + this.taskHeartbeatHandler = thh; + this.containerHeartbeatHandler = chh; + taskCommunicators = + new TaskCommunicatorWrapper[]{new TaskCommunicatorWrapper(taskCommunicator)}; + taskCommunicatorContexts = new TaskCommunicatorContext[]{taskCommunicator.getContext()}; + taskCommunicatorServiceWrappers = new ServicePluginLifecycleAbstractService[]{ + new ServicePluginLifecycleAbstractService(taskCommunicator)}; + } + public TaskCommunicatorManager(AppContext context, TaskHeartbeatHandler thh, ContainerHeartbeatHandler chh, List<NamedEntityDescriptor> taskCommunicatorDescriptors) throws TezException { @@ -116,14 +138,15 @@ public class TaskCommunicatorManager extends AbstractService implements Preconditions.checkArgument( taskCommunicatorDescriptors != null && !taskCommunicatorDescriptors.isEmpty(), "TaskCommunicators must be specified"); - this.taskCommunicators = new TaskCommunicator[taskCommunicatorDescriptors.size()]; + this.taskCommunicators = new TaskCommunicatorWrapper[taskCommunicatorDescriptors.size()]; this.taskCommunicatorContexts = new TaskCommunicatorContext[taskCommunicatorDescriptors.size()]; this.taskCommunicatorServiceWrappers = new ServicePluginLifecycleAbstractService[taskCommunicatorDescriptors.size()]; for (int i = 0 ; i < taskCommunicatorDescriptors.size() ; i++) { UserPayload userPayload = taskCommunicatorDescriptors.get(i).getUserPayload(); taskCommunicatorContexts[i] = new TaskCommunicatorContextImpl(context, this, userPayload, i); - taskCommunicators[i] = createTaskCommunicator(taskCommunicatorDescriptors.get(i), i); - taskCommunicatorServiceWrappers[i] = new ServicePluginLifecycleAbstractService(taskCommunicators[i]); + taskCommunicators[i] = new 
TaskCommunicatorWrapper(createTaskCommunicator(taskCommunicatorDescriptors.get(i), i)); + taskCommunicatorServiceWrappers[i] = + new ServicePluginLifecycleAbstractService(taskCommunicators[i].getTaskCommunicator()); } // TODO TEZ-2118 Start using taskCommunicator indices properly } @@ -269,11 +292,11 @@ public class TaskCommunicatorManager extends AbstractService implements } if (taskAttemptEvent != null) { taskAttemptEvent.setReadErrorReported(readErrorReported); - context.getEventHandler().handle(taskAttemptEvent); + sendEvent(taskAttemptEvent); } // route taGeneratedEvents to TaskAttempt if (!taGeneratedEvents.isEmpty()) { - context.getEventHandler().handle(new TaskAttemptEventTezEventUpdate(taskAttemptID, taGeneratedEvents)); + sendEvent(new TaskAttemptEventTezEventUpdate(taskAttemptID, taGeneratedEvents)); } // route events to TaskAttempt Preconditions.checkArgument(taFinishedEvents.size() <= 1, "Multiple TaskAttemptFinishedEvent"); @@ -300,14 +323,14 @@ public class TaskCommunicatorManager extends AbstractService implements sourceMeta.getEventGenerator()); } TaskAttemptFailedEvent taskFailedEvent =(TaskAttemptFailedEvent) e.getEvent(); - context.getEventHandler().handle( + sendEvent( new TaskAttemptEventAttemptFailed(sourceMeta.getTaskAttemptID(), TaskAttemptEventType.TA_FAILED, "Error: " + taskFailedEvent.getDiagnostics(), errCause)); break; case TASK_ATTEMPT_COMPLETED_EVENT: - context.getEventHandler().handle( + sendEvent( new TaskAttemptEvent(sourceMeta.getTaskAttemptID(), TaskAttemptEventType.TA_DONE)); break; default: @@ -317,7 +340,7 @@ public class TaskCommunicatorManager extends AbstractService implements } if (!eventsForVertex.isEmpty()) { TezVertexID vertexId = taskAttemptID.getTaskID().getVertexID(); - context.getEventHandler().handle( + sendEvent( new VertexEventRouteEvent(vertexId, Collections.unmodifiableList(eventsForVertex))); } taskHeartbeatHandler.pinged(taskAttemptID); @@ -339,8 +362,7 @@ public class TaskCommunicatorManager extends 
AbstractService implements } public void taskStartedRemotely(TezTaskAttemptID taskAttemptID, ContainerId containerId) { - context.getEventHandler() - .handle(new TaskAttemptEventStartedRemotely(taskAttemptID, containerId, null)); + sendEvent(new TaskAttemptEventStartedRemotely(taskAttemptID, containerId, null)); pingContainerHeartbeatHandler(containerId); } @@ -351,7 +373,7 @@ public class TaskCommunicatorManager extends AbstractService implements // TODO TEZ-2003 (post) TEZ-2671 Maybe consider un-registering here itself, since the task is not active anymore, // instead of waiting for the unregister to flow through the Container. // Fix along the same lines as TEZ-2124 by introducing an explict context. - context.getEventHandler().handle(new TaskAttemptEventAttemptKilled(taskAttemptId, + sendEvent(new TaskAttemptEventAttemptKilled(taskAttemptId, diagnostics, TezUtilsInternal.fromTaskAttemptEndReason( taskAttemptEndReason))); } @@ -363,14 +385,25 @@ public class TaskCommunicatorManager extends AbstractService implements // TODO TEZ-2003 (post) TEZ-2671 Maybe consider un-registering here itself, since the task is not active anymore, // instead of waiting for the unregister to flow through the Container. // Fix along the same lines as TEZ-2124 by introducing an explict context. 
- context.getEventHandler().handle(new TaskAttemptEventAttemptFailed(taskAttemptId, + sendEvent(new TaskAttemptEventAttemptFailed(taskAttemptId, TaskAttemptEventType.TA_FAILED, diagnostics, TezUtilsInternal.fromTaskAttemptEndReason( taskAttemptEndReason))); } - public void vertexStateUpdateNotificationReceived(VertexStateUpdate event, int taskCommIndex) throws - Exception { - taskCommunicators[taskCommIndex].onVertexStateUpdated(event); + public void vertexStateUpdateNotificationReceived(VertexStateUpdate event, int taskCommIndex) { + try { + taskCommunicators[taskCommIndex].onVertexStateUpdated(event); + } catch (Exception e) { + String msg = "Error in TaskCommunicator when handling vertex state update notification" + + ", communicator=" + Utils.getTaskCommIdentifierString(taskCommIndex, context) + + ", vertexName=" + event.getVertexName() + + ", vertexState=" + event.getVertexState(); + LOG.error(msg, e); + sendEvent( + new DAGAppMasterEventUserServiceFatalError( + DAGAppMasterEventType.TASK_COMMUNICATOR_SERVICE_FATAL_ERROR, + msg, e)); + } } @@ -410,9 +443,19 @@ public class TaskCommunicatorManager extends AbstractService implements // Inform all communicators of the dagCompletion. 
for (int i = 0 ; i < taskCommunicators.length ; i++) { - ((TaskCommunicatorContextImpl)taskCommunicatorContexts[i]).dagCompleteStart(dag); - taskCommunicators[i].dagComplete(dag.getID().getId()); - ((TaskCommunicatorContextImpl)taskCommunicatorContexts[i]).dagCompleteEnd(); + try { + ((TaskCommunicatorContextImpl) taskCommunicatorContexts[i]).dagCompleteStart(dag); + taskCommunicators[i].dagComplete(dag.getID().getId()); + ((TaskCommunicatorContextImpl) taskCommunicatorContexts[i]).dagCompleteEnd(); + } catch (Exception e) { + String msg = "Error in TaskCommunicator when notifying for DAG completion" + + ", communicator=" + Utils.getTaskCommIdentifierString(i, context); + LOG.error(msg, e); + sendEvent( + new DAGAppMasterEventUserServiceFatalError( + DAGAppMasterEventType.TASK_COMMUNICATOR_SERVICE_FATAL_ERROR, + msg, e)); + } } } @@ -434,8 +477,20 @@ public class TaskCommunicatorManager extends AbstractService implements "Multiple registrations for containerId: " + containerId); } NodeId nodeId = context.getAllContainers().get(containerId).getContainer().getNodeId(); - taskCommunicators[taskCommId].registerRunningContainer(containerId, nodeId.getHost(), - nodeId.getPort()); + try { + taskCommunicators[taskCommId].registerRunningContainer(containerId, nodeId.getHost(), + nodeId.getPort()); + } catch (Exception e) { + String msg = "Error in TaskCommunicator when registering running Container" + + ", communicator=" + Utils.getTaskCommIdentifierString(taskCommId, context) + + ", containerId=" + containerId + + ", nodeId=" + nodeId; + LOG.error(msg, e); + sendEvent( + new DAGAppMasterEventUserServiceFatalError( + DAGAppMasterEventType.TASK_COMMUNICATOR_SERVICE_FATAL_ERROR, + msg, e)); + } } @Override @@ -447,7 +502,18 @@ public class TaskCommunicatorManager extends AbstractService implements if (containerInfo.taskAttemptId != null) { registeredAttempts.remove(containerInfo.taskAttemptId); } - taskCommunicators[taskCommId].registerContainerEnd(containerId, endReason, 
diagnostics); + try { + taskCommunicators[taskCommId].registerContainerEnd(containerId, endReason, diagnostics); + } catch (Exception e) { + String msg = "Error in TaskCommunicator when unregistering Container" + + ", communicator=" + Utils.getTaskCommIdentifierString(taskCommId, context) + + ", containerId=" + containerId; + LOG.error(msg, e); + sendEvent( + new DAGAppMasterEventUserServiceFatalError( + DAGAppMasterEventType.TASK_COMMUNICATOR_SERVICE_FATAL_ERROR, + msg, e)); + } } @Override @@ -475,9 +541,21 @@ public class TaskCommunicatorManager extends AbstractService implements + amContainerTask.getTask().getTaskAttemptID() + " to container: " + containerId + " when already assigned to: " + containerIdFromMap); } - taskCommunicators[taskCommId].registerRunningTaskAttempt(containerId, amContainerTask.getTask(), - amContainerTask.getAdditionalResources(), amContainerTask.getCredentials(), - amContainerTask.haveCredentialsChanged(), amContainerTask.getPriority()); + try { + taskCommunicators[taskCommId].registerRunningTaskAttempt(containerId, amContainerTask.getTask(), + amContainerTask.getAdditionalResources(), amContainerTask.getCredentials(), + amContainerTask.haveCredentialsChanged(), amContainerTask.getPriority()); + } catch (Exception e) { + String msg = "Error in TaskCommunicator when registering Task Attempt" + + ", communicator=" + Utils.getTaskCommIdentifierString(taskCommId, context) + + ", containerId=" + containerId + + ", taskId=" + amContainerTask.getTask().getTaskAttemptID(); + LOG.error(msg, e); + sendEvent( + new DAGAppMasterEventUserServiceFatalError( + DAGAppMasterEventType.TASK_COMMUNICATOR_SERVICE_FATAL_ERROR, + msg, e)); + } } @Override @@ -495,11 +573,23 @@ public class TaskCommunicatorManager extends AbstractService implements } // Explicitly putting in a new entry so that synchronization is not required on the existing element in the map. 
registeredContainers.put(containerId, NULL_CONTAINER_INFO); - taskCommunicators[taskCommId].unregisterRunningTaskAttempt(attemptId, endReason, diagnostics); + try { + taskCommunicators[taskCommId].unregisterRunningTaskAttempt(attemptId, endReason, diagnostics); + } catch (Exception e) { + String msg = "Error in TaskCommunicator when unregistering Task Attempt" + + ", communicator=" + Utils.getTaskCommIdentifierString(taskCommId, context) + + ", containerId=" + containerId + + ", taskId=" + attemptId; + LOG.error(msg, e); + sendEvent( + new DAGAppMasterEventUserServiceFatalError( + DAGAppMasterEventType.TASK_COMMUNICATOR_SERVICE_FATAL_ERROR, + msg, e)); + } } @Override - public TaskCommunicator getTaskCommunicator(int taskCommIndex) { + public TaskCommunicatorWrapper getTaskCommunicator(int taskCommIndex) { return taskCommunicators[taskCommIndex]; } @@ -516,4 +606,9 @@ public class TaskCommunicatorManager extends AbstractService implements + ", ContainerId not known for this attempt"); } } + + @SuppressWarnings("unchecked") + private void sendEvent(Event<?> event) { + context.getEventHandler().handle(event); + } } http://git-wip-us.apache.org/repos/asf/tez/blob/1d765431/tez-dag/src/main/java/org/apache/tez/dag/app/TaskCommunicatorManagerInterface.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/TaskCommunicatorManagerInterface.java b/tez-dag/src/main/java/org/apache/tez/dag/app/TaskCommunicatorManagerInterface.java index 8d060a2..e07b1a0 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/app/TaskCommunicatorManagerInterface.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/TaskCommunicatorManagerInterface.java @@ -22,7 +22,6 @@ import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.tez.serviceplugins.api.ContainerEndReason; import org.apache.tez.serviceplugins.api.TaskAttemptEndReason; import org.apache.tez.dag.app.dag.DAG; -import 
org.apache.tez.dag.api.TaskCommunicator; import org.apache.tez.dag.app.rm.container.AMContainerTask; import org.apache.tez.dag.records.TezTaskAttemptID; /** @@ -42,5 +41,5 @@ public interface TaskCommunicatorManagerInterface { void dagSubmitted(); - TaskCommunicator getTaskCommunicator(int taskCommIndex); + TaskCommunicatorWrapper getTaskCommunicator(int taskCommIndex); } http://git-wip-us.apache.org/repos/asf/tez/blob/1d765431/tez-dag/src/main/java/org/apache/tez/dag/app/TaskCommunicatorWrapper.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/TaskCommunicatorWrapper.java b/tez-dag/src/main/java/org/apache/tez/dag/app/TaskCommunicatorWrapper.java new file mode 100644 index 0000000..4f9780e --- /dev/null +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/TaskCommunicatorWrapper.java @@ -0,0 +1,83 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.tez.dag.app; + +import javax.annotation.Nullable; +import java.net.InetSocketAddress; +import java.util.Map; + +import org.apache.hadoop.security.Credentials; +import org.apache.hadoop.yarn.api.records.ContainerId; +import org.apache.hadoop.yarn.api.records.LocalResource; +import org.apache.tez.dag.api.TaskCommunicator; +import org.apache.tez.dag.api.event.VertexStateUpdate; +import org.apache.tez.dag.records.TezTaskAttemptID; +import org.apache.tez.runtime.api.impl.TaskSpec; +import org.apache.tez.serviceplugins.api.ContainerEndReason; +import org.apache.tez.serviceplugins.api.TaskAttemptEndReason; + +public class TaskCommunicatorWrapper { + + private final TaskCommunicator real; + + public TaskCommunicatorWrapper(TaskCommunicator real) { + this.real = real; + } + + + public void registerRunningContainer(ContainerId containerId, String hostname, int port) throws + Exception { + real.registerRunningContainer(containerId, hostname, port); + } + + public void registerContainerEnd(ContainerId containerId, ContainerEndReason endReason, + @Nullable String diagnostics) throws Exception { + real.registerContainerEnd(containerId, endReason, diagnostics); + + } + + public void registerRunningTaskAttempt(ContainerId containerId, TaskSpec taskSpec, + Map<String, LocalResource> additionalResources, + Credentials credentials, boolean credentialsChanged, + int priority) throws Exception { + real.registerRunningTaskAttempt(containerId, taskSpec, additionalResources, credentials, credentialsChanged, priority); + } + + public void unregisterRunningTaskAttempt(TezTaskAttemptID taskAttemptID, + TaskAttemptEndReason endReason, + @Nullable String diagnostics) throws Exception { + real.unregisterRunningTaskAttempt(taskAttemptID, endReason, diagnostics); + } + + public InetSocketAddress getAddress() throws Exception { + return real.getAddress(); + } + + public void onVertexStateUpdated(VertexStateUpdate stateUpdate) throws Exception { + 
real.onVertexStateUpdated(stateUpdate); + } + + public void dagComplete(int dagIdentifier) throws Exception { + real.dagComplete(dagIdentifier); + } + + public Object getMetaInfo() throws Exception { + return real.getMetaInfo(); + } + + public TaskCommunicator getTaskCommunicator() { + return real; + } +} http://git-wip-us.apache.org/repos/asf/tez/blob/1d765431/tez-dag/src/main/java/org/apache/tez/dag/app/TezTaskCommunicatorImpl.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/TezTaskCommunicatorImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/TezTaskCommunicatorImpl.java index 78e95bd..d071e0d 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/app/TezTaskCommunicatorImpl.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/TezTaskCommunicatorImpl.java @@ -273,7 +273,7 @@ public class TezTaskCommunicatorImpl extends TaskCommunicator { } @Override - public void onVertexStateUpdated(VertexStateUpdate stateUpdate) throws Exception { + public void onVertexStateUpdated(VertexStateUpdate stateUpdate) { // Empty. Not registering, or expecting any updates. 
} http://git-wip-us.apache.org/repos/asf/tez/blob/1d765431/tez-dag/src/main/java/org/apache/tez/dag/app/dag/StateChangeNotifier.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/StateChangeNotifier.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/StateChangeNotifier.java index 990bdea..bd04fd8 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/StateChangeNotifier.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/StateChangeNotifier.java @@ -34,10 +34,12 @@ import com.google.common.collect.ListMultimap; import com.google.common.collect.Multimaps; import com.google.common.collect.SetMultimap; +import org.apache.commons.lang.exception.ExceptionUtils; import org.apache.hadoop.classification.InterfaceAudience; import org.apache.tez.dag.api.TezUncheckedException; import org.apache.tez.dag.api.event.VertexStateUpdate; import org.apache.tez.dag.app.dag.event.DAGEvent; +import org.apache.tez.dag.app.dag.event.DAGEventInternalError; import org.apache.tez.dag.app.dag.event.DAGEventType; import org.apache.tez.dag.records.TezTaskID; import org.apache.tez.dag.records.TezVertexID; @@ -110,7 +112,10 @@ public class StateChangeNotifier { } catch (Exception e) { // TODO send user code exception - TEZ-2332 LOG.error("Error in state update notification for " + event, e); - dag.getEventHandler().handle(new DAGEvent(dag.getID(), DAGEventType.INTERNAL_ERROR)); + dag.getEventHandler().handle( + new DAGEventInternalError(dag.getID(), + "Internal Error in State Update Notification: " + + ExceptionUtils.getStackTrace(e))); return; } } http://git-wip-us.apache.org/repos/asf/tez/blob/1d765431/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/DAGAppMasterEventType.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/DAGAppMasterEventType.java 
b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/DAGAppMasterEventType.java index 5a102a5..9cf2414 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/DAGAppMasterEventType.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/DAGAppMasterEventType.java @@ -22,6 +22,9 @@ public enum DAGAppMasterEventType { INTERNAL_ERROR, AM_REBOOT, DAG_FINISHED, + TASK_COMMUNICATOR_SERVICE_FATAL_ERROR, + CONTAINER_LAUNCHER_SERVICE_FATAL_ERROR, + TASK_SCHEDULER_SERVICE_FATAL_ERROR, SCHEDULING_SERVICE_ERROR, NEW_DAG_SUBMITTED, // Indicates a new dag being submitted, to notify sub-components DAG_CLEANUP http://git-wip-us.apache.org/repos/asf/tez/blob/1d765431/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/DAGAppMasterEventUserServiceFatalError.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/DAGAppMasterEventUserServiceFatalError.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/DAGAppMasterEventUserServiceFatalError.java new file mode 100644 index 0000000..7bc3bd8 --- /dev/null +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/DAGAppMasterEventUserServiceFatalError.java @@ -0,0 +1,46 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.tez.dag.app.dag.event; + +import java.util.EnumSet; + +import com.google.common.base.Preconditions; + +public class DAGAppMasterEventUserServiceFatalError extends DAGAppMasterEvent implements DiagnosableEvent { + + private final Throwable error; + private final String diagnostics; + + public DAGAppMasterEventUserServiceFatalError(DAGAppMasterEventType type, + String diagnostics, Throwable t) { + super(type); + Preconditions.checkArgument( + EnumSet.of(DAGAppMasterEventType.TASK_SCHEDULER_SERVICE_FATAL_ERROR, + DAGAppMasterEventType.CONTAINER_LAUNCHER_SERVICE_FATAL_ERROR, + DAGAppMasterEventType.TASK_COMMUNICATOR_SERVICE_FATAL_ERROR).contains(type), + "Event created with incorrect type: " + type); + this.error = t; + this.diagnostics = diagnostics; + } + + public Throwable getError() { + return error; + } + + @Override + public String getDiagnosticInfo() { + return diagnostics; + } +} http://git-wip-us.apache.org/repos/asf/tez/blob/1d765431/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/DAGEventInternalError.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/DAGEventInternalError.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/DAGEventInternalError.java new file mode 100644 index 0000000..724ecbe --- /dev/null +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/DAGEventInternalError.java @@ -0,0 +1,32 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.tez.dag.app.dag.event; + +import org.apache.tez.dag.records.TezDAGID; + +public class DAGEventInternalError extends DAGEvent implements DiagnosableEvent { + + private final String diagnostics; + + public DAGEventInternalError(TezDAGID dagId, String diagnostics) { + super(dagId, DAGEventType.INTERNAL_ERROR); + this.diagnostics = diagnostics; + } + + @Override + public String getDiagnosticInfo() { + return diagnostics; + } +} http://git-wip-us.apache.org/repos/asf/tez/blob/1d765431/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java index 60f933f..41017ea 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java @@ -43,6 +43,8 @@ import org.apache.commons.lang.StringUtils; import org.apache.commons.lang.exception.ExceptionUtils; import org.apache.tez.common.TezUtilsInternal; import org.apache.tez.common.counters.LimitExceededException; +import org.apache.tez.dag.app.dag.event.DAGEventInternalError; +import org.apache.tez.dag.app.dag.event.DiagnosableEvent; import org.apache.tez.state.OnStateChangedCallback; import org.apache.tez.state.StateMachineTez; import org.slf4j.Logger; @@ -2252,13 +2254,21 @@ public class DAGImpl implements org.apache.tez.dag.app.dag.DAG, private static class InternalErrorTransition implements SingleArcTransition<DAGImpl, DAGEvent> { @Override - public void transition(DAGImpl job, DAGEvent event) { - LOG.info(job.getID() + " terminating due to internal error"); + public void transition(DAGImpl dag, DAGEvent event) { + String diagnostics = null; + if (event instanceof DiagnosableEvent) { + 
DiagnosableEvent errEvent = (DiagnosableEvent) event; + diagnostics = errEvent.getDiagnosticInfo(); + dag.addDiagnostic(diagnostics); + } + + LOG.info(dag.getID() + " terminating due to internal error. " + + (diagnostics == null? "" : " Error=" + diagnostics)); // terminate all vertices - job.enactKill(DAGTerminationCause.INTERNAL_ERROR, VertexTerminationCause.INTERNAL_ERROR); - job.setFinishTime(); - job.cancelCommits(); - job.finished(DAGState.ERROR); + dag.enactKill(DAGTerminationCause.INTERNAL_ERROR, VertexTerminationCause.INTERNAL_ERROR); + dag.setFinishTime(); + dag.cancelCommits(); + dag.finished(DAGState.ERROR); } } http://git-wip-us.apache.org/repos/asf/tez/blob/1d765431/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexManager.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexManager.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexManager.java index 379e316..388d3c7 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexManager.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexManager.java @@ -33,6 +33,8 @@ import java.util.concurrent.atomic.AtomicBoolean; import javax.annotation.Nullable; +import org.apache.commons.lang.exception.ExceptionUtils; +import org.apache.tez.dag.app.dag.event.DAGEventInternalError; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.hadoop.security.UserGroupInformation; @@ -56,8 +58,6 @@ import org.apache.tez.dag.app.AppContext; import org.apache.tez.dag.app.dag.StateChangeNotifier; import org.apache.tez.dag.app.dag.Vertex; import org.apache.tez.dag.app.dag.event.CallableEvent; -import org.apache.tez.dag.app.dag.event.DAGEvent; -import org.apache.tez.dag.app.dag.event.DAGEventType; import org.apache.tez.dag.app.dag.event.VertexEventInputDataInformation; import org.apache.tez.dag.app.dag.event.VertexEventManagerUserCodeError; import 
org.apache.tez.dag.app.dag.impl.AMUserCodeException.Source; @@ -530,7 +530,9 @@ public class VertexManager { // state change must be triggered via an event transition LOG.error("Error after vertex manager callback " + managedVertex.getLogIdentifier(), e); appContext.getEventHandler().handle( - (new DAGEvent(managedVertex.getVertexId().getDAGId(), DAGEventType.INTERNAL_ERROR))); + (new DAGEventInternalError(managedVertex.getVertexId().getDAGId(), + "Error in VertexManager for vertex: " + managedVertex.getLogIdentifier() + + ", error=" + ExceptionUtils.getStackTrace(e)))); } } http://git-wip-us.apache.org/repos/asf/tez/blob/1d765431/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/ContainerLauncherManager.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/ContainerLauncherManager.java b/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/ContainerLauncherManager.java index 9e56f44..98237c1 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/ContainerLauncherManager.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/ContainerLauncherManager.java @@ -22,13 +22,17 @@ import com.google.common.base.Preconditions; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.service.AbstractService; +import org.apache.hadoop.yarn.event.Event; import org.apache.hadoop.yarn.event.EventHandler; +import org.apache.tez.Utils; import org.apache.tez.common.ReflectionUtils; import org.apache.tez.dag.api.NamedEntityDescriptor; import org.apache.tez.dag.api.TezConstants; import org.apache.tez.dag.api.TezException; import org.apache.tez.dag.api.UserPayload; import org.apache.tez.dag.app.ServicePluginLifecycleAbstractService; +import org.apache.tez.dag.app.dag.event.DAGAppMasterEventType; +import org.apache.tez.dag.app.dag.event.DAGAppMasterEventUserServiceFatalError; import org.apache.tez.serviceplugins.api.ContainerLaunchRequest; import 
org.apache.tez.serviceplugins.api.ContainerLauncher; import org.apache.tez.serviceplugins.api.ContainerLauncherContext; @@ -49,7 +53,7 @@ public class ContainerLauncherManager extends AbstractService static final Logger LOG = LoggerFactory.getLogger(TezContainerLauncherImpl.class); @VisibleForTesting - final ContainerLauncher containerLaunchers[]; + final ContainerLauncherWrapper containerLaunchers[]; @VisibleForTesting final ContainerLauncherContext containerLauncherContexts[]; protected final ServicePluginLifecycleAbstractService[] containerLauncherServiceWrappers; @@ -59,7 +63,7 @@ public class ContainerLauncherManager extends AbstractService public ContainerLauncherManager(ContainerLauncher containerLauncher, AppContext context) { super(ContainerLauncherManager.class.getName()); this.appContext = context; - containerLaunchers = new ContainerLauncher[] {containerLauncher}; + containerLaunchers = new ContainerLauncherWrapper[] {new ContainerLauncherWrapper(containerLauncher)}; containerLauncherContexts = new ContainerLauncherContext[] {containerLauncher.getContext()}; containerLauncherServiceWrappers = new ServicePluginLifecycleAbstractService[]{ new ServicePluginLifecycleAbstractService<>(containerLauncher)}; @@ -78,7 +82,7 @@ public class ContainerLauncherManager extends AbstractService containerLauncherDescriptors != null && !containerLauncherDescriptors.isEmpty(), "ContainerLauncherDescriptors must be specified"); containerLauncherContexts = new ContainerLauncherContext[containerLauncherDescriptors.size()]; - containerLaunchers = new ContainerLauncher[containerLauncherDescriptors.size()]; + containerLaunchers = new ContainerLauncherWrapper[containerLauncherDescriptors.size()]; containerLauncherServiceWrappers = new ServicePluginLifecycleAbstractService[containerLauncherDescriptors.size()]; @@ -87,9 +91,9 @@ public class ContainerLauncherManager extends AbstractService ContainerLauncherContext containerLauncherContext = new 
ContainerLauncherContextImpl(context, taskCommunicatorManagerInterface, userPayload); containerLauncherContexts[i] = containerLauncherContext; - containerLaunchers[i] = createContainerLauncher(containerLauncherDescriptors.get(i), context, - containerLauncherContext, taskCommunicatorManagerInterface, workingDirectory, i, isPureLocalMode); - containerLauncherServiceWrappers[i] = new ServicePluginLifecycleAbstractService<>(containerLaunchers[i]); + containerLaunchers[i] = new ContainerLauncherWrapper(createContainerLauncher(containerLauncherDescriptors.get(i), context, + containerLauncherContext, taskCommunicatorManagerInterface, workingDirectory, i, isPureLocalMode)); + containerLauncherServiceWrappers[i] = new ServicePluginLifecycleAbstractService<>(containerLaunchers[i].getContainerLauncher()); } } @@ -197,14 +201,43 @@ public class ContainerLauncherManager extends AbstractService launchEvent.getContainerToken(), launchEvent.getContainerLaunchContext(), launchEvent.getContainer(), schedulerName, taskCommName); - containerLaunchers[launcherId].launchContainer(launchRequest); + try { + containerLaunchers[launcherId].launchContainer(launchRequest); + } catch (Exception e) { + String msg = "Error when launching container" + + ", containerLauncher=" + Utils.getContainerLauncherIdentifierString(launcherId, appContext) + + ", scheduler=" + Utils.getTaskSchedulerIdentifierString(event.getSchedulerId(), appContext) + + ", taskCommunicator=" + Utils.getTaskCommIdentifierString(event.getTaskCommId(), appContext); + LOG.error(msg, e); + sendEvent( + new DAGAppMasterEventUserServiceFatalError( + DAGAppMasterEventType.CONTAINER_LAUNCHER_SERVICE_FATAL_ERROR, + msg, e)); + } break; case CONTAINER_STOP_REQUEST: ContainerStopRequest stopRequest = new ContainerStopRequest(event.getNodeId(), event.getContainerId(), event.getContainerToken(), schedulerName, taskCommName); - containerLaunchers[launcherId].stopContainer(stopRequest); + try { + 
containerLaunchers[launcherId].stopContainer(stopRequest); + } catch (Exception e) { + String msg = "Error when stopping container" + + ", containerLauncher=" + Utils.getContainerLauncherIdentifierString(launcherId, appContext) + + ", scheduler=" + Utils.getTaskSchedulerIdentifierString(event.getSchedulerId(), appContext) + + ", taskCommunicator=" + Utils.getTaskCommIdentifierString(event.getTaskCommId(), appContext); + LOG.error(msg, e); + sendEvent( + new DAGAppMasterEventUserServiceFatalError( + DAGAppMasterEventType.CONTAINER_LAUNCHER_SERVICE_FATAL_ERROR, + msg, e)); + } break; } } + + @SuppressWarnings("unchecked") + private void sendEvent(Event<?> event) { + appContext.getEventHandler().handle(event); + } } http://git-wip-us.apache.org/repos/asf/tez/blob/1d765431/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/ContainerLauncherWrapper.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/ContainerLauncherWrapper.java b/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/ContainerLauncherWrapper.java new file mode 100644 index 0000000..08e287e --- /dev/null +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/ContainerLauncherWrapper.java @@ -0,0 +1,40 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.tez.dag.app.launcher; + +import org.apache.tez.serviceplugins.api.ContainerLaunchRequest; +import org.apache.tez.serviceplugins.api.ContainerLauncher; +import org.apache.tez.serviceplugins.api.ContainerStopRequest; + +public class ContainerLauncherWrapper { + + private final ContainerLauncher real; + + public ContainerLauncherWrapper(ContainerLauncher containerLauncher) { + this.real = containerLauncher; + } + + public void launchContainer(ContainerLaunchRequest launchRequest) throws Exception { + real.launchContainer(launchRequest); + } + + public void stopContainer(ContainerStopRequest stopRequest) throws Exception { + real.stopContainer(stopRequest); + } + + public ContainerLauncher getContainerLauncher() { + return real; + } +} http://git-wip-us.apache.org/repos/asf/tez/blob/1d765431/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/LocalContainerLauncher.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/LocalContainerLauncher.java b/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/LocalContainerLauncher.java index c4ab6e3..b737fda 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/LocalContainerLauncher.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/LocalContainerLauncher.java @@ -228,7 +228,7 @@ public class LocalContainerLauncher extends ContainerLauncher { tezChild = createTezChild(context.getAMConf(), event.getContainerId(), tokenIdentifier, context.getApplicationAttemptId().getAttemptId(), context.getLocalDirs(), - ((TezTaskCommunicatorImpl)tal.getTaskCommunicator(taskCommId)).getUmbilical(), + ((TezTaskCommunicatorImpl)tal.getTaskCommunicator(taskCommId).getTaskCommunicator()).getUmbilical(), TezCommonUtils.parseCredentialsBytes(event.getContainerLaunchContext().getTokens().array())); } catch (InterruptedException e) { handleLaunchFailed(e, event.getContainerId());
