TEZ-2004. Define basic interface for pluggable ContainerLaunchers. (sseth)
Project: http://git-wip-us.apache.org/repos/asf/tez/repo Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/af1cc723 Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/af1cc723 Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/af1cc723 Branch: refs/heads/TEZ-2003 Commit: af1cc7236f100bcc7efcf9b48aae24357fa851bd Parents: a5dfca2 Author: Siddharth Seth <[email protected]> Authored: Mon Jul 20 15:52:24 2015 -0700 Committer: Siddharth Seth <[email protected]> Committed: Fri Aug 14 13:46:45 2015 -0700 ---------------------------------------------------------------------- TEZ-2003-CHANGES.txt | 1 + .../serviceplugins/api/ContainerEndReason.java | 31 +++++ .../api/ContainerLaunchRequest.java | 81 +++++++++++ .../serviceplugins/api/ContainerLauncher.java | 46 +++++++ .../api/ContainerLauncherContext.java | 54 ++++++++ .../api/ContainerLauncherOperationBase.java | 58 ++++++++ .../api/ContainerStopRequest.java | 47 +++++++ .../api/TaskAttemptEndReason.java | 32 +++++ .../org/apache/tez/common/TezUtilsInternal.java | 9 +- .../apache/tez/dag/api/ContainerEndReason.java | 27 ---- .../tez/dag/api/TaskAttemptEndReason.java | 27 ---- .../apache/tez/dag/api/TaskCommunicator.java | 9 ++ .../tez/dag/api/TaskCommunicatorContext.java | 2 +- .../tez/dag/api/TaskCommunicatorInterface.java | 18 +++ .../java/org/apache/tez/dag/app/AppContext.java | 5 + .../dag/app/ContainerLauncherContextImpl.java | 101 ++++++++++++++ .../org/apache/tez/dag/app/DAGAppMaster.java | 15 +++ .../apache/tez/dag/app/TaskAttemptListener.java | 6 +- .../dag/app/TaskAttemptListenerImpTezDag.java | 4 +- .../dag/app/TaskCommunicatorContextImpl.java | 6 +- .../tez/dag/app/TezTaskCommunicatorImpl.java | 12 +- .../tez/dag/app/launcher/ContainerLauncher.java | 29 ---- .../dag/app/launcher/ContainerLauncherImpl.java | 128 ++++++++---------- .../app/launcher/ContainerLauncherRouter.java | 52 +++++++- .../tez/dag/app/launcher/ContainerOp.java | 62 +++++++++ .../app/launcher/LocalContainerLauncher.java | 123 
++++++++--------- .../tez/dag/app/rm/AMSchedulerEventTAEnded.java | 2 +- .../dag/app/rm/LocalTaskSchedulerService.java | 2 +- .../tez/dag/app/rm/NMCommunicatorEvent.java | 18 ++- .../rm/NMCommunicatorLaunchRequestEvent.java | 11 +- .../app/rm/NMCommunicatorStopRequestEvent.java | 4 +- .../tez/dag/app/rm/TaskSchedulerService.java | 2 +- .../dag/app/rm/YarnTaskSchedulerService.java | 2 +- .../rm/container/AMContainerEventCompleted.java | 2 +- .../dag/app/rm/container/AMContainerImpl.java | 9 +- .../apache/tez/dag/app/MockDAGAppMaster.java | 63 ++++----- .../app/TestTaskAttemptListenerImplTezDag.java | 4 +- .../app/TestTaskAttemptListenerImplTezDag2.java | 2 +- .../tez/dag/app/rm/TestContainerReuse.java | 4 +- .../app/rm/TestLocalTaskSchedulerService.java | 1 - .../dag/app/rm/container/TestAMContainer.java | 4 +- .../TezTestServiceContainerLauncher.java | 133 +++++++++---------- .../TezTestServiceNoOpContainerLauncher.java | 53 +++----- .../rm/TezTestServiceTaskSchedulerService.java | 2 +- .../TezTestServiceTaskCommunicatorImpl.java | 4 +- 45 files changed, 887 insertions(+), 420 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/tez/blob/af1cc723/TEZ-2003-CHANGES.txt ---------------------------------------------------------------------- diff --git a/TEZ-2003-CHANGES.txt b/TEZ-2003-CHANGES.txt index 604947c..88dd0c7 100644 --- a/TEZ-2003-CHANGES.txt +++ b/TEZ-2003-CHANGES.txt @@ -34,5 +34,6 @@ ALL CHANGES: TEZ-2526. Fix version for tez-history-parser. TEZ-2621. rebase 07/14 TEZ-2124. Change Node tracking to work per external container source. + TEZ-2004. Define basic interface for pluggable ContainerLaunchers. 
INCOMPATIBLE CHANGES: http://git-wip-us.apache.org/repos/asf/tez/blob/af1cc723/tez-api/src/main/java/org/apache/tez/serviceplugins/api/ContainerEndReason.java ---------------------------------------------------------------------- diff --git a/tez-api/src/main/java/org/apache/tez/serviceplugins/api/ContainerEndReason.java b/tez-api/src/main/java/org/apache/tez/serviceplugins/api/ContainerEndReason.java new file mode 100644 index 0000000..ab8619f --- /dev/null +++ b/tez-api/src/main/java/org/apache/tez/serviceplugins/api/ContainerEndReason.java @@ -0,0 +1,31 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.tez.serviceplugins.api; + +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceStability; + [email protected] [email protected] +public enum ContainerEndReason { + NODE_FAILED, // Completed because the node running the container was marked as dead + INTERNAL_PREEMPTION, // Preempted by the AM, due to an internal decision + EXTERNAL_PREEMPTION, // Preempted due to cluster contention + APPLICATION_ERROR, // An error in the AM caused by user code + FRAMEWORK_ERROR, // An error in the AM - likely a bug. 
+ LAUNCH_FAILED, // Failure to launch the container + COMPLETED, // Completed via normal flow + OTHER +} http://git-wip-us.apache.org/repos/asf/tez/blob/af1cc723/tez-api/src/main/java/org/apache/tez/serviceplugins/api/ContainerLaunchRequest.java ---------------------------------------------------------------------- diff --git a/tez-api/src/main/java/org/apache/tez/serviceplugins/api/ContainerLaunchRequest.java b/tez-api/src/main/java/org/apache/tez/serviceplugins/api/ContainerLaunchRequest.java new file mode 100644 index 0000000..cfd7ca7 --- /dev/null +++ b/tez-api/src/main/java/org/apache/tez/serviceplugins/api/ContainerLaunchRequest.java @@ -0,0 +1,81 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.tez.serviceplugins.api; + +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceStability; +import org.apache.hadoop.yarn.api.records.Container; +import org.apache.hadoop.yarn.api.records.ContainerId; +import org.apache.hadoop.yarn.api.records.ContainerLaunchContext; +import org.apache.hadoop.yarn.api.records.NodeId; +import org.apache.hadoop.yarn.api.records.Token; + [email protected] [email protected] +public class ContainerLaunchRequest extends ContainerLauncherOperationBase { + + private final ContainerLaunchContext clc; + private final Container container; + private final String schedulerName; + private final String taskCommName; + + public ContainerLaunchRequest(NodeId nodeId, + ContainerId containerId, + Token containerToken, + ContainerLaunchContext clc, + Container container, String schedulerName, String taskCommName) { + super(nodeId, containerId, containerToken); + this.clc = clc; + this.container = container; + this.schedulerName = schedulerName; + this.taskCommName = taskCommName; + } + + + // TODO Post TEZ-2003. TEZ-2625. ContainerLaunchContext needs to be built here instead of being passed in. + // Basic specifications need to be provided here + public ContainerLaunchContext getContainerLaunchContext() { + return clc; + } + + /** + * Get the name of the task communicator which will be used to communicate + * with the task that will run in this container. + * @return + */ + public String getTaskCommunicatorName() { + return taskCommName; + } + + /** + * Get the name of the scheduler which allocated this container. 
+ * @return + */ + public String getSchedulerName() { + return schedulerName; + } + + @Override + public String toString() { + return "ContainerLaunchRequest{" + + "nodeId=" + getNodeId() + + ", containerId=" + getContainerId() + + ", clc=" + clc + + ", container=" + container + + ", schedulerName='" + schedulerName + '\'' + + ", taskCommName='" + taskCommName + '\'' + + '}'; + } +} http://git-wip-us.apache.org/repos/asf/tez/blob/af1cc723/tez-api/src/main/java/org/apache/tez/serviceplugins/api/ContainerLauncher.java ---------------------------------------------------------------------- diff --git a/tez-api/src/main/java/org/apache/tez/serviceplugins/api/ContainerLauncher.java b/tez-api/src/main/java/org/apache/tez/serviceplugins/api/ContainerLauncher.java new file mode 100644 index 0000000..218edb6 --- /dev/null +++ b/tez-api/src/main/java/org/apache/tez/serviceplugins/api/ContainerLauncher.java @@ -0,0 +1,46 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.tez.serviceplugins.api; + +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceStability; +import org.apache.hadoop.service.AbstractService; + +/** + * Plugin to allow custom container launchers to be written to launch containers on different types + * of executors. 
+ */ + [email protected] [email protected] +public abstract class ContainerLauncher extends AbstractService { + + private final ContainerLauncherContext containerLauncherContext; + + // TODO TEZ-2003 Simplify this by moving away from AbstractService. Potentially Guava AbstractService. + // A serviceInit(Configuration) is not likely to be very useful, and will expose unnecessary internal + // configuration to the services if populated with the AM Configuration + public ContainerLauncher(String name, ContainerLauncherContext containerLauncherContext) { + super(name); + this.containerLauncherContext = containerLauncherContext; + } + + public final ContainerLauncherContext getContext() { + return this.containerLauncherContext; + } + + public abstract void launchContainer(ContainerLaunchRequest launchRequest); + public abstract void stopContainer(ContainerStopRequest stopRequest); +} http://git-wip-us.apache.org/repos/asf/tez/blob/af1cc723/tez-api/src/main/java/org/apache/tez/serviceplugins/api/ContainerLauncherContext.java ---------------------------------------------------------------------- diff --git a/tez-api/src/main/java/org/apache/tez/serviceplugins/api/ContainerLauncherContext.java b/tez-api/src/main/java/org/apache/tez/serviceplugins/api/ContainerLauncherContext.java new file mode 100644 index 0000000..836dc4a --- /dev/null +++ b/tez-api/src/main/java/org/apache/tez/serviceplugins/api/ContainerLauncherContext.java @@ -0,0 +1,54 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.tez.serviceplugins.api; + +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceStability; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; +import org.apache.hadoop.yarn.api.records.ContainerId; + [email protected] [email protected] +public interface ContainerLauncherContext { + + // TODO Post TEZ-2003. Tez abstraction for ContainerId, NodeId, other YARN constructs + + // Reporting APIs + void containerLaunched(ContainerId containerId); + + void containerLaunchFailed(ContainerId containerId, String diagnostics); + + void containerStopRequested(ContainerId containerId); + + void containerStopFailed(ContainerId containerId, String diagnostics); + + // TODO Post TEZ-2003. TaskAttemptEndReason does not belong here, and is an unnecessary leak. + // ContainerCompleted is normally generated by the scheduler in case of YARN since the RM informs about completion. + // For other sources, there may not be a central entity making this information available. The ContainerLauncher + // on the stop request will likely be the best place to generate it. + void containerCompleted(ContainerId containerId, int exitStatus, String diagnostics, TaskAttemptEndReason endReason); + + // Lookup APIs + + // TODO TEZ-2003. To be replaced by getInitialPayload once the DAG API is changed. 
+ Configuration getInitialConfiguration(); + + int getNumNodes(String sourceName); + + ApplicationAttemptId getApplicationAttemptId(); + + Object getTaskCommunicatorMetaInfo(String taskCommName); +} http://git-wip-us.apache.org/repos/asf/tez/blob/af1cc723/tez-api/src/main/java/org/apache/tez/serviceplugins/api/ContainerLauncherOperationBase.java ---------------------------------------------------------------------- diff --git a/tez-api/src/main/java/org/apache/tez/serviceplugins/api/ContainerLauncherOperationBase.java b/tez-api/src/main/java/org/apache/tez/serviceplugins/api/ContainerLauncherOperationBase.java new file mode 100644 index 0000000..29e0420 --- /dev/null +++ b/tez-api/src/main/java/org/apache/tez/serviceplugins/api/ContainerLauncherOperationBase.java @@ -0,0 +1,58 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.tez.serviceplugins.api; + +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceStability; +import org.apache.hadoop.yarn.api.records.ContainerId; +import org.apache.hadoop.yarn.api.records.NodeId; +import org.apache.hadoop.yarn.api.records.Token; + [email protected] [email protected] +public class ContainerLauncherOperationBase { + + private final NodeId nodeId; + private final ContainerId containerId; + private final Token containerToken; + + public ContainerLauncherOperationBase(NodeId nodeId, + ContainerId containerId, + Token containerToken) { + this.nodeId = nodeId; + this.containerId = containerId; + this.containerToken = containerToken; + } + + public NodeId getNodeId() { + return nodeId; + } + + public ContainerId getContainerId() { + return containerId; + } + + public Token getContainerToken() { + return containerToken; + } + + @Override + public String toString() { + return "ContainerLauncherOperationBase{" + + "nodeId=" + nodeId + + ", containerId=" + containerId + + '}'; + } +} http://git-wip-us.apache.org/repos/asf/tez/blob/af1cc723/tez-api/src/main/java/org/apache/tez/serviceplugins/api/ContainerStopRequest.java ---------------------------------------------------------------------- diff --git a/tez-api/src/main/java/org/apache/tez/serviceplugins/api/ContainerStopRequest.java b/tez-api/src/main/java/org/apache/tez/serviceplugins/api/ContainerStopRequest.java new file mode 100644 index 0000000..cb0af31 --- /dev/null +++ b/tez-api/src/main/java/org/apache/tez/serviceplugins/api/ContainerStopRequest.java @@ -0,0 +1,47 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.tez.serviceplugins.api; + +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceStability; +import org.apache.hadoop.yarn.api.records.ContainerId; +import org.apache.hadoop.yarn.api.records.NodeId; +import org.apache.hadoop.yarn.api.records.Token; + [email protected] [email protected] +public class ContainerStopRequest extends ContainerLauncherOperationBase { + + private final String schedulerName; + private final String taskCommName; + + public ContainerStopRequest(NodeId nodeId, + ContainerId containerId, + Token containerToken, String schedulerName, String taskCommName) { + super(nodeId, containerId, containerToken); + this.schedulerName = schedulerName; + this.taskCommName = taskCommName; + } + + @Override + public String toString() { + return "ContainerStopRequest{" + + "nodeId=" + getNodeId() + + ", containerId=" + getContainerId() + + ", schedulerName='" + schedulerName + '\'' + + ", taskCommName='" + taskCommName + '\'' + + '}'; + } +} http://git-wip-us.apache.org/repos/asf/tez/blob/af1cc723/tez-api/src/main/java/org/apache/tez/serviceplugins/api/TaskAttemptEndReason.java ---------------------------------------------------------------------- diff --git a/tez-api/src/main/java/org/apache/tez/serviceplugins/api/TaskAttemptEndReason.java b/tez-api/src/main/java/org/apache/tez/serviceplugins/api/TaskAttemptEndReason.java new file mode 100644 index 0000000..4255c28 --- /dev/null +++ 
b/tez-api/src/main/java/org/apache/tez/serviceplugins/api/TaskAttemptEndReason.java @@ -0,0 +1,32 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.tez.serviceplugins.api; + +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceStability; + [email protected] [email protected] +public enum TaskAttemptEndReason { + NODE_FAILED, // Completed because the node running the container was marked as dead + COMMUNICATION_ERROR, // Communication error with the task + SERVICE_BUSY, // External service busy + INTERNAL_PREEMPTION, // Preempted by the AM, due to an internal decision + EXTERNAL_PREEMPTION, // Preempted due to cluster contention + APPLICATION_ERROR, // An error in the AM caused by user code + FRAMEWORK_ERROR, // An error in the AM - likely a bug. 
+ CONTAINER_EXITED, + OTHER // Unknown reason +} http://git-wip-us.apache.org/repos/asf/tez/blob/af1cc723/tez-common/src/main/java/org/apache/tez/common/TezUtilsInternal.java ---------------------------------------------------------------------- diff --git a/tez-common/src/main/java/org/apache/tez/common/TezUtilsInternal.java b/tez-common/src/main/java/org/apache/tez/common/TezUtilsInternal.java index 0bdeb79..4c8c227 100644 --- a/tez-common/src/main/java/org/apache/tez/common/TezUtilsInternal.java +++ b/tez-common/src/main/java/org/apache/tez/common/TezUtilsInternal.java @@ -35,13 +35,12 @@ import java.util.zip.Inflater; import com.google.protobuf.ByteString; import com.google.protobuf.Descriptors; import com.google.protobuf.TextFormat; -import org.apache.commons.lang.StringUtils; import org.apache.hadoop.classification.InterfaceAudience.Private; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.security.Credentials; import org.apache.log4j.Appender; import org.apache.tez.dag.api.DagTypeConverters; -import org.apache.tez.dag.api.TaskAttemptEndReason; +import org.apache.tez.serviceplugins.api.TaskAttemptEndReason; import org.apache.tez.dag.api.TezConstants; import org.apache.tez.dag.api.records.DAGProtos; import org.apache.tez.dag.api.records.DAGProtos.ConfigurationProto; @@ -256,6 +255,8 @@ public class TezUtilsInternal { return TaskAttemptTerminationCause.FRAMEWORK_ERROR; case NODE_FAILED: return TaskAttemptTerminationCause.NODE_FAILED; + case CONTAINER_EXITED: + return TaskAttemptTerminationCause.CONTAINER_EXITED; case OTHER: return TaskAttemptTerminationCause.UNKNOWN_ERROR; default: @@ -283,6 +284,8 @@ public class TezUtilsInternal { return TaskAttemptEndReason.FRAMEWORK_ERROR; case NODE_FAILED: return TaskAttemptEndReason.NODE_FAILED; + case CONTAINER_EXITED: + return TaskAttemptEndReason.CONTAINER_EXITED; case INTERRUPTED_BY_SYSTEM: case INTERRUPTED_BY_USER: case UNKNOWN_ERROR: @@ -296,7 +299,7 @@ public class TezUtilsInternal { case 
OUTPUT_LOST: case TASK_HEARTBEAT_ERROR: case CONTAINER_LAUNCH_FAILED: - case CONTAINER_EXITED: + case CONTAINER_STOPPED: case NODE_DISK_ERROR: default: http://git-wip-us.apache.org/repos/asf/tez/blob/af1cc723/tez-common/src/main/java/org/apache/tez/dag/api/ContainerEndReason.java ---------------------------------------------------------------------- diff --git a/tez-common/src/main/java/org/apache/tez/dag/api/ContainerEndReason.java b/tez-common/src/main/java/org/apache/tez/dag/api/ContainerEndReason.java deleted file mode 100644 index e13e886..0000000 --- a/tez-common/src/main/java/org/apache/tez/dag/api/ContainerEndReason.java +++ /dev/null @@ -1,27 +0,0 @@ -/* - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.tez.dag.api; - -// TODO TEZ-2003 Expose as a public API -public enum ContainerEndReason { - NODE_FAILED, // Completed because the node running the container was marked as dead - INTERNAL_PREEMPTION, // Preempted by the AM, due to an internal decision - EXTERNAL_PREEMPTION, // Preempted due to cluster contention - APPLICATION_ERROR, // An error in the AM caused by user code - FRAMEWORK_ERROR, // An error in the AM - likely a bug. 
- LAUNCH_FAILED, // Failure to launch the container - COMPLETED, // Completed via normal flow - OTHER -} http://git-wip-us.apache.org/repos/asf/tez/blob/af1cc723/tez-common/src/main/java/org/apache/tez/dag/api/TaskAttemptEndReason.java ---------------------------------------------------------------------- diff --git a/tez-common/src/main/java/org/apache/tez/dag/api/TaskAttemptEndReason.java b/tez-common/src/main/java/org/apache/tez/dag/api/TaskAttemptEndReason.java deleted file mode 100644 index de78d21..0000000 --- a/tez-common/src/main/java/org/apache/tez/dag/api/TaskAttemptEndReason.java +++ /dev/null @@ -1,27 +0,0 @@ -/* - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.tez.dag.api; - -// TODO TEZ-2003 Expose as a public API -public enum TaskAttemptEndReason { - NODE_FAILED, // Completed because the node running the container was marked as dead - COMMUNICATION_ERROR, // Communication error with the task - SERVICE_BUSY, // External service busy - INTERNAL_PREEMPTION, // Preempted by the AM, due to an internal decision - EXTERNAL_PREEMPTION, // Preempted due to cluster contention - APPLICATION_ERROR, // An error in the AM caused by user code - FRAMEWORK_ERROR, // An error in the AM - likely a bug. 
- OTHER // Unknown reason -} http://git-wip-us.apache.org/repos/asf/tez/blob/af1cc723/tez-dag/src/main/java/org/apache/tez/dag/api/TaskCommunicator.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/api/TaskCommunicator.java b/tez-dag/src/main/java/org/apache/tez/dag/api/TaskCommunicator.java index d0a006b..05e437c 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/api/TaskCommunicator.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/api/TaskCommunicator.java @@ -22,6 +22,8 @@ import org.apache.hadoop.service.AbstractService; import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.LocalResource; import org.apache.tez.dag.api.event.VertexStateUpdate; +import org.apache.tez.serviceplugins.api.ContainerEndReason; +import org.apache.tez.serviceplugins.api.TaskAttemptEndReason; import org.apache.tez.dag.records.TezTaskAttemptID; import org.apache.tez.runtime.api.impl.TaskSpec; @@ -87,4 +89,11 @@ public abstract class TaskCommunicator extends AbstractService { // TODO TEZ-2003 This is extremely difficult to use. Add the dagStarted notification, and potentially // throw exceptions between a dagComplete and dagStart invocation. public abstract void dagComplete(String dagName); + + /** + * Share meta-information such as host:port information where the Task Communicator may be listening. + * Primarily for use by compatible launchers to learn this information. 
+ * @return + */ + public abstract Object getMetaInfo(); } http://git-wip-us.apache.org/repos/asf/tez/blob/af1cc723/tez-dag/src/main/java/org/apache/tez/dag/api/TaskCommunicatorContext.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/api/TaskCommunicatorContext.java b/tez-dag/src/main/java/org/apache/tez/dag/api/TaskCommunicatorContext.java index 56345ab..b6e63f7 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/api/TaskCommunicatorContext.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/api/TaskCommunicatorContext.java @@ -16,13 +16,13 @@ package org.apache.tez.dag.api; import javax.annotation.Nullable; import java.io.IOException; -import java.util.Collection; import java.util.Set; import org.apache.hadoop.security.Credentials; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.tez.dag.api.event.VertexState; +import org.apache.tez.serviceplugins.api.TaskAttemptEndReason; import org.apache.tez.dag.records.TezTaskAttemptID; http://git-wip-us.apache.org/repos/asf/tez/blob/af1cc723/tez-dag/src/main/java/org/apache/tez/dag/api/TaskCommunicatorInterface.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/api/TaskCommunicatorInterface.java b/tez-dag/src/main/java/org/apache/tez/dag/api/TaskCommunicatorInterface.java new file mode 100644 index 0000000..022cd7b --- /dev/null +++ b/tez-dag/src/main/java/org/apache/tez/dag/api/TaskCommunicatorInterface.java @@ -0,0 +1,18 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.tez.dag.api; + +public interface TaskCommunicatorInterface { +} http://git-wip-us.apache.org/repos/asf/tez/blob/af1cc723/tez-dag/src/main/java/org/apache/tez/dag/app/AppContext.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/AppContext.java b/tez-dag/src/main/java/org/apache/tez/dag/app/AppContext.java index 1ccb10b..516fcef 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/app/AppContext.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/AppContext.java @@ -118,4 +118,9 @@ public interface AppContext { public Integer getTaskCommunicatorIdentifier(String name); public Integer getTaskScheduerIdentifier(String name); public Integer getContainerLauncherIdentifier(String name); + + public String getTaskCommunicatorName(int taskCommId); + public String getTaskSchedulerName(int schedulerId); + public String getContainerLauncherName(int launcherId); + } http://git-wip-us.apache.org/repos/asf/tez/blob/af1cc723/tez-dag/src/main/java/org/apache/tez/dag/app/ContainerLauncherContextImpl.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/ContainerLauncherContextImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/ContainerLauncherContextImpl.java new file mode 100644 index 0000000..997775a --- /dev/null +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/ContainerLauncherContextImpl.java @@ -0,0 +1,101 @@ +/* + * Licensed under the Apache License, Version 
2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.tez.dag.app; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; +import org.apache.hadoop.yarn.api.records.ContainerId; +import org.apache.tez.common.TezUtilsInternal; +import org.apache.tez.serviceplugins.api.ContainerLauncherContext; +import org.apache.tez.serviceplugins.api.TaskAttemptEndReason; +import org.apache.tez.dag.app.rm.container.AMContainerEvent; +import org.apache.tez.dag.app.rm.container.AMContainerEventCompleted; +import org.apache.tez.dag.app.rm.container.AMContainerEventLaunchFailed; +import org.apache.tez.dag.app.rm.container.AMContainerEventLaunched; +import org.apache.tez.dag.app.rm.container.AMContainerEventStopFailed; +import org.apache.tez.dag.app.rm.container.AMContainerEventType; +import org.apache.tez.dag.history.DAGHistoryEvent; +import org.apache.tez.dag.history.events.ContainerLaunchedEvent; + +public class ContainerLauncherContextImpl implements ContainerLauncherContext { + + private final AppContext context; + private final TaskAttemptListener tal; + + public ContainerLauncherContextImpl(AppContext appContext, TaskAttemptListener tal) { + this.context = appContext; + this.tal = tal; + } + + @Override + public void containerLaunched(ContainerId containerId) { + context.getEventHandler().handle( + new AMContainerEventLaunched(containerId)); + ContainerLaunchedEvent lEvt = new ContainerLaunchedEvent( + containerId, context.getClock().getTime(), 
context.getApplicationAttemptId()); + context.getHistoryHandler().handle(new DAGHistoryEvent( + null, lEvt)); + + } + + @Override + public void containerLaunchFailed(ContainerId containerId, String diagnostics) { + context.getEventHandler().handle(new AMContainerEventLaunchFailed(containerId, diagnostics)); + } + + @Override + public void containerStopRequested(ContainerId containerId) { + context.getEventHandler().handle( + new AMContainerEvent(containerId, AMContainerEventType.C_NM_STOP_SENT)); + } + + @Override + public void containerStopFailed(ContainerId containerId, String diagnostics) { + context.getEventHandler().handle( + new AMContainerEventStopFailed(containerId, diagnostics)); + } + + @Override + public void containerCompleted(ContainerId containerId, int exitStatus, String diagnostics, + TaskAttemptEndReason endReason) { + context.getEventHandler().handle(new AMContainerEventCompleted(containerId, exitStatus, diagnostics, TezUtilsInternal + .fromTaskAttemptEndReason( + endReason))); + } + + @Override + public Configuration getInitialConfiguration() { + return context.getAMConf(); + } + + @Override + public int getNumNodes(String sourceName) { + int sourceIndex = context.getTaskScheduerIdentifier(sourceName); + int numNodes = context.getNodeTracker().getNumNodes(sourceIndex); + return numNodes; + } + + @Override + public ApplicationAttemptId getApplicationAttemptId() { + return context.getApplicationAttemptId(); + } + + @Override + public Object getTaskCommunicatorMetaInfo(String taskCommName) { + int taskCommId = context.getTaskCommunicatorIdentifier(taskCommName); + return tal.getTaskCommunicator(taskCommId).getMetaInfo(); + } + +} http://git-wip-us.apache.org/repos/asf/tez/blob/af1cc723/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java 
b/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java index 04e72db..d56fb95 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java @@ -1551,6 +1551,21 @@ public class DAGAppMaster extends AbstractService { } @Override + public String getTaskCommunicatorName(int taskCommId) { + return taskCommunicators.inverse().get(taskCommId); + } + + @Override + public String getTaskSchedulerName(int schedulerId) { + return taskSchedulers.inverse().get(schedulerId); + } + + @Override + public String getContainerLauncherName(int launcherId) { + return containerLaunchers.inverse().get(launcherId); + } + + @Override public Map<ApplicationAccessType, String> getApplicationACLs() { if (getServiceState() != STATE.STARTED) { throw new TezUncheckedException( http://git-wip-us.apache.org/repos/asf/tez/blob/af1cc723/tez-dag/src/main/java/org/apache/tez/dag/app/TaskAttemptListener.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/TaskAttemptListener.java b/tez-dag/src/main/java/org/apache/tez/dag/app/TaskAttemptListener.java index 92e38ae..2eec2fb 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/app/TaskAttemptListener.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/TaskAttemptListener.java @@ -18,11 +18,9 @@ package org.apache.tez.dag.app; -import java.net.InetSocketAddress; - import org.apache.hadoop.yarn.api.records.ContainerId; -import org.apache.tez.dag.api.ContainerEndReason; -import org.apache.tez.dag.api.TaskAttemptEndReason; +import org.apache.tez.serviceplugins.api.ContainerEndReason; +import org.apache.tez.serviceplugins.api.TaskAttemptEndReason; import org.apache.tez.dag.app.dag.DAG; import org.apache.tez.dag.api.TaskCommunicator; import org.apache.tez.dag.app.rm.container.AMContainerTask; 
http://git-wip-us.apache.org/repos/asf/tez/blob/af1cc723/tez-dag/src/main/java/org/apache/tez/dag/app/TaskAttemptListenerImpTezDag.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/TaskAttemptListenerImpTezDag.java b/tez-dag/src/main/java/org/apache/tez/dag/app/TaskAttemptListenerImpTezDag.java index e2d44e2..47b63dd 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/app/TaskAttemptListenerImpTezDag.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/TaskAttemptListenerImpTezDag.java @@ -28,7 +28,7 @@ import java.util.concurrent.ConcurrentMap; import com.google.common.annotations.VisibleForTesting; import org.apache.commons.collections4.ListUtils; -import org.apache.tez.dag.api.ContainerEndReason; +import org.apache.tez.serviceplugins.api.ContainerEndReason; import org.apache.tez.dag.app.dag.event.TaskAttemptEvent; import org.apache.tez.dag.app.dag.event.TaskAttemptEventStatusUpdate; import org.apache.tez.runtime.api.events.TaskStatusUpdateEvent; @@ -43,7 +43,7 @@ import org.apache.tez.common.ReflectionUtils; import org.apache.tez.common.TezUtilsInternal; import org.apache.tez.dag.api.TaskCommunicator; import org.apache.tez.dag.api.TaskCommunicatorContext; -import org.apache.tez.dag.api.TaskAttemptEndReason; +import org.apache.tez.serviceplugins.api.TaskAttemptEndReason; import org.apache.tez.dag.api.TaskHeartbeatResponse; import org.apache.tez.dag.api.TezConstants; import org.apache.tez.dag.api.TezException; http://git-wip-us.apache.org/repos/asf/tez/blob/af1cc723/tez-dag/src/main/java/org/apache/tez/dag/app/TaskCommunicatorContextImpl.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/TaskCommunicatorContextImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/TaskCommunicatorContextImpl.java index 790066f..50e006d 100644 --- 
a/tez-dag/src/main/java/org/apache/tez/dag/app/TaskCommunicatorContextImpl.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/TaskCommunicatorContextImpl.java @@ -17,10 +17,6 @@ package org.apache.tez.dag.app; import javax.annotation.Nullable; import java.io.IOException; import java.util.Set; -import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.locks.Lock; -import java.util.concurrent.locks.ReadWriteLock; -import java.util.concurrent.locks.ReentrantLock; import java.util.concurrent.locks.ReentrantReadWriteLock; import com.google.common.base.Function; @@ -30,7 +26,7 @@ import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.security.Credentials; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.ContainerId; -import org.apache.tez.dag.api.TaskAttemptEndReason; +import org.apache.tez.serviceplugins.api.TaskAttemptEndReason; import org.apache.tez.dag.api.TaskCommunicatorContext; import org.apache.tez.dag.api.TaskHeartbeatRequest; import org.apache.tez.dag.api.TaskHeartbeatResponse; http://git-wip-us.apache.org/repos/asf/tez/blob/af1cc723/tez-dag/src/main/java/org/apache/tez/dag/app/TezTaskCommunicatorImpl.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/TezTaskCommunicatorImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/TezTaskCommunicatorImpl.java index 83322f2..0374022 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/app/TezTaskCommunicatorImpl.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/TezTaskCommunicatorImpl.java @@ -41,8 +41,8 @@ import org.apache.tez.common.ContainerContext; import org.apache.tez.common.security.JobTokenIdentifier; import org.apache.tez.common.security.JobTokenSecretManager; import org.apache.tez.common.security.TokenCache; -import org.apache.tez.dag.api.ContainerEndReason; -import 
org.apache.tez.dag.api.TaskAttemptEndReason; +import org.apache.tez.serviceplugins.api.ContainerEndReason; +import org.apache.tez.serviceplugins.api.TaskAttemptEndReason; import org.apache.tez.dag.api.TaskCommunicator; import org.apache.tez.dag.api.TaskCommunicatorContext; import org.apache.tez.dag.api.TaskHeartbeatRequest; @@ -180,7 +180,8 @@ public class TezTaskCommunicatorImpl extends TaskCommunicator { @Override public void registerRunningContainer(ContainerId containerId, String host, int port) { - ContainerInfo oldInfo = registeredContainers.putIfAbsent(containerId, new ContainerInfo(containerId, host, port)); + ContainerInfo oldInfo = registeredContainers.putIfAbsent(containerId, + new ContainerInfo(containerId, host, port)); if (oldInfo != null) { throw new TezUncheckedException("Multiple registrations for containerId: " + containerId); } @@ -267,6 +268,11 @@ public class TezTaskCommunicatorImpl extends TaskCommunicator { // Nothing to do at the moment. Some of the TODOs from TaskAttemptListener apply here. } + @Override + public Object getMetaInfo() { + return address; + } + protected String getTokenIdentifier() { return tokenIdentifier; } http://git-wip-us.apache.org/repos/asf/tez/blob/af1cc723/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/ContainerLauncher.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/ContainerLauncher.java b/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/ContainerLauncher.java deleted file mode 100644 index ea07a1d..0000000 --- a/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/ContainerLauncher.java +++ /dev/null @@ -1,29 +0,0 @@ -/** -* Licensed to the Apache Software Foundation (ASF) under one -* or more contributor license agreements. See the NOTICE file -* distributed with this work for additional information -* regarding copyright ownership. 
The ASF licenses this file -* to you under the Apache License, Version 2.0 (the -* "License"); you may not use this file except in compliance -* with the License. You may obtain a copy of the License at -* -* http://www.apache.org/licenses/LICENSE-2.0 -* -* Unless required by applicable law or agreed to in writing, software -* distributed under the License is distributed on an "AS IS" BASIS, -* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -* See the License for the specific language governing permissions and -* limitations under the License. -*/ - -package org.apache.tez.dag.app.launcher; - - -import org.apache.hadoop.yarn.event.EventHandler; -import org.apache.tez.dag.app.dag.DAG; -import org.apache.tez.dag.app.rm.NMCommunicatorEvent; - -public interface ContainerLauncher - extends EventHandler<NMCommunicatorEvent> { - -} http://git-wip-us.apache.org/repos/asf/tez/blob/af1cc723/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/ContainerLauncherImpl.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/ContainerLauncherImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/ContainerLauncherImpl.java index a12fb04..fe0178c 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/ContainerLauncherImpl.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/ContainerLauncherImpl.java @@ -30,12 +30,15 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import org.apache.commons.lang.exception.ExceptionUtils; +import org.apache.tez.serviceplugins.api.ContainerLaunchRequest; +import org.apache.tez.serviceplugins.api.ContainerLauncher; +import org.apache.tez.serviceplugins.api.ContainerLauncherContext; +import org.apache.tez.serviceplugins.api.ContainerStopRequest; import org.apache.tez.dag.api.TezConstants; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import 
org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.CommonConfigurationKeysPublic; -import org.apache.hadoop.service.AbstractService; import org.apache.hadoop.yarn.api.protocolrecords.StartContainerRequest; import org.apache.hadoop.yarn.api.protocolrecords.StartContainersRequest; import org.apache.hadoop.yarn.api.protocolrecords.StartContainersResponse; @@ -45,57 +48,43 @@ import org.apache.hadoop.yarn.api.records.ContainerLaunchContext; import org.apache.hadoop.yarn.api.records.Token; import org.apache.hadoop.yarn.client.api.impl.ContainerManagementProtocolProxy; import org.apache.hadoop.yarn.client.api.impl.ContainerManagementProtocolProxy.ContainerManagementProtocolProxyData; -import org.apache.hadoop.yarn.util.Clock; import org.apache.hadoop.yarn.util.Records; import org.apache.tez.dag.api.TezConfiguration; import org.apache.tez.dag.api.TezUncheckedException; -import org.apache.tez.dag.app.AppContext; -import org.apache.tez.dag.app.rm.NMCommunicatorEvent; -import org.apache.tez.dag.app.rm.NMCommunicatorLaunchRequestEvent; -import org.apache.tez.dag.app.rm.container.AMContainerEvent; -import org.apache.tez.dag.app.rm.container.AMContainerEventLaunchFailed; -import org.apache.tez.dag.app.rm.container.AMContainerEventLaunched; -import org.apache.tez.dag.app.rm.container.AMContainerEventStopFailed; -import org.apache.tez.dag.app.rm.container.AMContainerEventType; -import org.apache.tez.dag.history.DAGHistoryEvent; -import org.apache.tez.dag.history.events.ContainerLaunchedEvent; import com.google.common.util.concurrent.ThreadFactoryBuilder; -// TODO XXX: See what part of this lifecycle and state management can be simplified. +// TODO See what part of this lifecycle and state management can be simplified. // Ideally, no state - only sendStart / sendStop. -// TODO XXX: Review this entire code and clean it up. +// TODO Review this entire code and clean it up. /** * This class is responsible for launching of containers. 
*/ -public class ContainerLauncherImpl extends AbstractService implements - ContainerLauncher { +public class ContainerLauncherImpl extends ContainerLauncher { - // TODO XXX Ensure the same thread is used to launch / stop the same container. Or - ensure event ordering. + // TODO Ensure the same thread is used to launch / stop the same container. Or - ensure event ordering. static final Logger LOG = LoggerFactory.getLogger(ContainerLauncherImpl.class); - private ConcurrentHashMap<ContainerId, Container> containers = - new ConcurrentHashMap<ContainerId, Container>(); - private AppContext context; + private final ConcurrentHashMap<ContainerId, Container> containers = + new ConcurrentHashMap<>(); protected ThreadPoolExecutor launcherPool; protected static final int INITIAL_POOL_SIZE = 10; - private int limitOnPoolSize; + private final int limitOnPoolSize; + private final Configuration conf; private Thread eventHandlingThread; - protected BlockingQueue<NMCommunicatorEvent> eventQueue = - new LinkedBlockingQueue<NMCommunicatorEvent>(); - private Clock clock; + protected BlockingQueue<ContainerOp> eventQueue = new LinkedBlockingQueue<>(); private ContainerManagementProtocolProxy cmProxy; private AtomicBoolean serviceStopped = new AtomicBoolean(false); - private Container getContainer(NMCommunicatorEvent event) { - ContainerId id = event.getContainerId(); + private Container getContainer(ContainerOp event) { + ContainerId id = event.getBaseOperation().getContainerId(); Container c = containers.get(id); if(c == null) { - c = new Container(event.getContainerId(), - event.getNodeId().toString(), event.getContainerToken()); + c = new Container(id, + event.getBaseOperation().getNodeId().toString(), event.getBaseOperation().getContainerToken()); Container old = containers.putIfAbsent(id, c); if(old != null) { c = old; @@ -111,6 +100,7 @@ public class ContainerLauncherImpl extends AbstractService implements } } + private static enum ContainerState { PREP, FAILED, RUNNING, DONE, 
KILLED_BEFORE_LAUNCH } @@ -135,7 +125,7 @@ public class ContainerLauncherImpl extends AbstractService implements } @SuppressWarnings("unchecked") - public synchronized void launch(NMCommunicatorLaunchRequestEvent event) { + public synchronized void launch(ContainerLaunchRequest event) { LOG.info("Launching Container with Id: " + event.getContainerId()); if(this.state == ContainerState.KILLED_BEFORE_LAUNCH) { state = ContainerState.DONE; @@ -171,13 +161,7 @@ public class ContainerLauncherImpl extends AbstractService implements // after launching, send launched event to task attempt to move // it from ASSIGNED to RUNNING state - context.getEventHandler().handle( - new AMContainerEventLaunched(containerID)); - ContainerLaunchedEvent lEvt = new ContainerLaunchedEvent( - containerID, clock.getTime(), context.getApplicationAttemptId()); - context.getHistoryHandler().handle(new DAGHistoryEvent( - null, lEvt)); - + getContext().containerLaunched(containerID); this.state = ContainerState.RUNNING; } catch (Throwable t) { String message = "Container launch failed for " + containerID + " : " @@ -217,16 +201,14 @@ public class ContainerLauncherImpl extends AbstractService implements // If stopContainer returns without an error, assuming the stop made // it over to the NodeManager. 
- context.getEventHandler().handle( - new AMContainerEvent(containerID, AMContainerEventType.C_NM_STOP_SENT)); + getContext().containerStopRequested(containerID); } catch (Throwable t) { // ignore the cleanup failure String message = "cleanup failed for container " + this.containerID + " : " + ExceptionUtils.getStackTrace(t); - context.getEventHandler().handle( - new AMContainerEventStopFailed(containerID, message)); + getContext().containerStopFailed(containerID, message); LOG.warn(message); this.state = ContainerState.DONE; return; @@ -240,15 +222,9 @@ public class ContainerLauncherImpl extends AbstractService implements } } - public ContainerLauncherImpl(AppContext context) { - super(ContainerLauncherImpl.class.getName()); - this.context = context; - this.clock = context.getClock(); - } - - @Override - public synchronized void serviceInit(Configuration config) { - Configuration conf = new Configuration(config); + public ContainerLauncherImpl(ContainerLauncherContext containerLauncherContext) { + super(ContainerLauncherImpl.class.getName(), containerLauncherContext); + this.conf = new Configuration(containerLauncherContext.getInitialConfiguration()); conf.setInt( CommonConfigurationKeysPublic.IPC_CLIENT_CONNECTION_MAXIDLETIME_KEY, 0); @@ -262,7 +238,7 @@ public class ContainerLauncherImpl extends AbstractService implements public void serviceStart() { // pass a copy of config to ContainerManagementProtocolProxy until YARN-3497 is fixed cmProxy = - new ContainerManagementProtocolProxy(new Configuration(getConfig())); + new ContainerManagementProtocolProxy(conf); ThreadFactory tf = new ThreadFactoryBuilder().setNameFormat( "ContainerLauncher #%d").setDaemon(true).build(); @@ -275,7 +251,7 @@ public class ContainerLauncherImpl extends AbstractService implements eventHandlingThread = new Thread() { @Override public void run() { - NMCommunicatorEvent event = null; + ContainerOp event = null; while (!Thread.currentThread().isInterrupted()) { try { event = 
eventQueue.take(); @@ -293,9 +269,8 @@ public class ContainerLauncherImpl extends AbstractService implements // nodes where containers will run at *this* point of time. This is // *not* the cluster size and doesn't need to be. - int yarnSourceIndex = - context.getTaskScheduerIdentifier(TezConstants.TEZ_AM_SERVICE_PLUGINS_NAME_DEFAULT); - int numNodes = context.getNodeTracker().getNumNodes(yarnSourceIndex); + int numNodes = getContext().getNumNodes( + TezConstants.TEZ_AM_SERVICE_PLUGINS_NAME_DEFAULT); int idealPoolSize = Math.min(limitOnPoolSize, numNodes); if (poolSize < idealPoolSize) { @@ -347,7 +322,7 @@ public class ContainerLauncherImpl extends AbstractService implements } } - protected EventProcessor createEventProcessor(NMCommunicatorEvent event) { + protected EventProcessor createEventProcessor(ContainerOp event) { return new EventProcessor(event); } @@ -361,32 +336,29 @@ public class ContainerLauncherImpl extends AbstractService implements * Setup and start the container on remote nodemanager. */ class EventProcessor implements Runnable { - private NMCommunicatorEvent event; + private ContainerOp event; - EventProcessor(NMCommunicatorEvent event) { + EventProcessor(ContainerOp event) { this.event = event; } @Override public void run() { - LOG.info("Processing the event " + event.toString()); + LOG.info("Processing operation {}", event.toString()); // Load ContainerManager tokens before creating a connection. // TODO: Do it only once per NodeManager. 
- ContainerId containerID = event.getContainerId(); + ContainerId containerID = event.getBaseOperation().getContainerId(); Container c = getContainer(event); - switch(event.getType()) { - - case CONTAINER_LAUNCH_REQUEST: - NMCommunicatorLaunchRequestEvent launchEvent - = (NMCommunicatorLaunchRequestEvent) event; - c.launch(launchEvent); - break; - - case CONTAINER_STOP_REQUEST: - c.kill(); - break; + switch(event.getOpType()) { + case LAUNCH_REQUEST: + ContainerLaunchRequest launchRequest = event.getLaunchRequest(); + c.launch(launchRequest); + break; + case STOP_REQUEST: + c.kill(); + break; } removeContainerIfDone(containerID); } @@ -408,13 +380,23 @@ public class ContainerLauncherImpl extends AbstractService implements void sendContainerLaunchFailedMsg(ContainerId containerId, String message) { LOG.error(message); - context.getEventHandler().handle(new AMContainerEventLaunchFailed(containerId, message)); + getContext().containerLaunchFailed(containerId, message); + } + + + @Override + public void launchContainer(ContainerLaunchRequest launchRequest) { + try { + eventQueue.put(new ContainerOp(ContainerOp.OPType.LAUNCH_REQUEST, launchRequest)); + } catch (InterruptedException e) { + throw new TezUncheckedException(e); + } } @Override - public void handle(NMCommunicatorEvent event) { + public void stopContainer(ContainerStopRequest stopRequest) { try { - eventQueue.put(event); + eventQueue.put(new ContainerOp(ContainerOp.OPType.STOP_REQUEST, stopRequest)); } catch (InterruptedException e) { throw new TezUncheckedException(e); } http://git-wip-us.apache.org/repos/asf/tez/blob/af1cc723/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/ContainerLauncherRouter.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/ContainerLauncherRouter.java b/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/ContainerLauncherRouter.java index db145f4..9f741cf 100644 --- 
a/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/ContainerLauncherRouter.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/ContainerLauncherRouter.java @@ -23,12 +23,18 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.service.AbstractService; import org.apache.hadoop.yarn.event.EventHandler; import org.apache.tez.common.ReflectionUtils; +import org.apache.tez.serviceplugins.api.ContainerLaunchRequest; +import org.apache.tez.serviceplugins.api.ContainerLauncher; +import org.apache.tez.serviceplugins.api.ContainerLauncherContext; +import org.apache.tez.serviceplugins.api.ContainerStopRequest; import org.apache.tez.dag.api.TezConstants; import org.apache.tez.dag.api.TezUncheckedException; import org.apache.tez.dag.app.AppContext; +import org.apache.tez.dag.app.ContainerLauncherContextImpl; import org.apache.tez.dag.app.TaskAttemptListener; import org.apache.tez.dag.app.dag.DAG; import org.apache.tez.dag.app.rm.NMCommunicatorEvent; +import org.apache.tez.dag.app.rm.NMCommunicatorLaunchRequestEvent; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -38,11 +44,15 @@ public class ContainerLauncherRouter extends AbstractService static final Logger LOG = LoggerFactory.getLogger(ContainerLauncherImpl.class); private final ContainerLauncher containerLaunchers[]; + private final ContainerLauncherContext containerLauncherContexts[]; + private final AppContext appContext; @VisibleForTesting - public ContainerLauncherRouter(ContainerLauncher containerLauncher) { + public ContainerLauncherRouter(ContainerLauncher containerLauncher, AppContext context) { super(ContainerLauncherRouter.class.getName()); + this.appContext = context; containerLaunchers = new ContainerLauncher[] {containerLauncher}; + containerLauncherContexts = new ContainerLauncherContext[] {containerLauncher.getContext()}; } // Accepting conf to setup final parameters, if required. 
@@ -53,6 +63,7 @@ public class ContainerLauncherRouter extends AbstractService boolean isPureLocalMode) throws UnknownHostException { super(ContainerLauncherRouter.class.getName()); + this.appContext = context; if (containerLauncherClassIdentifiers == null || containerLauncherClassIdentifiers.length == 0) { if (isPureLocalMode) { containerLauncherClassIdentifiers = @@ -62,16 +73,21 @@ public class ContainerLauncherRouter extends AbstractService new String[]{TezConstants.TEZ_AM_SERVICE_PLUGINS_NAME_DEFAULT}; } } + containerLauncherContexts = new ContainerLauncherContext[containerLauncherClassIdentifiers.length]; containerLaunchers = new ContainerLauncher[containerLauncherClassIdentifiers.length]; + for (int i = 0; i < containerLauncherClassIdentifiers.length; i++) { + ContainerLauncherContext containerLauncherContext = new ContainerLauncherContextImpl(context, taskAttemptListener); + containerLauncherContexts[i] = containerLauncherContext; containerLaunchers[i] = createContainerLauncher(containerLauncherClassIdentifiers[i], context, - taskAttemptListener, workingDirectory, isPureLocalMode, conf); + containerLauncherContext, taskAttemptListener, workingDirectory, isPureLocalMode, conf); } } private ContainerLauncher createContainerLauncher(String containerLauncherClassIdentifier, AppContext context, + ContainerLauncherContext containerLauncherContext, TaskAttemptListener taskAttemptListener, String workingDirectory, boolean isPureLocalMode, @@ -79,12 +95,15 @@ public class ContainerLauncherRouter extends AbstractService UnknownHostException { if (containerLauncherClassIdentifier.equals(TezConstants.TEZ_AM_SERVICE_PLUGINS_NAME_DEFAULT)) { LOG.info("Creating DefaultContainerLauncher"); - return new ContainerLauncherImpl(context); + return new ContainerLauncherImpl(containerLauncherContext); } else if (containerLauncherClassIdentifier .equals(TezConstants.TEZ_AM_SERVICE_PLUGINS_LOCAL_MODE_NAME_DEFAULT)) { LOG.info("Creating LocalContainerLauncher"); + // TODO Post 
TEZ-2003. LocalContainerLauncher is special cased, since it makes use of + // extensive internals which are only available at runtime. Will likely require + // some kind of runtime binding of parameters in the payload to work correctly. return - new LocalContainerLauncher(context, taskAttemptListener, workingDirectory, isPureLocalMode); + new LocalContainerLauncher(containerLauncherContext, context, taskAttemptListener, workingDirectory, isPureLocalMode); } else { LOG.info("Creating container launcher : " + containerLauncherClassIdentifier); Class<? extends ContainerLauncher> containerLauncherClazz = @@ -92,9 +111,9 @@ public class ContainerLauncherRouter extends AbstractService containerLauncherClassIdentifier); try { Constructor<? extends ContainerLauncher> ctor = containerLauncherClazz - .getConstructor(AppContext.class, Configuration.class, TaskAttemptListener.class); + .getConstructor(ContainerLauncherContext.class); ctor.setAccessible(true); - return ctor.newInstance(context, conf, taskAttemptListener); + return ctor.newInstance(containerLauncherContext); } catch (NoSuchMethodException e) { throw new TezUncheckedException(e); } catch (InvocationTargetException e) { @@ -141,6 +160,25 @@ public class ContainerLauncherRouter extends AbstractService @Override public void handle(NMCommunicatorEvent event) { - containerLaunchers[event.getLauncherId()].handle(event); + int launcherId = event.getLauncherId(); + String schedulerName = appContext.getTaskSchedulerName(event.getSchedulerId()); + String taskCommName = appContext.getTaskCommunicatorName(event.getTaskCommId()); + switch (event.getType()) { + case CONTAINER_LAUNCH_REQUEST: + NMCommunicatorLaunchRequestEvent launchEvent = (NMCommunicatorLaunchRequestEvent) event; + ContainerLaunchRequest launchRequest = + new ContainerLaunchRequest(launchEvent.getNodeId(), launchEvent.getContainerId(), + launchEvent.getContainerToken(), launchEvent.getContainerLaunchContext(), + launchEvent.getContainer(), schedulerName, + 
taskCommName); + containerLaunchers[launcherId].launchContainer(launchRequest); + break; + case CONTAINER_STOP_REQUEST: + ContainerStopRequest stopRequest = + new ContainerStopRequest(event.getNodeId(), event.getContainerId(), + event.getContainerToken(), schedulerName, taskCommName); + containerLaunchers[launcherId].stopContainer(stopRequest); + break; + } } } http://git-wip-us.apache.org/repos/asf/tez/blob/af1cc723/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/ContainerOp.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/ContainerOp.java b/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/ContainerOp.java new file mode 100644 index 0000000..c62de66 --- /dev/null +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/ContainerOp.java @@ -0,0 +1,62 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.tez.dag.app.launcher; + +import com.google.common.base.Preconditions; +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.tez.serviceplugins.api.ContainerLaunchRequest; +import org.apache.tez.serviceplugins.api.ContainerLauncherOperationBase; +import org.apache.tez.serviceplugins.api.ContainerStopRequest; + [email protected] +public class ContainerOp { + enum OPType { + LAUNCH_REQUEST, STOP_REQUEST + } + + final ContainerLauncherOperationBase command; + final OPType opType; + + public ContainerOp(OPType opType, ContainerLauncherOperationBase command) { + this.opType = opType; + this.command = command; + } + + public OPType getOpType() { + return opType; + } + + public ContainerLauncherOperationBase getBaseOperation() { + return command; + } + + public ContainerLaunchRequest getLaunchRequest() { + Preconditions.checkState(opType == OPType.LAUNCH_REQUEST); + return (ContainerLaunchRequest) command; + } + + public ContainerStopRequest getStopRequest() { + Preconditions.checkState(opType == OPType.STOP_REQUEST); + return (ContainerStopRequest) command; + } + + @Override + public String toString() { + return "ContainerOp{" + + "opType=" + opType + + ", command=" + command + + '}'; + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/tez/blob/af1cc723/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/LocalContainerLauncher.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/LocalContainerLauncher.java b/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/LocalContainerLauncher.java index fe23409..a1b8e29 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/LocalContainerLauncher.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/LocalContainerLauncher.java @@ -44,11 +44,15 @@ import com.google.common.util.concurrent.ListeningExecutorService; import 
com.google.common.util.concurrent.MoreExecutors; import com.google.common.util.concurrent.ThreadFactoryBuilder; +import org.apache.tez.serviceplugins.api.ContainerLaunchRequest; +import org.apache.tez.serviceplugins.api.ContainerLauncher; +import org.apache.tez.serviceplugins.api.ContainerLauncherContext; +import org.apache.tez.serviceplugins.api.ContainerStopRequest; +import org.apache.tez.serviceplugins.api.TaskAttemptEndReason; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.security.Credentials; -import org.apache.hadoop.service.AbstractService; import org.apache.hadoop.yarn.api.ApplicationConstants.Environment; import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.util.AuxiliaryServiceHelper; @@ -60,17 +64,6 @@ import org.apache.tez.dag.api.TezUncheckedException; import org.apache.tez.dag.app.AppContext; import org.apache.tez.dag.app.TaskAttemptListener; import org.apache.tez.dag.app.TezTaskCommunicatorImpl; -import org.apache.tez.dag.app.rm.NMCommunicatorEvent; -import org.apache.tez.dag.app.rm.NMCommunicatorLaunchRequestEvent; -import org.apache.tez.dag.app.rm.NMCommunicatorStopRequestEvent; -import org.apache.tez.dag.app.rm.container.AMContainerEvent; -import org.apache.tez.dag.app.rm.container.AMContainerEventCompleted; -import org.apache.tez.dag.app.rm.container.AMContainerEventLaunchFailed; -import org.apache.tez.dag.app.rm.container.AMContainerEventLaunched; -import org.apache.tez.dag.app.rm.container.AMContainerEventType; -import org.apache.tez.dag.history.DAGHistoryEvent; -import org.apache.tez.dag.history.events.ContainerLaunchedEvent; -import org.apache.tez.dag.records.TaskAttemptTerminationCause; import org.apache.tez.runtime.api.ExecutionContext; import org.apache.tez.runtime.api.impl.ExecutionContextImpl; import org.apache.tez.runtime.library.common.shuffle.ShuffleUtils; @@ -82,17 +75,17 @@ import org.apache.tez.runtime.task.TezChild; * 
Since all (sub)tasks share the same local directory, they must be executed * sequentially in order to avoid creating/deleting the same files/dirs. */ -public class LocalContainerLauncher extends AbstractService implements - ContainerLauncher { +public class LocalContainerLauncher extends ContainerLauncher { private static final Logger LOG = LoggerFactory.getLogger(LocalContainerLauncher.class); + private final AppContext context; private final AtomicBoolean serviceStopped = new AtomicBoolean(false); private final String workingDirectory; private final TaskAttemptListener tal; private final Map<String, String> localEnv; private final ExecutionContext executionContext; - private int numExecutors; + private final int numExecutors; private final boolean isPureLocalMode; private final ConcurrentHashMap<ContainerId, RunningTaskCallback> @@ -102,23 +95,25 @@ public class LocalContainerLauncher extends AbstractService implements private final ExecutorService callbackExecutor = Executors.newFixedThreadPool(1, new ThreadFactoryBuilder().setDaemon(true).setNameFormat("CallbackExecutor").build()); - private BlockingQueue<NMCommunicatorEvent> eventQueue = - new LinkedBlockingQueue<NMCommunicatorEvent>(); + private BlockingQueue<ContainerOp> eventQueue = new LinkedBlockingQueue<>(); private Thread eventHandlingThread; private ListeningExecutorService taskExecutorService; - - public LocalContainerLauncher(AppContext context, + public LocalContainerLauncher(ContainerLauncherContext containerLauncherContext, + AppContext context, TaskAttemptListener taskAttemptListener, String workingDirectory, boolean isPureLocalMode) throws UnknownHostException { - super(LocalContainerLauncher.class.getName()); + // TODO Post TEZ-2003. Most of this information is dynamic and only available after the AM + // starts up. It's not possible to set these up via a static payload. + // Will need some kind of mechanism to dynamically create payloads / bind to parameters + // after the AM starts up.
+ super(LocalContainerLauncher.class.getName(), containerLauncherContext); this.context = context; this.tal = taskAttemptListener; - this.workingDirectory = workingDirectory; this.isPureLocalMode = isPureLocalMode; if (isPureLocalMode) { @@ -133,11 +128,8 @@ public class LocalContainerLauncher extends AbstractService implements String host = isPureLocalMode ? InetAddress.getLocalHost().getHostName() : System.getenv(Environment.NM_HOST.name()); executionContext = new ExecutionContextImpl(host); - } - @Override - public synchronized void serviceInit(Configuration conf) { - numExecutors = conf.getInt(TezConfiguration.TEZ_AM_INLINE_TASK_EXECUTION_MAX_TASKS, + numExecutors = getContext().getInitialConfiguration().getInt(TezConfiguration.TEZ_AM_INLINE_TASK_EXECUTION_MAX_TASKS, TezConfiguration.TEZ_AM_INLINE_TASK_EXECUTION_MAX_TASKS_DEFAULT); Preconditions.checkState(numExecutors >=1, "Must have at least 1 executor"); ExecutorService rawExecutor = Executors.newFixedThreadPool(numExecutors, @@ -169,20 +161,22 @@ public class LocalContainerLauncher extends AbstractService implements callbackExecutor.shutdownNow(); } + + // Thread to monitor the queue of incoming NMCommunicator events private class TezSubTaskRunner implements Runnable { @Override public void run() { while (!Thread.currentThread().isInterrupted() && !serviceStopped.get()) { - NMCommunicatorEvent event; + ContainerOp event; try { event = eventQueue.take(); - switch (event.getType()) { - case CONTAINER_LAUNCH_REQUEST: - launch((NMCommunicatorLaunchRequestEvent) event); + switch (event.getOpType()) { + case LAUNCH_REQUEST: + launch(event.getLaunchRequest()); break; - case CONTAINER_STOP_REQUEST: - stop((NMCommunicatorStopRequestEvent)event); + case STOP_REQUEST: + stop(event.getStopRequest()); break; } } catch (InterruptedException e) { @@ -200,7 +194,7 @@ public class LocalContainerLauncher extends AbstractService implements @SuppressWarnings("unchecked") void sendContainerLaunchFailedMsg(ContainerId 
containerId, String message) { - context.getEventHandler().handle(new AMContainerEventLaunchFailed(containerId, message)); + getContext().containerLaunchFailed(containerId, message); } private void handleLaunchFailed(Throwable t, ContainerId containerId) { @@ -215,16 +209,17 @@ public class LocalContainerLauncher extends AbstractService implements } //launch tasks - private void launch(NMCommunicatorLaunchRequestEvent event) { + private void launch(ContainerLaunchRequest event) { String tokenIdentifier = context.getApplicationID().toString(); try { TezChild tezChild; try { + int taskCommId = context.getTaskCommunicatorIdentifier(event.getTaskCommunicatorName()); tezChild = createTezChild(context.getAMConf(), event.getContainerId(), tokenIdentifier, context.getApplicationAttemptId().getAttemptId(), context.getLocalDirs(), - ((TezTaskCommunicatorImpl)tal.getTaskCommunicator(event.getTaskCommId())).getUmbilical(), + ((TezTaskCommunicatorImpl)tal.getTaskCommunicator(taskCommId)).getUmbilical(), TezCommonUtils.parseCredentialsBytes(event.getContainerLaunchContext().getTokens().array())); } catch (InterruptedException e) { handleLaunchFailed(e, event.getContainerId()); @@ -238,7 +233,7 @@ public class LocalContainerLauncher extends AbstractService implements } ListenableFuture<TezChild.ContainerExecutionResult> runningTaskFuture = taskExecutorService.submit(createSubTask(tezChild, event.getContainerId())); - RunningTaskCallback callback = new RunningTaskCallback(context, event.getContainerId()); + RunningTaskCallback callback = new RunningTaskCallback(event.getContainerId()); runningContainers.put(event.getContainerId(), callback); Futures.addCallback(runningTaskFuture, callback, callbackExecutor); } catch (RejectedExecutionException e) { @@ -246,7 +241,7 @@ public class LocalContainerLauncher extends AbstractService implements } } - private void stop(NMCommunicatorStopRequestEvent event) { + private void stop(ContainerStopRequest event) { // A stop_request will come in 
when a task completes and reports back or a preemption decision // is made. Currently the LocalTaskScheduler does not support preemption. Also preemption // will not work in local mode till Tez supports task preemption instead of container preemption. @@ -263,18 +258,15 @@ public class LocalContainerLauncher extends AbstractService implements // This will need to be fixed once interrupting tasks is supported. } // Send this event to maintain regular control flow. This isn't of much use though. - context.getEventHandler().handle( - new AMContainerEvent(event.getContainerId(), AMContainerEventType.C_NM_STOP_SENT)); + getContext().containerStopRequested(event.getContainerId()); } private class RunningTaskCallback implements FutureCallback<TezChild.ContainerExecutionResult> { - private final AppContext appContext; private final ContainerId containerId; - RunningTaskCallback(AppContext appContext, ContainerId containerId) { - this.appContext = appContext; + RunningTaskCallback(ContainerId containerId) { this.containerId = containerId; } @@ -286,16 +278,16 @@ public class LocalContainerLauncher extends AbstractService implements result.getExitStatus() == TezChild.ContainerExecutionResult.ExitStatus.ASKED_TO_DIE) { LOG.info("Container: " + containerId + " completed successfully"); - appContext.getEventHandler().handle( - new AMContainerEventCompleted(containerId, result.getExitStatus().getExitCode(), - null, TaskAttemptTerminationCause.CONTAINER_EXITED)); + getContext() + .containerCompleted(containerId, result.getExitStatus().getExitCode(), null, + TaskAttemptEndReason.CONTAINER_EXITED); } else { LOG.info("Container: " + containerId + " completed but with errors"); - appContext.getEventHandler().handle( - new AMContainerEventCompleted(containerId, result.getExitStatus().getExitCode(), - result.getErrorMessage() == null ? - (result.getThrowable() == null ? 
null : result.getThrowable().getMessage()) : - result.getErrorMessage(), TaskAttemptTerminationCause.APPLICATION_ERROR)); + getContext().containerCompleted( + containerId, result.getExitStatus().getExitCode(), + result.getErrorMessage() == null ? + (result.getThrowable() == null ? null : result.getThrowable().getMessage()) : + result.getErrorMessage(), TaskAttemptEndReason.APPLICATION_ERROR); } } @@ -307,16 +299,14 @@ public class LocalContainerLauncher extends AbstractService implements if (!(t instanceof CancellationException)) { LOG.info("Container: " + containerId + ": Execution Failed: ", t); // Inform of failure with exit code 1. - appContext.getEventHandler() - .handle(new AMContainerEventCompleted(containerId, - TezChild.ContainerExecutionResult.ExitStatus.EXECUTION_FAILURE.getExitCode(), - t.getMessage(), TaskAttemptTerminationCause.APPLICATION_ERROR)); + getContext().containerCompleted(containerId, + TezChild.ContainerExecutionResult.ExitStatus.EXECUTION_FAILURE.getExitCode(), + t.getMessage(), TaskAttemptEndReason.APPLICATION_ERROR); } else { LOG.info("Ignoring CancellationException - triggered by LocalContainerLauncher"); - appContext.getEventHandler() - .handle(new AMContainerEventCompleted(containerId, - TezChild.ContainerExecutionResult.ExitStatus.SUCCESS.getExitCode(), - "CancellationException", TaskAttemptTerminationCause.CONTAINER_EXITED)); + getContext().containerCompleted(containerId, + TezChild.ContainerExecutionResult.ExitStatus.SUCCESS.getExitCode(), + "CancellationException", TaskAttemptEndReason.CONTAINER_EXITED); } } } @@ -334,12 +324,7 @@ public class LocalContainerLauncher extends AbstractService implements // TezTaskRunner needs to be fixed to ensure this. Thread.interrupted(); // Inform about the launch request now that the container has been allocated a thread to execute in.
- context.getEventHandler().handle(new AMContainerEventLaunched(containerId)); - ContainerLaunchedEvent lEvt = - new ContainerLaunchedEvent(containerId, context.getClock().getTime(), - context.getApplicationAttemptId()); - - context.getHistoryHandler().handle(new DAGHistoryEvent(context.getCurrentDAGID(), lEvt)); + getContext().containerLaunched(containerId); return tezChild.run(); } }; @@ -368,11 +353,19 @@ public class LocalContainerLauncher extends AbstractService implements } + @Override + public void launchContainer(ContainerLaunchRequest launchRequest) { + try { + eventQueue.put(new ContainerOp(ContainerOp.OPType.LAUNCH_REQUEST, launchRequest)); + } catch (InterruptedException e) { + throw new TezUncheckedException(e); + } + } @Override - public void handle(NMCommunicatorEvent event) { + public void stopContainer(ContainerStopRequest stopRequest) { try { - eventQueue.put(event); + eventQueue.put(new ContainerOp(ContainerOp.OPType.STOP_REQUEST, stopRequest)); } catch (InterruptedException e) { throw new TezUncheckedException(e); } http://git-wip-us.apache.org/repos/asf/tez/blob/af1cc723/tez-dag/src/main/java/org/apache/tez/dag/app/rm/AMSchedulerEventTAEnded.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/AMSchedulerEventTAEnded.java b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/AMSchedulerEventTAEnded.java index a775948..33763e7 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/AMSchedulerEventTAEnded.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/AMSchedulerEventTAEnded.java @@ -18,7 +18,7 @@ package org.apache.tez.dag.app.rm; import org.apache.hadoop.yarn.api.records.ContainerId; -import org.apache.tez.dag.api.TaskAttemptEndReason; +import org.apache.tez.serviceplugins.api.TaskAttemptEndReason; import org.apache.tez.dag.api.oldrecords.TaskAttemptState; import org.apache.tez.dag.app.dag.TaskAttempt; import 
org.apache.tez.dag.records.TezTaskAttemptID; http://git-wip-us.apache.org/repos/asf/tez/blob/af1cc723/tez-dag/src/main/java/org/apache/tez/dag/app/rm/LocalTaskSchedulerService.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/LocalTaskSchedulerService.java b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/LocalTaskSchedulerService.java index a234e07..ef789c5 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/LocalTaskSchedulerService.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/LocalTaskSchedulerService.java @@ -40,7 +40,7 @@ import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.api.records.Priority; import org.apache.hadoop.yarn.api.records.Resource; -import org.apache.tez.dag.api.TaskAttemptEndReason; +import org.apache.tez.serviceplugins.api.TaskAttemptEndReason; import org.apache.tez.dag.api.TezConfiguration; import org.apache.tez.dag.api.TezUncheckedException; import org.apache.tez.dag.app.AppContext; http://git-wip-us.apache.org/repos/asf/tez/blob/af1cc723/tez-dag/src/main/java/org/apache/tez/dag/app/rm/NMCommunicatorEvent.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/NMCommunicatorEvent.java b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/NMCommunicatorEvent.java index f86894f..dc50c37 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/NMCommunicatorEvent.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/NMCommunicatorEvent.java @@ -29,14 +29,19 @@ public class NMCommunicatorEvent extends AbstractEvent<NMCommunicatorEventType> private final NodeId nodeId; private final Token containerToken; private final int launcherId; + private final int schedulerId; + private final int taskCommId; public NMCommunicatorEvent(ContainerId containerId, NodeId nodeId, - Token 
containerToken, NMCommunicatorEventType type, int launcherId) { + Token containerToken, NMCommunicatorEventType type, int launcherId, + int schedulerId, int taskCommId) { super(type); this.containerId = containerId; this.nodeId = nodeId; this.containerToken = containerToken; this.launcherId = launcherId; + this.schedulerId = schedulerId; + this.taskCommId = taskCommId; } public ContainerId getContainerId() { @@ -55,9 +60,18 @@ public class NMCommunicatorEvent extends AbstractEvent<NMCommunicatorEventType> return launcherId; } + public int getSchedulerId() { + return schedulerId; + } + + public int getTaskCommId() { + return taskCommId; + } + public String toSrting() { return super.toString() + " for container " + containerId + ", nodeId: " - + nodeId + ", launcherId: " + launcherId; + + nodeId + ", launcherId: " + launcherId + ", schedulerId=" + schedulerId + + ", taskCommId=" + taskCommId; } @Override http://git-wip-us.apache.org/repos/asf/tez/blob/af1cc723/tez-dag/src/main/java/org/apache/tez/dag/app/rm/NMCommunicatorLaunchRequestEvent.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/NMCommunicatorLaunchRequestEvent.java b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/NMCommunicatorLaunchRequestEvent.java index a38345c..c57b6be 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/NMCommunicatorLaunchRequestEvent.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/NMCommunicatorLaunchRequestEvent.java @@ -26,15 +26,14 @@ public class NMCommunicatorLaunchRequestEvent extends NMCommunicatorEvent { private final ContainerLaunchContext clc; private final Container container; // The task communicator index for the specific container being launched. 
- private final int taskCommId; public NMCommunicatorLaunchRequestEvent(ContainerLaunchContext clc, - Container container, int launcherId, int taskCommId) { + Container container, int launcherId, int schedulerId, int taskCommId) { super(container.getId(), container.getNodeId(), container - .getContainerToken(), NMCommunicatorEventType.CONTAINER_LAUNCH_REQUEST, launcherId); + .getContainerToken(), NMCommunicatorEventType.CONTAINER_LAUNCH_REQUEST, + launcherId, schedulerId, taskCommId); this.clc = clc; this.container = container; - this.taskCommId = taskCommId; } public ContainerLaunchContext getContainerLaunchContext() { @@ -45,10 +44,6 @@ public class NMCommunicatorLaunchRequestEvent extends NMCommunicatorEvent { return container; } - public int getTaskCommId() { - return taskCommId; - } - @Override public boolean equals(Object o) { if (this == o) { http://git-wip-us.apache.org/repos/asf/tez/blob/af1cc723/tez-dag/src/main/java/org/apache/tez/dag/app/rm/NMCommunicatorStopRequestEvent.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/NMCommunicatorStopRequestEvent.java b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/NMCommunicatorStopRequestEvent.java index c9b5c44..352f450 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/NMCommunicatorStopRequestEvent.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/NMCommunicatorStopRequestEvent.java @@ -25,9 +25,9 @@ import org.apache.hadoop.yarn.api.records.Token; public class NMCommunicatorStopRequestEvent extends NMCommunicatorEvent { public NMCommunicatorStopRequestEvent(ContainerId containerId, NodeId nodeId, - Token containerToken, int launcherId) { + Token containerToken, int launcherId, int schedulerId, int taskCommId) { super(containerId, nodeId, containerToken, - NMCommunicatorEventType.CONTAINER_STOP_REQUEST, launcherId); + NMCommunicatorEventType.CONTAINER_STOP_REQUEST, launcherId, schedulerId, taskCommId); } }
