TEZ-2675. Add javadocs for new pluggable components, fix problems reported by jenkins. (sseth)
Project: http://git-wip-us.apache.org/repos/asf/tez/repo Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/fda06553 Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/fda06553 Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/fda06553 Branch: refs/heads/master Commit: fda065536bfb9c7435fe88300ca617ed642a55cf Parents: 5ce54b8 Author: Siddharth Seth <[email protected]> Authored: Fri Aug 7 14:49:58 2015 -0700 Committer: Siddharth Seth <[email protected]> Committed: Fri Aug 21 18:15:23 2015 -0700 ---------------------------------------------------------------------- TEZ-2003-CHANGES.txt | 1 + pom.xml | 1 + .../java/org/apache/tez/client/TezClient.java | 52 ++++- .../tez/common/ServicePluginLifecycle.java | 9 + .../main/java/org/apache/tez/dag/api/DAG.java | 2 +- .../tez/dag/api/NamedEntityDescriptor.java | 17 ++ .../java/org/apache/tez/dag/api/Vertex.java | 38 +++- .../api/ContainerLaunchRequest.java | 11 +- .../serviceplugins/api/ContainerLauncher.java | 35 ++++ .../api/ContainerLauncherContext.java | 63 +++++- .../api/ContainerLauncherOperationBase.java | 17 ++ .../api/ContainerStopRequest.java | 3 + .../api/ServicePluginsDescriptor.java | 19 +- .../tez/serviceplugins/api/TaskScheduler.java | 161 ++++++++++++++-- .../api/TaskSchedulerContext.java | 190 +++++++++++++++---- .../apache/tez/dag/api/TaskCommunicator.java | 95 +++++++++- .../tez/dag/api/TaskCommunicatorContext.java | 100 +++++++++- .../dag/app/ContainerLauncherContextImpl.java | 1 + .../dag/app/TaskAttemptListenerImpTezDag.java | 15 +- .../app/launcher/ContainerLauncherRouter.java | 14 +- .../dag/app/rm/TaskSchedulerContextImpl.java | 3 + .../dag/app/rm/TaskSchedulerEventHandler.java | 16 +- .../apache/tez/runtime/task/TezTaskRunner2.java | 2 +- 23 files changed, 754 insertions(+), 111 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/tez/blob/fda06553/TEZ-2003-CHANGES.txt 
---------------------------------------------------------------------- diff --git a/TEZ-2003-CHANGES.txt b/TEZ-2003-CHANGES.txt index b133ea3..75fac88 100644 --- a/TEZ-2003-CHANGES.txt +++ b/TEZ-2003-CHANGES.txt @@ -44,5 +44,6 @@ ALL CHANGES: TEZ-2626. Fix log lines with DEBUG in messages, consolidate TEZ-2003 TODOs. TEZ-2126. Add unit tests for verifying multiple schedulers, launchers, communicators. TEZ-2698. rebase 08/05 + TEZ-2675. Add javadocs for new pluggable components, fix problems reported by jenkins INCOMPATIBLE CHANGES: http://git-wip-us.apache.org/repos/asf/tez/blob/fda06553/pom.xml ---------------------------------------------------------------------- diff --git a/pom.xml b/pom.xml index 7ae5f31..bf2a6cf 100644 --- a/pom.xml +++ b/pom.xml @@ -780,6 +780,7 @@ <configuration> <excludes> <exclude>CHANGES.txt</exclude> + <exclude>TEZ-2003-CHANGES.txt</exclude> </excludes> </configuration> </plugin> http://git-wip-us.apache.org/repos/asf/tez/blob/fda06553/tez-api/src/main/java/org/apache/tez/client/TezClient.java ---------------------------------------------------------------------- diff --git a/tez-api/src/main/java/org/apache/tez/client/TezClient.java b/tez-api/src/main/java/org/apache/tez/client/TezClient.java index 27f0a81..9e7fe51 100644 --- a/tez-api/src/main/java/org/apache/tez/client/TezClient.java +++ b/tez-api/src/main/java/org/apache/tez/client/TezClient.java @@ -284,7 +284,7 @@ public class TezClient { * Only LocalResourceType.FILE is supported. All files will be treated as * private. * - * @param localFiles + * @param localFiles the files to be made available in the AM */ public synchronized void addAppMasterLocalFiles(Map<String, LocalResource> localFiles) { Preconditions.checkNotNull(localFiles); @@ -314,7 +314,7 @@ public class TezClient { * Master for the next DAG. 
<br>In session mode, credentials, if needed, must be * set before calling start() * - * @param credentials + * @param credentials credentials */ public synchronized void setAppMasterCredentials(Credentials credentials) { Preconditions @@ -883,6 +883,9 @@ public class TezClient { append(tezDagIdFormat.get().format(1)).toString(); } + /** + * A builder for setting up an instance of {@link org.apache.tez.client.TezClient} + */ @Public public static class TezClientBuilder { final String name; @@ -892,6 +895,15 @@ public class TezClient { private Credentials credentials; ServicePluginsDescriptor servicePluginsDescriptor; + /** + * Create an instance of a TezClientBuilder + * + * @param name + * Name of the client. Used for logging etc. This will also be used + * as app master name is session mode + * @param tezConf + * Configuration for the framework + */ private TezClientBuilder(String name, TezConfiguration tezConf) { this.name = name; this.tezConf = tezConf; @@ -899,26 +911,62 @@ public class TezClient { TezConfiguration.TEZ_AM_SESSION_MODE, TezConfiguration.TEZ_AM_SESSION_MODE_DEFAULT); } + /** + * Specify whether this client is a session or not + * @param isSession whether the client is a session + * @return the current builder + */ public TezClientBuilder setIsSession(boolean isSession) { this.isSession = isSession; return this; } + /** + * Set local resources to be used by the AppMaster + * + * @param localResources local files for the App Master + * @return the files to be added to the AM + */ public TezClientBuilder setLocalResources(Map<String, LocalResource> localResources) { this.localResourceMap = localResources; return this; } + /** + * Setup security credentials + * + * @param credentials + * Set security credentials to be used inside the app master, if + * needed. Tez App Master needs credentials to access the staging + * directory and for most HDFS cases these are automatically obtained + * by Tez client. 
If the staging directory is on a file system for + * which credentials cannot be obtained or for any credentials needed + * by user code running inside the App Master, credentials must be + * supplied by the user. These will be used by the App Master for the + * next DAG. <br> + * In session mode, credentials, if needed, must be set before + * calling start() + * @return the current builder + */ public TezClientBuilder setCredentials(Credentials credentials) { this.credentials = credentials; return this; } + /** + * Specify the service plugins that will be running in the AM + * @param servicePluginsDescriptor the service plugin descriptor with details about the plugins running in the AM + * @return the current builder + */ public TezClientBuilder setServicePluginDescriptor(ServicePluginsDescriptor servicePluginsDescriptor) { this.servicePluginsDescriptor = servicePluginsDescriptor; return this; } + /** + * Build the actual instance of the {@link TezClient} + * @return an instance of {@link TezClient} + */ public TezClient build() { return new TezClient(name, tezConf, isSession, localResourceMap, credentials, servicePluginsDescriptor); http://git-wip-us.apache.org/repos/asf/tez/blob/fda06553/tez-api/src/main/java/org/apache/tez/common/ServicePluginLifecycle.java ---------------------------------------------------------------------- diff --git a/tez-api/src/main/java/org/apache/tez/common/ServicePluginLifecycle.java b/tez-api/src/main/java/org/apache/tez/common/ServicePluginLifecycle.java index 2eaa7be..b52b08c 100644 --- a/tez-api/src/main/java/org/apache/tez/common/ServicePluginLifecycle.java +++ b/tez-api/src/main/java/org/apache/tez/common/ServicePluginLifecycle.java @@ -17,6 +17,15 @@ package org.apache.tez.common; import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceStability; +/** + * Defines a lifecycle for a Service. 
The typical implementation for services when used within the + * Tez framework would be + * 1. Construct the object. + * 2. initialize() + * 3. start() + * stop() - is invoked when the service is no longer required, and could be invoked while in any + * state, in case of failures + */ @InterfaceAudience.Private @InterfaceStability.Unstable public interface ServicePluginLifecycle { http://git-wip-us.apache.org/repos/asf/tez/blob/fda06553/tez-api/src/main/java/org/apache/tez/dag/api/DAG.java ---------------------------------------------------------------------- diff --git a/tez-api/src/main/java/org/apache/tez/dag/api/DAG.java b/tez-api/src/main/java/org/apache/tez/dag/api/DAG.java index fce9522..927039a 100644 --- a/tez-api/src/main/java/org/apache/tez/dag/api/DAG.java +++ b/tez-api/src/main/java/org/apache/tez/dag/api/DAG.java @@ -343,7 +343,7 @@ public class DAG { * * @param vertexExecutionContext the default execution context for the DAG * - * @return + * @return this DAG */ @Public @InterfaceStability.Unstable http://git-wip-us.apache.org/repos/asf/tez/blob/fda06553/tez-api/src/main/java/org/apache/tez/dag/api/NamedEntityDescriptor.java ---------------------------------------------------------------------- diff --git a/tez-api/src/main/java/org/apache/tez/dag/api/NamedEntityDescriptor.java b/tez-api/src/main/java/org/apache/tez/dag/api/NamedEntityDescriptor.java index 17c8c6c..426d4eb 100644 --- a/tez-api/src/main/java/org/apache/tez/dag/api/NamedEntityDescriptor.java +++ b/tez-api/src/main/java/org/apache/tez/dag/api/NamedEntityDescriptor.java @@ -14,9 +14,14 @@ package org.apache.tez.dag.api; +import java.io.DataInput; +import java.io.DataOutput; +import java.io.IOException; + import com.google.common.base.Preconditions; import org.apache.hadoop.classification.InterfaceAudience; +@SuppressWarnings("unchecked") public class NamedEntityDescriptor<T extends NamedEntityDescriptor<T>> extends EntityDescriptor<NamedEntityDescriptor<T>> { private final String 
entityName; @@ -37,6 +42,18 @@ public class NamedEntityDescriptor<T extends NamedEntityDescriptor<T>> extends E } @Override + public void write(DataOutput out) throws IOException { + throw new UnsupportedOperationException( + "write is not expected to be used for a NamedEntityDescriptor"); + } + + @Override + public void readFields(DataInput in) throws IOException { + throw new UnsupportedOperationException( + "readFields is not expected to be used for a NamedEntityDescriptor"); + } + + @Override public String toString() { boolean hasPayload = getUserPayload() == null ? false : getUserPayload().getPayload() == null ? false : true; http://git-wip-us.apache.org/repos/asf/tez/blob/fda06553/tez-api/src/main/java/org/apache/tez/dag/api/Vertex.java ---------------------------------------------------------------------- diff --git a/tez-api/src/main/java/org/apache/tez/dag/api/Vertex.java b/tez-api/src/main/java/org/apache/tez/dag/api/Vertex.java index 8953ae1..3f52a3d 100644 --- a/tez-api/src/main/java/org/apache/tez/dag/api/Vertex.java +++ b/tez-api/src/main/java/org/apache/tez/dag/api/Vertex.java @@ -419,13 +419,16 @@ public class Vertex { * * @param vertexExecutionContext the execution context for the vertex. * - * @return + * @return this Vertex */ public Vertex setExecutionContext(VertexExecutionContext vertexExecutionContext) { this.vertexExecutionContext = vertexExecutionContext; return this; } + /** + * The execution context for a running vertex. 
+ */ @Public @InterfaceStability.Unstable public static class VertexExecutionContext { @@ -435,15 +438,39 @@ public class Vertex { final String containerLauncherName; final String taskCommName; + /** + * Create an execution context which specifies whether the vertex needs to be executed in the + * AM + * + * @param executeInAm whether to execute the vertex in the AM + * @return the relevant execution context + */ public static VertexExecutionContext createExecuteInAm(boolean executeInAm) { return new VertexExecutionContext(executeInAm, false); } + /** + * Create an execution context which specifies whether the vertex needs to be executed in + * regular containers + * + * @param executeInContainers whether to execute the vertex in regular containers + * @return the relevant execution context + */ public static VertexExecutionContext createExecuteInContainers(boolean executeInContainers) { return new VertexExecutionContext(false, executeInContainers); } - public static VertexExecutionContext create(String taskSchedulerName, String containerLauncherName, + /** + * @param taskSchedulerName the task scheduler name which was setup while creating the + * {@link org.apache.tez.client.TezClient} + * @param containerLauncherName the container launcher name which was setup while creating the + * {@link org.apache.tez.client.TezClient} + * @param taskCommName the task communicator name which was setup while creating the + * {@link org.apache.tez.client.TezClient} + * @return the relevant execution context + */ + public static VertexExecutionContext create(String taskSchedulerName, + String containerLauncherName, String taskCommName) { return new VertexExecutionContext(taskSchedulerName, containerLauncherName, taskCommName); } @@ -453,12 +480,13 @@ public class Vertex { } private VertexExecutionContext(String taskSchedulerName, String containerLauncherName, - String taskCommName) { + String taskCommName) { this(false, false, taskSchedulerName, containerLauncherName, 
taskCommName); } - private VertexExecutionContext(boolean executeInAm, boolean executeInContainers, String taskSchedulerName, String containerLauncherName, - String taskCommName) { + private VertexExecutionContext(boolean executeInAm, boolean executeInContainers, + String taskSchedulerName, String containerLauncherName, + String taskCommName) { if (executeInAm || executeInContainers) { Preconditions.checkState(!(executeInAm && executeInContainers), "executeInContainers and executeInAM are mutually exclusive"); http://git-wip-us.apache.org/repos/asf/tez/blob/fda06553/tez-api/src/main/java/org/apache/tez/serviceplugins/api/ContainerLaunchRequest.java ---------------------------------------------------------------------- diff --git a/tez-api/src/main/java/org/apache/tez/serviceplugins/api/ContainerLaunchRequest.java b/tez-api/src/main/java/org/apache/tez/serviceplugins/api/ContainerLaunchRequest.java index cfd7ca7..f998fa2 100644 --- a/tez-api/src/main/java/org/apache/tez/serviceplugins/api/ContainerLaunchRequest.java +++ b/tez-api/src/main/java/org/apache/tez/serviceplugins/api/ContainerLaunchRequest.java @@ -22,6 +22,9 @@ import org.apache.hadoop.yarn.api.records.ContainerLaunchContext; import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.api.records.Token; +/** + * Contains specifications for a container which needs to be launched + */ @InterfaceAudience.Public @InterfaceStability.Unstable public class ContainerLaunchRequest extends ContainerLauncherOperationBase { @@ -46,6 +49,10 @@ public class ContainerLaunchRequest extends ContainerLauncherOperationBase { // TODO Post TEZ-2003. TEZ-2625. ContainerLaunchContext needs to be built here instead of being passed in. 
// Basic specifications need to be provided here + /** + * The {@link ContainerLauncherContext} for the container being launched + * @return the container launch context for the launch request + */ public ContainerLaunchContext getContainerLaunchContext() { return clc; } @@ -53,7 +60,7 @@ public class ContainerLaunchRequest extends ContainerLauncherOperationBase { /** * Get the name of the task communicator which will be used to communicate * with the task that will run in this container. - * @return + * @return the task communicator to be used for this request */ public String getTaskCommunicatorName() { return taskCommName; @@ -61,7 +68,7 @@ public class ContainerLaunchRequest extends ContainerLauncherOperationBase { /** * Get the name of the scheduler which allocated this container. - * @return + * @return the scheduler name which provided the container */ public String getSchedulerName() { return schedulerName; http://git-wip-us.apache.org/repos/asf/tez/blob/fda06553/tez-api/src/main/java/org/apache/tez/serviceplugins/api/ContainerLauncher.java ---------------------------------------------------------------------- diff --git a/tez-api/src/main/java/org/apache/tez/serviceplugins/api/ContainerLauncher.java b/tez-api/src/main/java/org/apache/tez/serviceplugins/api/ContainerLauncher.java index 7f58f77..5a77b69 100644 --- a/tez-api/src/main/java/org/apache/tez/serviceplugins/api/ContainerLauncher.java +++ b/tez-api/src/main/java/org/apache/tez/serviceplugins/api/ContainerLauncher.java @@ -33,22 +33,57 @@ public abstract class ContainerLauncher implements ServicePluginLifecycle { this.containerLauncherContext = containerLauncherContext; } + /** + * An entry point for initialization. + * Order of service setup. Constructor, initialize(), start() - when starting a service. + * + * @throws Exception + */ @Override public void initialize() throws Exception { } + /** + * An entry point for starting the service. + * Order of service setup. 
Constructor, initialize(), start() - when starting a service. + * + * @throws Exception + */ @Override public void start() throws Exception { } + /** + * Stop the service. This could be invoked at any point, when the service is no longer required - + * including in case of errors. + * + * @throws Exception + */ @Override public void shutdown() throws Exception { } + /** + * Get the {@link ContainerLauncherContext} associated with this instance of the container + * launcher, which is used to communicate with the rest of the system + * + * @return an instance of {@link ContainerLauncherContext} + */ public final ContainerLauncherContext getContext() { return this.containerLauncherContext; } + /** + * A request to launch the specified container + * + * @param launchRequest the actual launch request + */ public abstract void launchContainer(ContainerLaunchRequest launchRequest); + + /** + * A request to stop a specific container + * + * @param stopRequest the actual stop request + */ public abstract void stopContainer(ContainerStopRequest stopRequest); } http://git-wip-us.apache.org/repos/asf/tez/blob/fda06553/tez-api/src/main/java/org/apache/tez/serviceplugins/api/ContainerLauncherContext.java ---------------------------------------------------------------------- diff --git a/tez-api/src/main/java/org/apache/tez/serviceplugins/api/ContainerLauncherContext.java b/tez-api/src/main/java/org/apache/tez/serviceplugins/api/ContainerLauncherContext.java index 5da38b8..dcd9e80 100644 --- a/tez-api/src/main/java/org/apache/tez/serviceplugins/api/ContainerLauncherContext.java +++ b/tez-api/src/main/java/org/apache/tez/serviceplugins/api/ContainerLauncherContext.java @@ -24,30 +24,87 @@ import org.apache.tez.dag.api.UserPayload; @InterfaceStability.Unstable public interface ContainerLauncherContext { - // TODO Post TEZ-2003. 
Tez abstraction for ContainerId, NodeId, other YARN constructs + // TODO TEZ-2003 (post) TEZ-2664 Tez abstraction for ContainerId, NodeId, other YARN constructs // Reporting APIs + + /** + * Inform the framework that a container has been launched + * + * @param containerId the id of the container that has been launched + */ void containerLaunched(ContainerId containerId); + /** + * Inform the framework of an issue while trying to launch a container. + * + * @param containerId the id of the container which failed to launch + * @param diagnostics diagnostics for the failure + */ void containerLaunchFailed(ContainerId containerId, String diagnostics); + /** + * Inform the framework that a request has been made to stop a container + * + * @param containerId the id of the associated container + */ void containerStopRequested(ContainerId containerId); + /** + * Inform the framework that the attempt to stop a container failed + * + * @param containerId the id of the associated container + * @param diagnostics diagnostics for the failure + */ void containerStopFailed(ContainerId containerId, String diagnostics); - // TODO Post TEZ-2003. TaskAttemptEndReason does not belong here, and is an unnecessary leak. + // TODO TEZ-2003 (post). TEZ-2676 TaskAttemptEndReason does not belong here, and is an unnecessary leak. // ContainerCompleted is normally generated by the scheduler in case of YARN since the RM informs about completion. // For other sources, there may not be a central entity making this information available. The ContainerLauncher // on the stop request will likely be the best place to generate it. 
- void containerCompleted(ContainerId containerId, int exitStatus, String diagnostics, TaskAttemptEndReason endReason); + + /** + * Inform the scheduler that a container was successfully stopped + * + * @param containerId the id of the associated container + * @param exitStatus the exit status of the container + * @param diagnostics diagnostics associated with the container end + * @param endReason the end reason for the task running in the container + */ + void containerCompleted(ContainerId containerId, int exitStatus, String diagnostics, + TaskAttemptEndReason endReason); // Lookup APIs + /** + * Get the UserPayload that was configured while setting up the launcher + * + * @return the initially configured user payload + */ UserPayload getInitialUserPayload(); + /** + * Get the number of nodes being handled by the specified source + * + * @param sourceName the relevant source name + * @return the number of nodes being handled by the specified source + */ int getNumNodes(String sourceName); + /** + * Get the application attempt id for the running application. Relevant when running under YARN + * + * @return the applicationAttemptId for the running app + */ ApplicationAttemptId getApplicationAttemptId(); + /** + * Get meta info from the specified TaskCommunicator.
This assumes that the launcher has been + * setup + * along with a compatible TaskCommunicator, and the launcher knows how to read this meta-info + * + * @param taskCommName the name of the task communicator + * @return meta info for the requested task communicator + */ Object getTaskCommunicatorMetaInfo(String taskCommName); } http://git-wip-us.apache.org/repos/asf/tez/blob/fda06553/tez-api/src/main/java/org/apache/tez/serviceplugins/api/ContainerLauncherOperationBase.java ---------------------------------------------------------------------- diff --git a/tez-api/src/main/java/org/apache/tez/serviceplugins/api/ContainerLauncherOperationBase.java b/tez-api/src/main/java/org/apache/tez/serviceplugins/api/ContainerLauncherOperationBase.java index 29e0420..260b681 100644 --- a/tez-api/src/main/java/org/apache/tez/serviceplugins/api/ContainerLauncherOperationBase.java +++ b/tez-api/src/main/java/org/apache/tez/serviceplugins/api/ContainerLauncherOperationBase.java @@ -24,6 +24,11 @@ import org.apache.hadoop.yarn.api.records.Token; @InterfaceStability.Unstable public class ContainerLauncherOperationBase { + // TODO TEZ-2702 (TEZ-2003 post) + // - Get rid of YARN constructs. + // - ContainerToken may not always be required + + private final NodeId nodeId; private final ContainerId containerId; private final Token containerToken; @@ -36,14 +41,26 @@ public class ContainerLauncherOperationBase { this.containerToken = containerToken; } + /** + * Get the node on which this container is to be launched + * @return + */ public NodeId getNodeId() { return nodeId; } + /** + * Get the containerId for the container + * @return + */ public ContainerId getContainerId() { return containerId; } + /** + * Get the security token for the container.
Primarily for YARN + * @return + */ public Token getContainerToken() { return containerToken; } http://git-wip-us.apache.org/repos/asf/tez/blob/fda06553/tez-api/src/main/java/org/apache/tez/serviceplugins/api/ContainerStopRequest.java ---------------------------------------------------------------------- diff --git a/tez-api/src/main/java/org/apache/tez/serviceplugins/api/ContainerStopRequest.java b/tez-api/src/main/java/org/apache/tez/serviceplugins/api/ContainerStopRequest.java index cb0af31..be7d00a 100644 --- a/tez-api/src/main/java/org/apache/tez/serviceplugins/api/ContainerStopRequest.java +++ b/tez-api/src/main/java/org/apache/tez/serviceplugins/api/ContainerStopRequest.java @@ -20,6 +20,9 @@ import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.api.records.Token; +/** + * Contains specifications for a container which needs to be stopped + */ @InterfaceAudience.Public @InterfaceStability.Unstable public class ContainerStopRequest extends ContainerLauncherOperationBase { http://git-wip-us.apache.org/repos/asf/tez/blob/fda06553/tez-api/src/main/java/org/apache/tez/serviceplugins/api/ServicePluginsDescriptor.java ---------------------------------------------------------------------- diff --git a/tez-api/src/main/java/org/apache/tez/serviceplugins/api/ServicePluginsDescriptor.java b/tez-api/src/main/java/org/apache/tez/serviceplugins/api/ServicePluginsDescriptor.java index 2e4fc46..ce35350 100644 --- a/tez-api/src/main/java/org/apache/tez/serviceplugins/api/ServicePluginsDescriptor.java +++ b/tez-api/src/main/java/org/apache/tez/serviceplugins/api/ServicePluginsDescriptor.java @@ -18,6 +18,10 @@ import com.google.common.base.Preconditions; import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceStability; +/** + * An {@link ServicePluginsDescriptor} describes the list of plugins running within the AM for + * sourcing resources, 
launching and executing work. + */ @InterfaceAudience.Public @InterfaceStability.Unstable public class ServicePluginsDescriptor { @@ -53,7 +57,7 @@ public class ServicePluginsDescriptor { * @param taskSchedulerDescriptor the task scheduler plugin descriptors * @param containerLauncherDescriptors the container launcher plugin descriptors * @param taskCommunicatorDescriptors the task communicator plugin descriptors - * @return + * @return a {@link ServicePluginsDescriptor} instance */ public static ServicePluginsDescriptor create(TaskSchedulerDescriptor[] taskSchedulerDescriptor, ContainerLauncherDescriptor[] containerLauncherDescriptors, @@ -69,7 +73,7 @@ public class ServicePluginsDescriptor { * @param taskSchedulerDescriptor the task scheduler plugin descriptors * @param containerLauncherDescriptors the container launcher plugin descriptors * @param taskCommunicatorDescriptors the task communicator plugin descriptors - * @return + * @return a {@link ServicePluginsDescriptor} instance */ public static ServicePluginsDescriptor create(boolean enableUber, TaskSchedulerDescriptor[] taskSchedulerDescriptor, @@ -88,7 +92,7 @@ public class ServicePluginsDescriptor { * @param taskSchedulerDescriptor the task scheduler plugin descriptors * @param containerLauncherDescriptors the container launcher plugin descriptors * @param taskCommunicatorDescriptors the task communicator plugin descriptors - * @return + * @return a {@link ServicePluginsDescriptor} instance */ public static ServicePluginsDescriptor create(boolean enableContainers, boolean enableUber, TaskSchedulerDescriptor[] taskSchedulerDescriptor, @@ -103,30 +107,35 @@ public class ServicePluginsDescriptor { * execution is enabled by default * * @param enableUber whether to enable execution in the AM or not - * @return + * @return a {@link ServicePluginsDescriptor} instance */ public static ServicePluginsDescriptor create(boolean enableUber) { return new ServicePluginsDescriptor(true, enableUber, null, null, null); } + 
@InterfaceAudience.Private public boolean areContainersEnabled() { return enableContainers; } + @InterfaceAudience.Private public boolean isUberEnabled() { return enableUber; } + @InterfaceAudience.Private public TaskSchedulerDescriptor[] getTaskSchedulerDescriptors() { return taskSchedulerDescriptors; } + @InterfaceAudience.Private public ContainerLauncherDescriptor[] getContainerLauncherDescriptors() { return containerLauncherDescriptors; } + @InterfaceAudience.Private public TaskCommunicatorDescriptor[] getTaskCommunicatorDescriptors() { return taskCommunicatorDescriptors; } -} +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/tez/blob/fda06553/tez-api/src/main/java/org/apache/tez/serviceplugins/api/TaskScheduler.java ---------------------------------------------------------------------- diff --git a/tez-api/src/main/java/org/apache/tez/serviceplugins/api/TaskScheduler.java b/tez-api/src/main/java/org/apache/tez/serviceplugins/api/TaskScheduler.java index b1fb349..de76029 100644 --- a/tez-api/src/main/java/org/apache/tez/serviceplugins/api/TaskScheduler.java +++ b/tez-api/src/main/java/org/apache/tez/serviceplugins/api/TaskScheduler.java @@ -14,23 +14,34 @@ package org.apache.tez.serviceplugins.api; +import javax.annotation.Nullable; + import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceStability; -import org.apache.hadoop.service.AbstractService; +import org.apache.hadoop.yarn.api.records.Container; import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.api.records.Priority; import org.apache.hadoop.yarn.api.records.Resource; import org.apache.tez.common.ServicePluginLifecycle; +/** + * This class represents the API for a custom TaskScheduler which can be run within the Tez AM. 
+ * This can be used to source resources from different sources, as well as control the logic of + * how these resources get allocated to the different tasks within a DAG which needs resources. + * <p/> + * The plugin is initialized with an instance of {@link TaskSchedulerContext} - which provides + * a mechanism to notify the system about allocation decisions and resources to the Tez framework. + */ @InterfaceAudience.Public @InterfaceStability.Unstable public abstract class TaskScheduler implements ServicePluginLifecycle { // TODO TEZ-2003 (post) TEZ-2668 // - Should setRegister / unregister be part of APIs when not YARN specific ? - // - Include vertex / task information in therequest so that the scheduler can make decisions + // - Include vertex / task information in the request so that the scheduler can make decisions // around prioritizing tasks in the same vertex when others exist at the same priority. + // There should be an interface around Object task - if it's meant to be used for equals / hashCode. private final TaskSchedulerContext taskSchedulerContext; @@ -38,55 +49,179 @@ public abstract class TaskScheduler implements ServicePluginLifecycle { this.taskSchedulerContext = taskSchedulerContext; } + /** + * An entry point for initialization. + * Order of service setup. Constructor, initialize(), start() - when starting a service. + * + * @throws Exception + */ @Override public void initialize() throws Exception { } + /** + * An entry point for starting the service. + * Order of service setup. Constructor, initialize(), start() - when starting a service. + * + * @throws Exception + */ @Override public void start() throws Exception { } + /** + * Stop the service. This could be invoked at any point, when the service is no longer required - + * including in case of errors. + * + * @throws Exception + */ @Override public void shutdown() throws Exception { } + /** + * The first step of stopping the task scheduler service. 
This would typically be used to stop + * allocating new resources. shutdown() will typically be used to unregister from external + * services - especially YARN for instance, so that the app is not killed + */ public void initiateStop() { } - public abstract Resource getAvailableResources(); - - public abstract int getClusterNodeCount(); + /** + * Get the {@link TaskSchedulerContext} associated with this instance of the scheduler, which is + * used to communicate with the rest of the system + * + * @return an instance of {@link TaskSchedulerContext} + */ + public final TaskSchedulerContext getContext() { + return taskSchedulerContext; + } - public abstract void dagComplete(); + /** + * Get the currently available resources from this source + * + * @return the resources available at the time of invocation + */ + public abstract Resource getAvailableResources(); + /** + * Get the total available resources from this source + * + * @return the total available resources from the source + */ public abstract Resource getTotalResources(); + /** + * Get the number of nodes available from the source + * + * @return the number of nodes + */ + public abstract int getClusterNodeCount(); + + /** + * Indication to a source that a node has been blacklisted, and should not be used for subsequent + * allocations. + * + * @param nodeId the nodeId to be blacklisted + */ public abstract void blacklistNode(NodeId nodeId); + /** + * Indication to a source that a node has been un-blacklisted, and can be used for subsequent + * allocations + * + * @param nodeId the nodeId to be unblacklisted + */ public abstract void unblacklistNode(NodeId nodeId); + /** + * A request to the source to allocate resources for a requesting task, with location information + * optionally specified + * + * @param task the task for which resources are being accepted. 
+ * @param capability the required resources to run this task + * @param hosts the preferred host locations for the task + * @param racks the preferred rack locations for the task + * @param priority the priority of the request for this allocation. A lower value + * implies a higher priority + * @param containerSignature the specifications for the container (environment, etc) which will + * be + * used for this task - if applicable + * @param clientCookie a cookie associated with this request. This should be returned back + * via the {@link TaskSchedulerContext#taskAllocated(Object, Object, + * Container)} method when a task is assigned to a resource + */ public abstract void allocateTask(Object task, Resource capability, String[] hosts, String[] racks, Priority priority, Object containerSignature, Object clientCookie); /** - * Allocate affinitized to a specific container + * A request to the source to allocate resources for a requesting task, based on a previously used + * container + * + * @param task the task for which resources are being accepted. + * @param capability the required resources to run this task + * @param containerId a previous container which is used as an indication as to where this + * task should be placed + * @param priority the priority of the request for this allocation. A lower value + * implies a higher priority + * @param containerSignature the specifications for the container (environment, etc) which will + * be + * used for this task - if applicable + * @param clientCookie a cookie associated with this request. 
This should be returned back + * via the {@link TaskSchedulerContext#taskAllocated(Object, Object, + * Container)} method when a task is assigned to a resource */ public abstract void allocateTask(Object task, Resource capability, - ContainerId containerId, Priority priority, Object containerSignature, + ContainerId containerId, Priority priority, + Object containerSignature, Object clientCookie); - /** Plugin writers must ensure to de-allocate a container once it's done, so that it can be collected. */ - public abstract boolean deallocateTask(Object task, boolean taskSucceeded, TaskAttemptEndReason endReason); + /** + * A request to deallocate a task. This is typically a result of a task completing - with success + * or failure. It could also be the result of a decision to not run the task, before it is + * allocated or started. + * <p/> + * Plugin writers need to de-allocate containers via the context once it's no longer required, for + * correct book-keeping + * + * @param task the task being de-allocated. + * @param taskSucceeded whether the task succeeded or not + * @param endReason the reason for the task failure + * @param diagnostics additional diagnostics information which may be relevant + * @return true if the task was associated with a container, false if the task was not associated + * with a container + */ + public abstract boolean deallocateTask(Object task, boolean taskSucceeded, + TaskAttemptEndReason endReason, + @Nullable String diagnostics); + /** + * A request to de-allocate a previously allocated container. + * + * @param containerId the containerId to de-allocate + * @return the task which was previously associated with this container, null otherwise + */ public abstract Object deallocateContainer(ContainerId containerId); + /** + * Inform the scheduler that it should unregister. 
This is primarily valid for schedulers which + * require registration (YARN a.t.m) + */ public abstract void setShouldUnregister(); + /** + * Checks with the scheduler whether it has unregistered. + * + * @return true if the scheduler has unregistered. False otherwise. + */ public abstract boolean hasUnregistered(); + /** + * Indicates to the scheduler that the currently running dag has completed. + * This can be used to reset dag specific statistics, potentially release resources and prepare + * for a new DAG. + */ + public abstract void dagComplete(); - public final TaskSchedulerContext getContext() { - return taskSchedulerContext; - } } http://git-wip-us.apache.org/repos/asf/tez/blob/fda06553/tez-api/src/main/java/org/apache/tez/serviceplugins/api/TaskSchedulerContext.java ---------------------------------------------------------------------- diff --git a/tez-api/src/main/java/org/apache/tez/serviceplugins/api/TaskSchedulerContext.java b/tez-api/src/main/java/org/apache/tez/serviceplugins/api/TaskSchedulerContext.java index dbbf75c..a24061f 100644 --- a/tez-api/src/main/java/org/apache/tez/serviceplugins/api/TaskSchedulerContext.java +++ b/tez-api/src/main/java/org/apache/tez/serviceplugins/api/TaskSchedulerContext.java @@ -31,15 +31,24 @@ import org.apache.hadoop.yarn.api.records.Resource; import org.apache.tez.common.ContainerSignatureMatcher; import org.apache.tez.dag.api.UserPayload; +/** + * Context for a {@link TaskScheduler} + * <p/> + * This provides methods for a scheduler to interact with the Tez framework. 
+ * <p/> + * Calls into this should be outside of locks, which may also be obtained by methods in the + * scheduler + * which implement the {@link TaskScheduler} interface + */ @InterfaceAudience.Public @InterfaceStability.Unstable - public interface TaskSchedulerContext { - public class AppFinalStatus { + class AppFinalStatus { public final FinalApplicationStatus exitStatus; public final String exitMessage; public final String postCompletionTrackingUrl; + public AppFinalStatus(FinalApplicationStatus exitStatus, String exitMessage, String posCompletionTrackingUrl) { @@ -49,67 +58,180 @@ public interface TaskSchedulerContext { } } + /** + * Indicates the state the AM is in. + */ enum AMState { - IDLE, RUNNING_APP, COMPLETED + IDLE, + RUNNING_APP, + COMPLETED } // TODO TEZ-2003 (post) TEZ-2664. Remove references to YARN constructs like Container, ContainerStatus, NodeReport // TODO TEZ-2003 (post) TEZ-2668 Enhancements to TaskScheduler interfaces // - setApplicationRegistrationData may not be relevant to non YARN clusters // - getAppFinalStatus may not be relevant to non YARN clusters - // upcall to app must be outside locks - public void taskAllocated(Object task, - Object appCookie, - Container container); - // this may end up being called for a task+container pair that the app - // has not heard about. this can happen because of a race between - // taskAllocated() upcall and deallocateTask() downcall - public void containerCompleted(Object taskLastAllocated, - ContainerStatus containerStatus); - public void containerBeingReleased(ContainerId containerId); - public void nodesUpdated(List<NodeReport> updatedNodes); - public void appShutdownRequested(); - - // TODO Post TEZ-2003, this method specifically needs some cleaning up. - // ClientAMSecretKey is only relevant when running under YARN. As are ApplicationACLs. - public void setApplicationRegistrationData( + + + /** + * Indicate to the framework that a container is being assigned to a task. 
+ * + * @param task the task for which a container is being assigned. This should be the same + * instance that was provided when requesting for an allocation + * @param appCookie the cookie which was provided while requesting allocation for this task + * @param container the actual container assigned to the task + */ + void taskAllocated(Object task, + Object appCookie, + Container container); + + + /** + * Indicate to the framework that a container has completed. This is typically used by sources + * which have + * a means to indicate a container failure to the scheduler (typically centrally managed + * schedulers - YARN) + * + * @param taskLastAllocated the task that was allocated to this container, if any. This is the + * same instance that was passed in while requesting an allocation + * @param containerStatus the status with which the container ended + */ + void containerCompleted(Object taskLastAllocated, + ContainerStatus containerStatus); + + /** + * Indicates to the framework that a container is being released. + * + * @param containerId the id of the container being released + */ + void containerBeingReleased(ContainerId containerId); + + + /** + * Provide an update to the framework about the status of nodes available to this report + * + * @param updatedNodes a list of updated node reports + */ + void nodesUpdated(List<NodeReport> updatedNodes); + + /** + * Inform the framework that an app shutdown is required. This should typically not be used, other + * than + * by the YARN scheduler. + */ + void appShutdownRequested(); + + /** + * Provide an update to the framework about specific information about the source managed by this + * scheduler. 
+ * + * @param maxContainerCapability the total resource capability of the source + * @param appAcls ACLs for the source + * @param clientAMSecretKey a secret key provided by the source + */ + void setApplicationRegistrationData( Resource maxContainerCapability, Map<ApplicationAccessType, String> appAcls, ByteBuffer clientAMSecretKey ); - public void onError(Throwable t); - public float getProgress(); - public void preemptContainer(ContainerId containerId); - public AppFinalStatus getFinalAppStatus(); + /** + * Indicate to the framework that the scheduler has run into an error. This will cause + * the DAG and application to be killed. + * + * @param t the relevant error + */ + void onError(Throwable t); + + /** + * Inform the framework that the scheduler has determined that a previously allocated container + * needs to be preempted + * + * @param containerId the containerId to be preempted + */ + void preemptContainer(ContainerId containerId); + + /** + * Get the final status for the application, which could be provided to the coordinator of the + * source. + * Primarily relevant to YARN + * + * @return the final Application status + */ + AppFinalStatus getFinalAppStatus(); // Getters - public UserPayload getInitialUserPayload(); + /** + * Get the UserPayload that was configured while setting up the scheduler + * + * @return the initially configured user payload + */ + UserPayload getInitialUserPayload(); - public String getAppTrackingUrl(); + /** + * Get the tracking URL for the application. Primarily relevant to YARN + * + * @return the trackingUrl for the app + */ + String getAppTrackingUrl(); + + /** + * Request the framework for progress of the running DAG. 
This value must be between 0 and 1 + * + * @return progress + */ + float getProgress(); /** * A custom cluster identifier allocated to schedulers to generate an AppId, if not making * use of YARN - * @return + * + * @return the custom cluster identifier */ - public long getCustomClusterIdentifier(); + long getCustomClusterIdentifier(); - public ContainerSignatureMatcher getContainerSignatureMatcher(); + /** + * Get an instance of {@link ContainerSignatureMatcher} which can be used to check whether the + * specifications of a container match what is required by a task. + * + * @return an instance of {@link ContainerSignatureMatcher} + */ + ContainerSignatureMatcher getContainerSignatureMatcher(); /** * Get the application attempt id for the running application. Relevant when running under YARN - * @return + * + * @return the applicationAttemptId for the running app */ - public ApplicationAttemptId getApplicationAttemptId(); + ApplicationAttemptId getApplicationAttemptId(); - public String getAppHostName(); + /** + * Get the hostname on which the app is running + * + * @return the hostname + */ + String getAppHostName(); - public int getAppClientPort(); + /** + * Get the port on which the DAG client is listening + * + * @return the client port + */ + int getAppClientPort(); - public boolean isSession(); + /** + * Check whether the AM is running in session mode. 
+ * + * @return true if session mode, false otherwise + */ + boolean isSession(); - public AMState getAMState(); + /** + * Get the state of the AppMaster + * + * @return the app master state + */ + AMState getAMState(); } http://git-wip-us.apache.org/repos/asf/tez/blob/fda06553/tez-dag/src/main/java/org/apache/tez/dag/api/TaskCommunicator.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/api/TaskCommunicator.java b/tez-dag/src/main/java/org/apache/tez/dag/api/TaskCommunicator.java index 794d390..4fc541c 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/api/TaskCommunicator.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/api/TaskCommunicator.java @@ -29,6 +29,19 @@ import org.apache.tez.runtime.api.impl.TaskSpec; // TODO TEZ-2003 (post) TEZ-2665. Move to the tez-api module // TODO TEZ-2003 (post) TEZ-2664. Ideally, don't expose YARN containerId; instead expose a Tez specific construct. + +/** + * This class represents the API for a custom TaskCommunicator which can be run within the Tez AM. + * This is used to communicate with running services, potentially launching tasks, and getting + * updates from running tasks. + * <p/> + * The plugin is initialized with an instance of {@link TaskCommunicatorContext} - which provides + * a mechanism to notify the system about allocation decisions and resources to the Tez framework. + * + * If setting up a heartbeat between the task and the AM, the framework is responsible for error checking + * of this heartbeat mechanism, handling lost or duplicate responses. 
+ * + */ public abstract class TaskCommunicator implements ServicePluginLifecycle { // TODO TEZ-2003 (post) TEZ-2666 Enhancements to interface @@ -45,34 +58,100 @@ public abstract class TaskCommunicator implements ServicePluginLifecycle { this.taskCommunicatorContext = taskCommunicatorContext; } + /** + * Get the {@link TaskCommunicatorContext} associated with this instance of the scheduler, which + * is + * used to communicate with the rest of the system + * + * @return an instance of {@link TaskCommunicatorContext} + */ public TaskCommunicatorContext getContext() { return taskCommunicatorContext; } + /** + * An entry point for initialization. + * Order of service setup. Constructor, initialize(), start() - when starting a service. + * + * @throws Exception + */ @Override public void initialize() throws Exception { } + /** + * An entry point for starting the service. + * Order of service setup. Constructor, initialize(), start() - when starting a service. + * + * @throws Exception + */ @Override public void start() throws Exception { } + /** + * Stop the service. This could be invoked at any point, when the service is no longer required - + * including in case of errors. + * + * @throws Exception + */ @Override public void shutdown() throws Exception { } + /** + * Register a new container. + * + * @param containerId the associated containerId + * @param hostname the hostname on which the container runs + * @param port the port for the service which is running the container + */ public abstract void registerRunningContainer(ContainerId containerId, String hostname, int port); + /** + * Register the end of a container. This can be caused by preemption, the container completing + * successfully, etc. 
+ * + * @param containerId the associated containerId + * @param endReason the end reason for the container completing + */ public abstract void registerContainerEnd(ContainerId containerId, ContainerEndReason endReason); + /** + * Register a task attempt to execute on a container + * + * @param containerId the containerId on which this task needs to run + * @param taskSpec the task specifications for the task to be executed + * @param additionalResources additional local resources which may be required to run this task + * on + * the container + * @param credentials the credentials required to run this task + * @param credentialsChanged whether the credentials are different from the original credentials + * associated with this container + * @param priority the priority of the task being executed + */ public abstract void registerRunningTaskAttempt(ContainerId containerId, TaskSpec taskSpec, Map<String, LocalResource> additionalResources, Credentials credentials, boolean credentialsChanged, int priority); - public abstract void unregisterRunningTaskAttempt(TezTaskAttemptID taskAttemptID, TaskAttemptEndReason endReason); + /** + * Register the completion of a task. This may be a result of preemption, the container dying, + * the + * node dying, the task completing to success + * + * @param taskAttemptID the task attempt which has completed / needs to be completed + * @param endReason the endReason for the task attempt. + */ + public abstract void unregisterRunningTaskAttempt(TezTaskAttemptID taskAttemptID, + TaskAttemptEndReason endReason); + /** + * Return the address, if any, that the service listens on + * + * @return the address + */ public abstract InetSocketAddress getAddress(); /** @@ -82,11 +161,13 @@ public abstract class TaskCommunicator implements ServicePluginLifecycle { * org.apache.tez.runtime.api.InputInitializerContext#registerForVertexStateUpdates(String, * java.util.Set)}. 
Notifications will be received for all registered state changes, and not just * for the latest state update. They will be in order in which the state change occurred. </p> - * + * <p/> * Extensive processing should not be performed via this method call. Instead this should just be * used as a notification mechanism. - * <br>This method may be invoked concurrently with other invocations into the TaskCommunicator and + * <br>This method may be invoked concurrently with other invocations into the TaskCommunicator + * and * multi-threading/concurrency implications must be considered. + * * @param stateUpdate an event indicating the name of the vertex, and it's updated state. * Additional information may be available for specific events, Look at the * type hierarchy for {@link org.apache.tez.dag.api.event.VertexStateUpdate} @@ -97,16 +178,18 @@ public abstract class TaskCommunicator implements ServicePluginLifecycle { /** * Indicates the current running dag is complete. The TaskCommunicatorContext can be used to * query information about the current dag during the duration of the dagComplete invocation. - * + * <p/> * After this, the contents returned from querying the context may change at any point - due to * the next dag being submitted. */ public abstract void dagComplete(String dagName); /** - * Share meta-information such as host:port information where the Task Communicator may be listening. + * Share meta-information such as host:port information where the Task Communicator may be + * listening. * Primarily for use by compatible launchers to learn this information. 
- * @return + * + * @return meta info for the task communicator */ public abstract Object getMetaInfo(); } http://git-wip-us.apache.org/repos/asf/tez/blob/fda06553/tez-dag/src/main/java/org/apache/tez/dag/api/TaskCommunicatorContext.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/api/TaskCommunicatorContext.java b/tez-dag/src/main/java/org/apache/tez/dag/api/TaskCommunicatorContext.java index 8073f6a..0a684e7 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/api/TaskCommunicatorContext.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/api/TaskCommunicatorContext.java @@ -42,31 +42,112 @@ public interface TaskCommunicatorContext { // - Maybe add book-keeping as a helper library, instead of each impl tracking container to task etc. // - Handling of containres / tasks which no longer exist in the system (formalized interface instead of a shouldDie notification) + /** + * Get the UserPayload that was configured while setting up the task communicator + * + * @return the initially configured user payload + */ UserPayload getInitialUserPayload(); + /** + * Get the application attempt id for the running application. Relevant when running under YARN + * + * @return the applicationAttemptId for the running app + */ ApplicationAttemptId getApplicationAttemptId(); + + /** + * Get credentials associated with the AppMaster + * + * @return credentials + */ Credentials getCredentials(); + /** + * Check whether a running attempt can commit. This provides a leader election mechanism amongst + * multiple running attempts + * + * @param taskAttemptId the associated task attempt id + * @return whether the attempt can commit or not + * @throws IOException + */ boolean canCommit(TezTaskAttemptID taskAttemptId) throws IOException; + /** + * Mechanism for a {@link TaskCommunicator} to provide updates on a running task, as well as + * receive new information which may need to be propagated to the task. 
This includes events + * generated by the task and events which need to be sent to the task + * This method must be invoked periodically to receive updates for a running task + * + * @param request the update from the running task. + * @return the response that is required by the task. + * @throws IOException + * @throws TezException + */ TaskHeartbeatResponse heartbeat(TaskHeartbeatRequest request) throws IOException, TezException; + /** + * Check whether the container is known by the framework. The state of this container is + * irrelevant + * + * @param containerId the relevant container id + * @return true if the container is known, false if it isn't + */ boolean isKnownContainer(ContainerId containerId); + /** + * Inform the framework that a task is alive. This needs to be invoked periodically to avoid the + * task attempt timing out. + * Invocations to heartbeat provides the same keep-alive functionality + * + * @param taskAttemptId the relevant task attempt + */ void taskAlive(TezTaskAttemptID taskAttemptId); + /** + * Inform the framework that a container is alive. This needs to be invoked periodically to avoid + * the container attempt timing out. 
+ * Invocations to heartbeat provides the same keep-alive functionality + * + * @param containerId the relevant container id + */ void containerAlive(ContainerId containerId); + /** + * Inform the framework that the task has started execution + * + * @param taskAttemptId the relevant task attempt id + * @param containerId the containerId in which the task attempt is running + */ void taskStartedRemotely(TezTaskAttemptID taskAttemptId, ContainerId containerId); - void taskKilled(TezTaskAttemptID taskAttemptId, TaskAttemptEndReason taskAttemptEndReason, @Nullable String diagnostics); + /** + * Inform the framework that a task has been killed + * + * @param taskAttemptId the relevant task attempt id + * @param taskAttemptEndReason the reason for the task attempt being killed + * @param diagnostics any diagnostics messages which are relevant to the task attempt + * kill + */ + void taskKilled(TezTaskAttemptID taskAttemptId, TaskAttemptEndReason taskAttemptEndReason, + @Nullable String diagnostics); - void taskFailed(TezTaskAttemptID taskAttemptId, TaskAttemptEndReason taskAttemptEndReason, @Nullable String diagnostics); + /** + * Inform the framework that a task has failed + * + * @param taskAttemptId the relevant task attempt id + * @param taskAttemptEndReason the reason for the task failure + * @param diagnostics any diagnostics messages which are relevant to the task attempt + * failure + */ + void taskFailed(TezTaskAttemptID taskAttemptId, TaskAttemptEndReason taskAttemptEndReason, + @Nullable String diagnostics); /** * Register to get notifications on updates to the specified vertex. Notifications will be sent - * via {@link org.apache.tez.runtime.api.InputInitializer#onVertexStateUpdated(org.apache.tez.dag.api.event.VertexStateUpdate)} </p> - * + * via {@link org.apache.tez.runtime.api.InputInitializer#onVertexStateUpdated(org.apache.tez.dag.api.event.VertexStateUpdate)} + * </p> + * <p/> * This method can only be invoked once. 
Duplicate invocations will result in an error. * * @param vertexName the vertex name for which notifications are required. @@ -76,6 +157,7 @@ public interface TaskCommunicatorContext { /** * Get the name of the currently executing dag + * * @return the name of the currently executing dag */ String getCurretnDagName(); @@ -83,6 +165,7 @@ public interface TaskCommunicatorContext { /** * Get the name of the Input vertices for the specified vertex. * Root Inputs are not returned. + * * @param vertexName the vertex for which source vertex names will be returned * @return an Iterable containing the list of input vertices for the specified vertex */ @@ -90,13 +173,15 @@ public interface TaskCommunicatorContext { /** * Get the total number of tasks in the given vertex - * @param vertexName + * + * @param vertexName the relevant vertex name * @return total number of tasks in this vertex */ int getVertexTotalTaskCount(String vertexName); /** * Get the number of completed tasks for a given vertex + * * @param vertexName the vertex name * @return the number of completed tasks for the vertex */ @@ -104,6 +189,7 @@ public interface TaskCommunicatorContext { /** * Get the number of running tasks for a given vertex + * * @param vertexName the vertex name * @return the number of running tasks for the vertex */ @@ -111,14 +197,16 @@ public interface TaskCommunicatorContext { /** * Get the start time for the first attempt of the specified task + * * @param vertexName the vertex to which the task belongs - * @param taskIndex the index of the task + * @param taskIndex the index of the task * @return the start time for the first attempt of the task */ long getFirstAttemptStartTime(String vertexName, int taskIndex); /** * Get the start time for the currently executing DAG + * * @return time when the current dag started executing */ long getDagStartTime(); 
http://git-wip-us.apache.org/repos/asf/tez/blob/fda06553/tez-dag/src/main/java/org/apache/tez/dag/app/ContainerLauncherContextImpl.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/ContainerLauncherContextImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/ContainerLauncherContextImpl.java index 92bbbdc..3a2efc5 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/app/ContainerLauncherContextImpl.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/ContainerLauncherContextImpl.java @@ -29,6 +29,7 @@ import org.apache.tez.dag.app.rm.container.AMContainerEventType; import org.apache.tez.dag.history.DAGHistoryEvent; import org.apache.tez.dag.history.events.ContainerLaunchedEvent; +@SuppressWarnings("unchecked") public class ContainerLauncherContextImpl implements ContainerLauncherContext { private final AppContext context; http://git-wip-us.apache.org/repos/asf/tez/blob/fda06553/tez-dag/src/main/java/org/apache/tez/dag/app/TaskAttemptListenerImpTezDag.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/TaskAttemptListenerImpTezDag.java b/tez-dag/src/main/java/org/apache/tez/dag/app/TaskAttemptListenerImpTezDag.java index 6c1dad9..ad6f2c4 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/app/TaskAttemptListenerImpTezDag.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/TaskAttemptListenerImpTezDag.java @@ -178,13 +178,7 @@ public class TaskAttemptListenerImpTezDag extends AbstractService implements taskCommClazz.getConstructor(TaskCommunicatorContext.class); ctor.setAccessible(true); return ctor.newInstance(taskCommunicatorContext); - } catch (NoSuchMethodException e) { - throw new TezUncheckedException(e); - } catch (InvocationTargetException e) { - throw new TezUncheckedException(e); - } catch (InstantiationException e) { - throw new TezUncheckedException(e); - } catch 
(IllegalAccessException e) { + } catch (NoSuchMethodException | InvocationTargetException | InstantiationException | IllegalAccessException e) { throw new TezUncheckedException(e); } } @@ -398,13 +392,6 @@ public class TaskAttemptListenerImpTezDag extends AbstractService implements containerInfo.taskAttemptId); } - if (containerInfo.taskAttemptId != null) { - throw new TezUncheckedException("Registering task attempt: " - + amContainerTask.getTask().getTaskAttemptID() + " to container: " + containerId - + " with existing assignment to: " + - containerInfo.taskAttemptId); - } - // Explicitly putting in a new entry so that synchronization is not required on the existing element in the map. registeredContainers.put(containerId, new ContainerInfo(amContainerTask.getTask().getTaskAttemptID())); http://git-wip-us.apache.org/repos/asf/tez/blob/fda06553/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/ContainerLauncherRouter.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/ContainerLauncherRouter.java b/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/ContainerLauncherRouter.java index 57b4aee..d0cee21 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/ContainerLauncherRouter.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/ContainerLauncherRouter.java @@ -62,7 +62,7 @@ public class ContainerLauncherRouter extends AbstractService containerLaunchers = new ContainerLauncher[] {containerLauncher}; containerLauncherContexts = new ContainerLauncherContext[] {containerLauncher.getContext()}; containerLauncherServiceWrappers = new ServicePluginLifecycleAbstractService[]{ - new ServicePluginLifecycleAbstractService(containerLauncher)}; + new ServicePluginLifecycleAbstractService<>(containerLauncher)}; } // Accepting conf to setup final parameters, if required. 
@@ -89,7 +89,7 @@ public class ContainerLauncherRouter extends AbstractService containerLauncherContexts[i] = containerLauncherContext; containerLaunchers[i] = createContainerLauncher(containerLauncherDescriptors.get(i), context, containerLauncherContext, taskAttemptListener, workingDirectory, i, isPureLocalMode); - containerLauncherServiceWrappers[i] = new ServicePluginLifecycleAbstractService(containerLaunchers[i]); + containerLauncherServiceWrappers[i] = new ServicePluginLifecycleAbstractService<>(containerLaunchers[i]); } } @@ -138,6 +138,7 @@ public class ContainerLauncherRouter extends AbstractService } @VisibleForTesting + @SuppressWarnings("unchecked") ContainerLauncher createCustomContainerLauncher(ContainerLauncherContext containerLauncherContext, NamedEntityDescriptor containerLauncherDescriptor) { LOG.info("Creating container launcher {}:{} ", containerLauncherDescriptor.getEntityName(), @@ -150,15 +151,10 @@ public class ContainerLauncherRouter extends AbstractService .getConstructor(ContainerLauncherContext.class); ctor.setAccessible(true); return ctor.newInstance(containerLauncherContext); - } catch (NoSuchMethodException e) { - throw new TezUncheckedException(e); - } catch (InvocationTargetException e) { - throw new TezUncheckedException(e); - } catch (InstantiationException e) { - throw new TezUncheckedException(e); - } catch (IllegalAccessException e) { + } catch (NoSuchMethodException | InvocationTargetException | InstantiationException | IllegalAccessException e) { throw new TezUncheckedException(e); } + } @Override http://git-wip-us.apache.org/repos/asf/tez/blob/fda06553/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerContextImpl.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerContextImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerContextImpl.java index 7f1d5a3..2a9797f 100644 --- 
a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerContextImpl.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerContextImpl.java @@ -57,6 +57,9 @@ public class TaskSchedulerContextImpl implements TaskSchedulerContext { } + // this may end up being called for a task+container pair that the app + // has not heard about. this can happen because of a race between + // taskAllocated() upcall and deallocateTask() downcall @Override public void taskAllocated(Object task, Object appCookie, Container container) { tseh.taskAllocated(schedulerId, task, appCookie, container); http://git-wip-us.apache.org/repos/asf/tez/blob/fda06553/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerEventHandler.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerEventHandler.java b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerEventHandler.java index 4d710fa..0f19379 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerEventHandler.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerEventHandler.java @@ -144,6 +144,7 @@ public class TaskSchedulerEventHandler extends AbstractService implements * @param webUI * @param schedulerDescriptors the list of scheduler descriptors. Tez internal classes will not have the class names populated. * An empty list defaults to using the YarnTaskScheduler as the only source. 
+ * @param isPureLocalMode whether the AM is running in local mode */ @SuppressWarnings("rawtypes") public TaskSchedulerEventHandler(AppContext appContext, @@ -423,6 +424,7 @@ public class TaskSchedulerEventHandler extends AbstractService implements return new LocalTaskSchedulerService(taskSchedulerContext); } + @SuppressWarnings("unchecked") TaskScheduler createCustomTaskScheduler(TaskSchedulerContext taskSchedulerContext, NamedEntityDescriptor taskSchedulerDescriptor, int schedulerId) { @@ -436,13 +438,7 @@ public class TaskSchedulerEventHandler extends AbstractService implements .getConstructor(TaskSchedulerContext.class); ctor.setAccessible(true); return ctor.newInstance(taskSchedulerContext); - } catch (NoSuchMethodException e) { - throw new TezUncheckedException(e); - } catch (InvocationTargetException e) { - throw new TezUncheckedException(e); - } catch (InstantiationException e) { - throw new TezUncheckedException(e); - } catch (IllegalAccessException e) { + } catch (NoSuchMethodException | InvocationTargetException | InstantiationException | IllegalAccessException e) { throw new TezUncheckedException(e); } } @@ -453,7 +449,7 @@ public class TaskSchedulerEventHandler extends AbstractService implements int j = 0; for (int i = 0; i < taskSchedulerDescriptors.length; i++) { long customAppIdIdentifier; - if (isPureLocalMode || taskSchedulerDescriptors[i].equals( + if (isPureLocalMode || taskSchedulerDescriptors[i].getEntityName().equals( TezConstants.getTezYarnServicePluginName())) { // Use the app identifier from the appId. 
customAppIdIdentifier = appContext.getApplicationID().getClusterTimestamp(); } else { @@ -463,7 +459,7 @@ public class TaskSchedulerEventHandler extends AbstractService implements customAppIdIdentifier); taskSchedulers[i] = createTaskScheduler(host, port, trackingUrl, appContext, taskSchedulerDescriptors[i], customAppIdIdentifier, i); - taskSchedulerServiceWrappers[i] = new ServicePluginLifecycleAbstractService(taskSchedulers[i]); + taskSchedulerServiceWrappers[i] = new ServicePluginLifecycleAbstractService<>(taskSchedulers[i]); } } @@ -745,7 +741,7 @@ public class TaskSchedulerEventHandler extends AbstractService implements public boolean hasUnregistered() { boolean result = true; for (int i = 0 ; i < taskSchedulers.length ; i++) { - result |= this.taskSchedulers[i].hasUnregistered(); + result = result & this.taskSchedulers[i].hasUnregistered(); if (result == false) { return result; } http://git-wip-us.apache.org/repos/asf/tez/blob/fda06553/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TezTaskRunner2.java ---------------------------------------------------------------------- diff --git a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TezTaskRunner2.java b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TezTaskRunner2.java index 1a8828d..d8539c5 100644 --- a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TezTaskRunner2.java +++ b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TezTaskRunner2.java @@ -122,7 +122,7 @@ public class TezTaskRunner2 { * the AM - since a task KILL is an external event, and whoever invoked it should * be able to track it. * - * @return + * @return the taskRunner result */ public TaskRunner2Result run() { try {
