TEZ-3024. Move TaskCommunicator to correct package. (sseth)
Project: http://git-wip-us.apache.org/repos/asf/tez/repo Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/fad16425 Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/fad16425 Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/fad16425 Branch: refs/heads/TEZ-2980 Commit: fad16425b7979970f29884343316e1a10f4d0872 Parents: 1d76543 Author: Siddharth Seth <[email protected]> Authored: Tue Jan 12 17:32:28 2016 -0800 Committer: Siddharth Seth <[email protected]> Committed: Tue Jan 12 17:32:28 2016 -0800 ---------------------------------------------------------------------- CHANGES.txt | 1 + .../apache/tez/dag/api/TaskCommunicator.java | 221 ----------------- .../tez/dag/api/TaskCommunicatorContext.java | 225 ----------------- .../tez/dag/api/TaskHeartbeatRequest.java | 68 ------ .../tez/dag/api/TaskHeartbeatResponse.java | 51 ---- .../dag/app/TaskCommunicatorContextImpl.java | 7 +- .../tez/dag/app/TaskCommunicatorManager.java | 8 +- .../tez/dag/app/TaskCommunicatorWrapper.java | 2 +- .../dag/app/TezLocalTaskCommunicatorImpl.java | 2 +- .../tez/dag/app/TezTaskCommunicatorImpl.java | 8 +- .../serviceplugins/api/TaskCommunicator.java | 232 ++++++++++++++++++ .../api/TaskCommunicatorContext.java | 240 +++++++++++++++++++ .../api/TaskHeartbeatRequest.java | 82 +++++++ .../api/TaskHeartbeatResponse.java | 65 +++++ .../app/TestTaskCommunicatorContextImpl.java | 2 +- .../dag/app/TestTaskCommunicatorManager.java | 4 +- .../dag/app/TestTaskCommunicatorManager1.java | 8 +- .../dag/app/TestTaskCommunicatorWrapper.java | 2 +- .../tez/dag/app/dag/impl/TestTaskAttempt.java | 2 +- .../dag/app/rm/TestTaskSchedulerManager.java | 4 - .../dag/app/rm/container/TestAMContainer.java | 2 +- .../app/rm/container/TestAMContainerMap.java | 2 +- .../TezTestServiceTaskCommunicatorImpl.java | 2 +- ...ezTestServiceTaskCommunicatorWithErrors.java | 4 +- 24 files changed, 647 insertions(+), 597 deletions(-) ---------------------------------------------------------------------- 
http://git-wip-us.apache.org/repos/asf/tez/blob/fad16425/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index d39cbb8..0645591 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -4,6 +4,7 @@ Apache Tez Change Log Release 0.8.2: Unreleased INCOMPATIBLE CHANGES + TEZ-3024. Move TaskCommunicator to correct package. TEZ-2679. Admin forms of launch env settings TEZ-2948. Stop using dagName in the dagComplete notification to TaskCommunicators. TEZ-2949. Allow duplicate dag names within session for Tez. http://git-wip-us.apache.org/repos/asf/tez/blob/fad16425/tez-dag/src/main/java/org/apache/tez/dag/api/TaskCommunicator.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/api/TaskCommunicator.java b/tez-dag/src/main/java/org/apache/tez/dag/api/TaskCommunicator.java deleted file mode 100644 index 1b6ad07..0000000 --- a/tez-dag/src/main/java/org/apache/tez/dag/api/TaskCommunicator.java +++ /dev/null @@ -1,221 +0,0 @@ -/* - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.tez.dag.api; - -import javax.annotation.Nullable; -import java.net.InetSocketAddress; -import java.util.Map; - -import org.apache.hadoop.security.Credentials; -import org.apache.hadoop.yarn.api.records.ContainerId; -import org.apache.hadoop.yarn.api.records.LocalResource; -import org.apache.tez.common.ServicePluginLifecycle; -import org.apache.tez.dag.api.event.VertexStateUpdate; -import org.apache.tez.serviceplugins.api.ContainerEndReason; -import org.apache.tez.serviceplugins.api.ServicePluginException; -import org.apache.tez.serviceplugins.api.TaskAttemptEndReason; -import org.apache.tez.dag.records.TezTaskAttemptID; -import org.apache.tez.runtime.api.impl.TaskSpec; - -// TODO TEZ-2003 (post) TEZ-2665. Move to the tez-api module -// TODO TEZ-2003 (post) TEZ-2664. Ideally, don't expose YARN containerId; instead expose a Tez specific construct. - -/** - * This class represents the API for a custom TaskCommunicator which can be run within the Tez AM. - * This is used to communicate with running services, potentially launching tasks, and getting - * updates from running tasks. - * <p/> - * The plugin is initialized with an instance of {@link TaskCommunicatorContext} - which provides - * a mechanism to notify the system about allocation decisions and resources to the Tez framework. - * - * If setting up a heartbeat between the task and the AM, the framework is responsible for error checking - * of this heartbeat mechanism, handling lost or duplicate responses. 
- * - */ -public abstract class TaskCommunicator implements ServicePluginLifecycle { - - // TODO TEZ-2003 (post) TEZ-2666 Enhancements to interface - // - registerContainerEnd should provide the end reason / possible rename - // - get rid of getAddress - // - Add methods to support task preemption - // - Add a dagStarted notification, along with a payload - // - taskSpec breakup into a clean interface - // - Add methods to report task / container completion - - private final TaskCommunicatorContext taskCommunicatorContext; - - public TaskCommunicator(TaskCommunicatorContext taskCommunicatorContext) { - this.taskCommunicatorContext = taskCommunicatorContext; - } - - /** - * Get the {@link TaskCommunicatorContext} associated with this instance of the scheduler, which - * is - * used to communicate with the rest of the system - * - * @return an instance of {@link TaskCommunicatorContext} - */ - public TaskCommunicatorContext getContext() { - return taskCommunicatorContext; - } - - /** - * An entry point for initialization. - * Order of service setup. Constructor, initialize(), start() - when starting a service. - * - * @throws Exception - */ - @Override - public void initialize() throws Exception { - } - - /** - * An entry point for starting the service. - * Order of service setup. Constructor, initialize(), start() - when starting a service. - * - * @throws Exception - */ - @Override - public void start() throws Exception { - } - - /** - * Stop the service. This could be invoked at any point, when the service is no longer required - - * including in case of errors. - * - * @throws Exception - */ - @Override - public void shutdown() throws Exception { - } - - - /** - * Register a new container. 
- * - * @param containerId the associated containerId - * @param hostname the hostname on which the container runs - * @param port the port for the service which is running the container - * @throws ServicePluginException when the service runs into a fatal error which it cannot handle. - * This will cause the app to shutdown. - */ - public abstract void registerRunningContainer(ContainerId containerId, String hostname, - int port) throws ServicePluginException; - - /** - * Register the end of a container. This can be caused by preemption, the container completing - * successfully, etc. - * - * @param containerId the associated containerId - * @param endReason the end reason for the container completing - * @param diagnostics diagnostics associated with the container end - * @throws ServicePluginException when the service runs into a fatal error which it cannot handle. - * This will cause the app to shutdown. - */ - public abstract void registerContainerEnd(ContainerId containerId, ContainerEndReason endReason, - @Nullable String diagnostics) throws - ServicePluginException; - - /** - * Register a task attempt to execute on a container - * - * @param containerId the containerId on which this task needs to run - * @param taskSpec the task specifications for the task to be executed - * @param additionalResources additional local resources which may be required to run this task - * on - * the container - * @param credentials the credentials required to run this task - * @param credentialsChanged whether the credentials are different from the original credentials - * associated with this container - * @param priority the priority of the task being executed - * @throws ServicePluginException when the service runs into a fatal error which it cannot handle. - * This will cause the app to shutdown. 
- */ - public abstract void registerRunningTaskAttempt(ContainerId containerId, TaskSpec taskSpec, - Map<String, LocalResource> additionalResources, - Credentials credentials, - boolean credentialsChanged, int priority) throws - ServicePluginException; - - /** - * Register the completion of a task. This may be a result of preemption, the container dying, - * the node dying, the task completing to success - * - * @param taskAttemptID the task attempt which has completed / needs to be completed - * @param endReason the endReason for the task attempt. - * @param diagnostics diagnostics associated with the task end - * @throws ServicePluginException when the service runs into a fatal error which it cannot handle. - * This will cause the app to shutdown. - */ - public abstract void unregisterRunningTaskAttempt(TezTaskAttemptID taskAttemptID, - TaskAttemptEndReason endReason, - @Nullable String diagnostics) throws - ServicePluginException; - - /** - * Return the address, if any, that the service listens on - * - * @return the address - * @throws ServicePluginException when the service runs into a fatal error which it cannot handle. - * This will cause the app to shutdown. - */ - public abstract InetSocketAddress getAddress() throws ServicePluginException; - - /** - * Receive notifications on vertex state changes. - * <p/> - * State changes will be received based on the registration via {@link - * org.apache.tez.runtime.api.InputInitializerContext#registerForVertexStateUpdates(String, - * java.util.Set)}. Notifications will be received for all registered state changes, and not just - * for the latest state update. They will be in order in which the state change occurred. </p> - * <p/> - * Extensive processing should not be performed via this method call. Instead this should just be - * used as a notification mechanism. 
- * <br>This method may be invoked concurrently with other invocations into the TaskCommunicator - * and - * multi-threading/concurrency implications must be considered. - * - * @param stateUpdate an event indicating the name of the vertex, and it's updated state. - * Additional information may be available for specific events, Look at the - * type hierarchy for {@link org.apache.tez.dag.api.event.VertexStateUpdate} - * @throws ServicePluginException when the service runs into a fatal error which it cannot handle. - * This will cause the app to shutdown. - */ - public abstract void onVertexStateUpdated(VertexStateUpdate stateUpdate) throws ServicePluginException; - - /** - * Indicates the current running dag is complete. The TaskCommunicatorContext can be used to - * query information about the current dag during the duration of the dagComplete invocation. - * <p/> - * After this, the contents returned from querying the context may change at any point - due to - * the next dag being submitted. - * - * @param dagIdentifier the unique numerical identifier for the DAG in the specified execution context. - * @throws ServicePluginException when the service runs into a fatal error which it cannot handle. - * This will cause the app to shutdown. - */ - public abstract void dagComplete(int dagIdentifier) throws ServicePluginException; - - /** - * Share meta-information such as host:port information where the Task Communicator may be - * listening. - * Primarily for use by compatible launchers to learn this information. - * - * @return meta info for the task communicator - * @throws ServicePluginException when the service runs into a fatal error which it cannot handle. - * This will cause the app to shutdown. 
- */ - public abstract Object getMetaInfo() throws ServicePluginException; -} http://git-wip-us.apache.org/repos/asf/tez/blob/fad16425/tez-dag/src/main/java/org/apache/tez/dag/api/TaskCommunicatorContext.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/api/TaskCommunicatorContext.java b/tez-dag/src/main/java/org/apache/tez/dag/api/TaskCommunicatorContext.java deleted file mode 100644 index 7c5a648..0000000 --- a/tez-dag/src/main/java/org/apache/tez/dag/api/TaskCommunicatorContext.java +++ /dev/null @@ -1,225 +0,0 @@ -/* - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.tez.dag.api; - -import javax.annotation.Nullable; -import java.io.IOException; -import java.util.Set; - -import org.apache.hadoop.security.Credentials; -import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; -import org.apache.hadoop.yarn.api.records.ContainerId; -import org.apache.tez.dag.api.event.VertexState; -import org.apache.tez.serviceplugins.api.TaskAttemptEndReason; -import org.apache.tez.dag.records.TezTaskAttemptID; - - -// Do not make calls into this from within a held lock. - -// TODO TEZ-2003 (post) TEZ-2665. 
Move to the tez-api module -public interface TaskCommunicatorContext { - - // TODO TEZ-2003 (post) TEZ-2666 Enhancements to API - // - Consolidate usage of IDs - // - Split the heartbeat API to a liveness check and a status update - // - Rename and consolidate TaskHeartbeatResponse and TaskHeartbeatRequest - // - Fix taskStarted needs to be invoked before launching the actual task. - // - Potentially add methods to report availability stats to the scheduler - // - Report taskSuccess via a method instead of the heartbeat - // - Add methods to signal container / task state changes - // - Maybe add book-keeping as a helper library, instead of each impl tracking container to task etc. - // - Handling of containres / tasks which no longer exist in the system (formalized interface instead of a shouldDie notification) - - /** - * Get the UserPayload that was configured while setting up the task communicator - * - * @return the initially configured user payload - */ - UserPayload getInitialUserPayload(); - - /** - * Get the application attempt id for the running application. Relevant when running under YARN - * - * @return the applicationAttemptId for the running app - */ - ApplicationAttemptId getApplicationAttemptId(); - - /** - * Get credentials associated with the AppMaster - * - * @return credentials - */ - Credentials getCredentials(); - - /** - * Check whether a running attempt can commit. This provides a leader election mechanism amongst - * multiple running attempts - * - * @param taskAttemptId the associated task attempt id - * @return whether the attempt can commit or not - * @throws IOException - */ - boolean canCommit(TezTaskAttemptID taskAttemptId) throws IOException; - - /** - * Mechanism for a {@link TaskCommunicator} to provide updates on a running task, as well as - * receive new information which may need to be propagated to the task. 
This includes events - * generated by the task and events which need to be sent to the task - * This method must be invoked periodically to receive updates for a running task - * - * @param request the update from the running task. - * @return the response that is requried by the task. - * @throws IOException - * @throws TezException - */ - TaskHeartbeatResponse heartbeat(TaskHeartbeatRequest request) throws IOException, TezException; - - /** - * Check whether the container is known by the framework. The state of this container is - * irrelevant - * - * @param containerId the relevant container id - * @return true if the container is known, false if it isn't - */ - boolean isKnownContainer(ContainerId containerId); - - /** - * Inform the framework that a task is alive. This needs to be invoked periodically to avoid the - * task attempt timing out. - * Invocations to heartbeat provides the same keep-alive functionality - * - * @param taskAttemptId the relevant task attempt - */ - void taskAlive(TezTaskAttemptID taskAttemptId); - - /** - * Inform the framework that a container is alive. This need to be invoked periodically to avoid - * the container attempt timing out. 
- * Invocations to heartbeat provides the same keep-alive functionality - * - * @param containerId the relevant container id - */ - void containerAlive(ContainerId containerId); - - /** - * Inform the framework that the task has started execution - * - * @param taskAttemptId the relevant task attempt id - * @param containerId the containerId in which the task attempt is running - */ - void taskStartedRemotely(TezTaskAttemptID taskAttemptId, ContainerId containerId); - - /** - * Inform the framework that a task has been killed - * - * @param taskAttemptId the relevant task attempt id - * @param taskAttemptEndReason the reason for the task attempt being killed - * @param diagnostics any diagnostics messages which are relevant to the task attempt - * kill - */ - void taskKilled(TezTaskAttemptID taskAttemptId, TaskAttemptEndReason taskAttemptEndReason, - @Nullable String diagnostics); - - /** - * Inform the framework that a task has failed - * - * @param taskAttemptId the relevant task attempt id - * @param taskAttemptEndReason the reason for the task failure - * @param diagnostics any diagnostics messages which are relevant to the task attempt - * failure - */ - void taskFailed(TezTaskAttemptID taskAttemptId, TaskAttemptEndReason taskAttemptEndReason, - @Nullable String diagnostics); - - /** - * Register to get notifications on updates to the specified vertex. Notifications will be sent - * via {@link org.apache.tez.runtime.api.InputInitializer#onVertexStateUpdated(org.apache.tez.dag.api.event.VertexStateUpdate)} - * </p> - * <p/> - * This method can only be invoked once. Duplicate invocations will result in an error. - * - * @param vertexName the vertex name for which notifications are required. - * @param stateSet the set of states for which notifications are required. 
null implies all - */ - void registerForVertexStateUpdates(String vertexName, @Nullable Set<VertexState> stateSet); - - /** - * Get the name of the currently executing dag - * - * @return the name of the currently executing dag - */ - String getCurrentDagName(); - - /** - * Get an identifier for the executing context of the DAG. - * @return a String identifier for the exeucting context. - */ - String getCurrentAppIdentifier(); - - /** - * Get the identifier for the currently executing dag. - * @return a numerical identifier for the currently running DAG. This is unique within the currently running application. - */ - int getCurrentDagIdenitifer(); - - /** - * Get the name of the Input vertices for the specified vertex. - * Root Inputs are not returned. - * - * @param vertexName the vertex for which source vertex names will be returned - * @return an Iterable containing the list of input vertices for the specified vertex - */ - Iterable<String> getInputVertexNames(String vertexName); - - /** - * Get the total number of tasks in the given vertex - * - * @param vertexName the relevant vertex name - * @return total number of tasks in this vertex - */ - int getVertexTotalTaskCount(String vertexName); - - /** - * Get the number of completed tasks for a given vertex - * - * @param vertexName the vertex name - * @return the number of completed tasks for the vertex - */ - int getVertexCompletedTaskCount(String vertexName); - - /** - * Get the number of running tasks for a given vertex - * - * @param vertexName the vertex name - * @return the number of running tasks for the vertex - */ - int getVertexRunningTaskCount(String vertexName); - - /** - * Get the start time for the first attempt of the specified task - * - * @param vertexName the vertex to which the task belongs - * @param taskIndex the index of the task - * @return the start time for the first attempt of the task - */ - long getFirstAttemptStartTime(String vertexName, int taskIndex); - - /** - * Get the start time 
for the currently executing DAG - * - * @return time when the current dag started executing - */ - long getDagStartTime(); -} http://git-wip-us.apache.org/repos/asf/tez/blob/fad16425/tez-dag/src/main/java/org/apache/tez/dag/api/TaskHeartbeatRequest.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/api/TaskHeartbeatRequest.java b/tez-dag/src/main/java/org/apache/tez/dag/api/TaskHeartbeatRequest.java deleted file mode 100644 index d0c22d3..0000000 --- a/tez-dag/src/main/java/org/apache/tez/dag/api/TaskHeartbeatRequest.java +++ /dev/null @@ -1,68 +0,0 @@ -/* - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.tez.dag.api; - -import java.util.List; - -import org.apache.tez.dag.records.TezTaskAttemptID; -import org.apache.tez.runtime.api.impl.TezEvent; - -// TODO TEZ-2003 (post) TEZ-2665. Move to the tez-api module -public class TaskHeartbeatRequest { - - // TODO TEZ-2003 (post) TEZ-2666 Ideally containerIdentifier should not be part of the request. 
- private final String containerIdentifier; - private final TezTaskAttemptID taskAttemptId; - private final List<TezEvent> events; - private final int startIndex; - private final int preRoutedStartIndex; - private final int maxEvents; - - - public TaskHeartbeatRequest(String containerIdentifier, TezTaskAttemptID taskAttemptId, List<TezEvent> events, int startIndex, - int preRoutedStartIndex, - int maxEvents) { - this.containerIdentifier = containerIdentifier; - this.taskAttemptId = taskAttemptId; - this.events = events; - this.startIndex = startIndex; - this.preRoutedStartIndex = preRoutedStartIndex; - this.maxEvents = maxEvents; - } - - public String getContainerIdentifier() { - return containerIdentifier; - } - - public TezTaskAttemptID getTaskAttemptId() { - return taskAttemptId; - } - - public List<TezEvent> getEvents() { - return events; - } - - public int getStartIndex() { - return startIndex; - } - - public int getPreRoutedStartIndex() { - return preRoutedStartIndex; - } - - public int getMaxEvents() { - return maxEvents; - } -} http://git-wip-us.apache.org/repos/asf/tez/blob/fad16425/tez-dag/src/main/java/org/apache/tez/dag/api/TaskHeartbeatResponse.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/api/TaskHeartbeatResponse.java b/tez-dag/src/main/java/org/apache/tez/dag/api/TaskHeartbeatResponse.java deleted file mode 100644 index dcf89ff..0000000 --- a/tez-dag/src/main/java/org/apache/tez/dag/api/TaskHeartbeatResponse.java +++ /dev/null @@ -1,51 +0,0 @@ -/* - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. 
- * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.tez.dag.api; - -import java.util.List; - -import org.apache.tez.runtime.api.impl.TezEvent; - -// TODO TEZ-2003 (post) TEZ-2665. Move to the tez-api module -public class TaskHeartbeatResponse { - - private final boolean shouldDie; - private final int nextFromEventId; - private final int nextPreRoutedEventId; - private final List<TezEvent> events; - - public TaskHeartbeatResponse(boolean shouldDie, List<TezEvent> events, int nextFromEventId, int nextPreRoutedEventId) { - this.shouldDie = shouldDie; - this.events = events; - this.nextFromEventId = nextFromEventId; - this.nextPreRoutedEventId = nextPreRoutedEventId; - } - - public boolean isShouldDie() { - return shouldDie; - } - - public List<TezEvent> getEvents() { - return events; - } - - public int getNextFromEventId() { - return nextFromEventId; - } - - public int getNextPreRoutedEventId() { - return nextPreRoutedEventId; - } -} http://git-wip-us.apache.org/repos/asf/tez/blob/fad16425/tez-dag/src/main/java/org/apache/tez/dag/app/TaskCommunicatorContextImpl.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/TaskCommunicatorContextImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/TaskCommunicatorContextImpl.java index 2b7234c..7f88be2 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/app/TaskCommunicatorContextImpl.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/TaskCommunicatorContextImpl.java @@ -29,11 +29,10 @@ import org.apache.hadoop.yarn.api.records.ContainerId; 
import org.apache.tez.dag.api.UserPayload; import org.apache.tez.dag.app.rm.container.AMContainer; import org.apache.tez.serviceplugins.api.TaskAttemptEndReason; -import org.apache.tez.dag.api.TaskCommunicatorContext; -import org.apache.tez.dag.api.TaskHeartbeatRequest; -import org.apache.tez.dag.api.TaskHeartbeatResponse; +import org.apache.tez.serviceplugins.api.TaskCommunicatorContext; +import org.apache.tez.serviceplugins.api.TaskHeartbeatRequest; +import org.apache.tez.serviceplugins.api.TaskHeartbeatResponse; import org.apache.tez.dag.api.TezException; -import org.apache.tez.dag.api.TezUncheckedException; import org.apache.tez.dag.api.event.VertexState; import org.apache.tez.dag.api.event.VertexStateUpdate; import org.apache.tez.dag.app.dag.DAG; http://git-wip-us.apache.org/repos/asf/tez/blob/fad16425/tez-dag/src/main/java/org/apache/tez/dag/app/TaskCommunicatorManager.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/TaskCommunicatorManager.java b/tez-dag/src/main/java/org/apache/tez/dag/app/TaskCommunicatorManager.java index 64a964b..a196114 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/app/TaskCommunicatorManager.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/TaskCommunicatorManager.java @@ -33,7 +33,7 @@ import org.apache.commons.collections4.ListUtils; import org.apache.hadoop.yarn.event.Event; import org.apache.tez.Utils; import org.apache.tez.dag.api.NamedEntityDescriptor; -import org.apache.tez.dag.api.TaskCommunicator; +import org.apache.tez.serviceplugins.api.TaskCommunicator; import org.apache.tez.dag.api.TezConstants; import org.apache.tez.dag.api.UserPayload; import org.apache.tez.dag.app.dag.event.DAGAppMasterEventType; @@ -53,14 +53,14 @@ import org.apache.hadoop.service.AbstractService; import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.tez.common.ReflectionUtils; import org.apache.tez.common.TezUtilsInternal; -import 
org.apache.tez.dag.api.TaskCommunicatorContext; +import org.apache.tez.serviceplugins.api.TaskCommunicatorContext; import org.apache.tez.serviceplugins.api.TaskAttemptEndReason; -import org.apache.tez.dag.api.TaskHeartbeatResponse; +import org.apache.tez.serviceplugins.api.TaskHeartbeatResponse; import org.apache.tez.dag.api.TezException; import org.apache.tez.dag.api.TezUncheckedException; import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.util.ConverterUtils; -import org.apache.tez.dag.api.TaskHeartbeatRequest; +import org.apache.tez.serviceplugins.api.TaskHeartbeatRequest; import org.apache.tez.dag.app.dag.DAG; import org.apache.tez.dag.app.dag.Task; import org.apache.tez.dag.app.dag.event.TaskAttemptEvent; http://git-wip-us.apache.org/repos/asf/tez/blob/fad16425/tez-dag/src/main/java/org/apache/tez/dag/app/TaskCommunicatorWrapper.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/TaskCommunicatorWrapper.java b/tez-dag/src/main/java/org/apache/tez/dag/app/TaskCommunicatorWrapper.java index 4f9780e..4a75875 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/app/TaskCommunicatorWrapper.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/TaskCommunicatorWrapper.java @@ -21,7 +21,7 @@ import java.util.Map; import org.apache.hadoop.security.Credentials; import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.LocalResource; -import org.apache.tez.dag.api.TaskCommunicator; +import org.apache.tez.serviceplugins.api.TaskCommunicator; import org.apache.tez.dag.api.event.VertexStateUpdate; import org.apache.tez.dag.records.TezTaskAttemptID; import org.apache.tez.runtime.api.impl.TaskSpec; http://git-wip-us.apache.org/repos/asf/tez/blob/fad16425/tez-dag/src/main/java/org/apache/tez/dag/app/TezLocalTaskCommunicatorImpl.java ---------------------------------------------------------------------- diff --git 
a/tez-dag/src/main/java/org/apache/tez/dag/app/TezLocalTaskCommunicatorImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/TezLocalTaskCommunicatorImpl.java index 47688d1..15d90d3 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/app/TezLocalTaskCommunicatorImpl.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/TezLocalTaskCommunicatorImpl.java @@ -18,7 +18,7 @@ import java.net.InetAddress; import java.net.InetSocketAddress; import java.net.UnknownHostException; -import org.apache.tez.dag.api.TaskCommunicatorContext; +import org.apache.tez.serviceplugins.api.TaskCommunicatorContext; import org.apache.tez.dag.api.TezUncheckedException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; http://git-wip-us.apache.org/repos/asf/tez/blob/fad16425/tez-dag/src/main/java/org/apache/tez/dag/app/TezTaskCommunicatorImpl.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/TezTaskCommunicatorImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/TezTaskCommunicatorImpl.java index d071e0d..0bbe97a 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/app/TezTaskCommunicatorImpl.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/TezTaskCommunicatorImpl.java @@ -47,10 +47,10 @@ import org.apache.tez.common.security.JobTokenSecretManager; import org.apache.tez.common.security.TokenCache; import org.apache.tez.serviceplugins.api.ContainerEndReason; import org.apache.tez.serviceplugins.api.TaskAttemptEndReason; -import org.apache.tez.dag.api.TaskCommunicator; -import org.apache.tez.dag.api.TaskCommunicatorContext; -import org.apache.tez.dag.api.TaskHeartbeatRequest; -import org.apache.tez.dag.api.TaskHeartbeatResponse; +import org.apache.tez.serviceplugins.api.TaskCommunicator; +import org.apache.tez.serviceplugins.api.TaskCommunicatorContext; +import org.apache.tez.serviceplugins.api.TaskHeartbeatRequest; +import org.apache.tez.serviceplugins.api.TaskHeartbeatResponse; import 
org.apache.tez.dag.api.TezConfiguration; import org.apache.tez.dag.api.TezException; import org.apache.tez.dag.api.TezUncheckedException; http://git-wip-us.apache.org/repos/asf/tez/blob/fad16425/tez-dag/src/main/java/org/apache/tez/serviceplugins/api/TaskCommunicator.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/serviceplugins/api/TaskCommunicator.java b/tez-dag/src/main/java/org/apache/tez/serviceplugins/api/TaskCommunicator.java new file mode 100644 index 0000000..8f919d1 --- /dev/null +++ b/tez-dag/src/main/java/org/apache/tez/serviceplugins/api/TaskCommunicator.java @@ -0,0 +1,232 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.tez.serviceplugins.api; + +import javax.annotation.Nullable; +import java.net.InetSocketAddress; +import java.util.Map; + +import org.apache.hadoop.security.Credentials; +import org.apache.hadoop.yarn.api.records.ContainerId; +import org.apache.hadoop.yarn.api.records.LocalResource; +import org.apache.tez.common.ServicePluginLifecycle; +import org.apache.tez.dag.api.event.VertexStateUpdate; +import org.apache.tez.dag.records.TezTaskAttemptID; +import org.apache.tez.runtime.api.impl.TaskSpec; + +// TODO TEZ-2003 (post) TEZ-2665. Move to the tez-api module +// TODO TEZ-2003 (post) TEZ-2664. Ideally, don't expose YARN containerId; instead expose a Tez specific construct. + +/** + * This class represents the API for a custom TaskCommunicator which can be run within the Tez AM. + * This is used to communicate with running services, potentially launching tasks, and getting + * updates from running tasks. + * <p/> + * The plugin is initialized with an instance of {@link TaskCommunicatorContext} - which provides + * a mechanism to notify the system about allocation decisions and resources to the Tez framework. + * + * If setting up a heartbeat between the task and the AM, the framework is responsible for error checking + * of this heartbeat mechanism, handling lost or duplicate responses. 
+ * + */ +public abstract class TaskCommunicator implements ServicePluginLifecycle { + + // TODO TEZ-2003 (post) TEZ-2666 Enhancements to interface + // - registerContainerEnd should provide the end reason / possible rename + // - get rid of getAddress + // - Add methods to support task preemption + // - Add a dagStarted notification, along with a payload + // - taskSpec breakup into a clean interface + // - Add methods to report task / container completion + + private final TaskCommunicatorContext taskCommunicatorContext; + + public TaskCommunicator(TaskCommunicatorContext taskCommunicatorContext) { + this.taskCommunicatorContext = taskCommunicatorContext; + } + + /** + * Get the {@link TaskCommunicatorContext} associated with this instance of the task communicator, which + * is + * used to communicate with the rest of the system + * + * @return an instance of {@link TaskCommunicatorContext} + */ + public TaskCommunicatorContext getContext() { + return taskCommunicatorContext; + } + + /** + * An entry point for initialization. + * Order of service setup. Constructor, initialize(), start() - when starting a service. + * + * @throws Exception + */ + @Override + public void initialize() throws Exception { + } + + /** + * An entry point for starting the service. + * Order of service setup. Constructor, initialize(), start() - when starting a service. + * + * @throws Exception + */ + @Override + public void start() throws Exception { + } + + /** + * Stop the service. This could be invoked at any point, when the service is no longer required - + * including in case of errors. + * + * @throws Exception + */ + @Override + public void shutdown() throws Exception { + } + + + /** + * Register a new container. 
+ * + * @param containerId the associated containerId + * @param hostname the hostname on which the container runs + * @param port the port for the service which is running the container + * @throws ServicePluginException when the service runs into a fatal error which it cannot handle. + * This will cause the app to shutdown. + */ + public abstract void registerRunningContainer(ContainerId containerId, String hostname, + int port) throws ServicePluginException; + + /** + * Register the end of a container. This can be caused by preemption, the container completing + * successfully, etc. + * + * @param containerId the associated containerId + * @param endReason the end reason for the container completing + * @param diagnostics diagnostics associated with the container end + * @throws ServicePluginException when the service runs into a fatal error which it cannot handle. + * This will cause the app to shutdown. + */ + public abstract void registerContainerEnd(ContainerId containerId, ContainerEndReason endReason, + @Nullable String diagnostics) throws + ServicePluginException; + + /** + * Register a task attempt to execute on a container + * + * @param containerId the containerId on which this task needs to run + * @param taskSpec the task specifications for the task to be executed + * @param additionalResources additional local resources which may be required to run this task + * on + * the container + * @param credentials the credentials required to run this task + * @param credentialsChanged whether the credentials are different from the original credentials + * associated with this container + * @param priority the priority of the task being executed + * @throws ServicePluginException when the service runs into a fatal error which it cannot handle. + * This will cause the app to shutdown. 
+ */ + public abstract void registerRunningTaskAttempt(ContainerId containerId, TaskSpec taskSpec, + Map<String, LocalResource> additionalResources, + Credentials credentials, + boolean credentialsChanged, int priority) throws + ServicePluginException; + + /** + * Register the completion of a task. This may be a result of preemption, the container dying, + * the node dying, the task completing to success + * + * @param taskAttemptID the task attempt which has completed / needs to be completed + * @param endReason the endReason for the task attempt. + * @param diagnostics diagnostics associated with the task end + * @throws ServicePluginException when the service runs into a fatal error which it cannot handle. + * This will cause the app to shutdown. + */ + public abstract void unregisterRunningTaskAttempt(TezTaskAttemptID taskAttemptID, + TaskAttemptEndReason endReason, + @Nullable String diagnostics) throws + ServicePluginException; + + /** + * Return the address, if any, that the service listens on + * + * @return the address + * @throws ServicePluginException when the service runs into a fatal error which it cannot handle. + * This will cause the app to shutdown. + */ + public abstract InetSocketAddress getAddress() throws ServicePluginException; + + /** + * Receive notifications on vertex state changes. + * <p/> + * State changes will be received based on the registration via {@link + * org.apache.tez.runtime.api.InputInitializerContext#registerForVertexStateUpdates(String, + * java.util.Set)}. Notifications will be received for all registered state changes, and not just + * for the latest state update. They will be in order in which the state change occurred. </p> + * <p/> + * Extensive processing should not be performed via this method call. Instead this should just be + * used as a notification mechanism. 
+ * <br>This method may be invoked concurrently with other invocations into the TaskCommunicator + * and + * multi-threading/concurrency implications must be considered. + * + * @param stateUpdate an event indicating the name of the vertex, and its updated state. + * Additional information may be available for specific events. Look at the + * type hierarchy for {@link org.apache.tez.dag.api.event.VertexStateUpdate} + * @throws ServicePluginException when the service runs into a fatal error which it cannot handle. + * This will cause the app to shutdown. + */ + public abstract void onVertexStateUpdated(VertexStateUpdate stateUpdate) throws ServicePluginException; + + /** + * Indicates the current running dag is complete. The TaskCommunicatorContext can be used to + * query information about the current dag during the duration of the dagComplete invocation. + * <p/> + * After this, the contents returned from querying the context may change at any point - due to + * the next dag being submitted. + * + * @param dagIdentifier the unique numerical identifier for the DAG in the specified execution context. + * @throws ServicePluginException when the service runs into a fatal error which it cannot handle. + * This will cause the app to shutdown. + */ + public abstract void dagComplete(int dagIdentifier) throws ServicePluginException; + + /** + * Share meta-information such as host:port information where the Task Communicator may be + * listening. + * Primarily for use by compatible launchers to learn this information. + * + * @return meta info for the task communicator + * @throws ServicePluginException when the service runs into a fatal error which it cannot handle. + * This will cause the app to shutdown. 
+ */ + public abstract Object getMetaInfo() throws ServicePluginException; +} http://git-wip-us.apache.org/repos/asf/tez/blob/fad16425/tez-dag/src/main/java/org/apache/tez/serviceplugins/api/TaskCommunicatorContext.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/serviceplugins/api/TaskCommunicatorContext.java b/tez-dag/src/main/java/org/apache/tez/serviceplugins/api/TaskCommunicatorContext.java new file mode 100644 index 0000000..c55bdbd --- /dev/null +++ b/tez-dag/src/main/java/org/apache/tez/serviceplugins/api/TaskCommunicatorContext.java @@ -0,0 +1,240 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.tez.serviceplugins.api; + +import javax.annotation.Nullable; +import java.io.IOException; +import java.util.Set; + +import org.apache.hadoop.security.Credentials; +import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; +import org.apache.hadoop.yarn.api.records.ContainerId; +import org.apache.tez.dag.api.TezException; +import org.apache.tez.dag.api.UserPayload; +import org.apache.tez.dag.api.event.VertexState; +import org.apache.tez.dag.records.TezTaskAttemptID; + + +// Do not make calls into this from within a held lock. + +// TODO TEZ-2003 (post) TEZ-2665. Move to the tez-api module +public interface TaskCommunicatorContext { + + // TODO TEZ-2003 (post) TEZ-2666 Enhancements to API + // - Consolidate usage of IDs + // - Split the heartbeat API to a liveness check and a status update + // - Rename and consolidate TaskHeartbeatResponse and TaskHeartbeatRequest + // - Fix taskStarted needs to be invoked before launching the actual task. + // - Potentially add methods to report availability stats to the scheduler + // - Report taskSuccess via a method instead of the heartbeat + // - Add methods to signal container / task state changes + // - Maybe add book-keeping as a helper library, instead of each impl tracking container to task etc. + // - Handling of containers / tasks which no longer exist in the system (formalized interface instead of a shouldDie notification) + + /** + * Get the UserPayload that was configured while setting up the task communicator + * + * @return the initially configured user payload + */ + UserPayload getInitialUserPayload(); + + /** + * Get the application attempt id for the running application. 
Relevant when running under YARN + * + * @return the applicationAttemptId for the running app + */ + ApplicationAttemptId getApplicationAttemptId(); + + /** + * Get credentials associated with the AppMaster + * + * @return credentials + */ + Credentials getCredentials(); + + /** + * Check whether a running attempt can commit. This provides a leader election mechanism amongst + * multiple running attempts + * + * @param taskAttemptId the associated task attempt id + * @return whether the attempt can commit or not + * @throws IOException + */ + boolean canCommit(TezTaskAttemptID taskAttemptId) throws IOException; + + /** + * Mechanism for a {@link TaskCommunicator} to provide updates on a running task, as well as + * receive new information which may need to be propagated to the task. This includes events + * generated by the task and events which need to be sent to the task + * This method must be invoked periodically to receive updates for a running task + * + * @param request the update from the running task. + * @return the response that is required by the task. + * @throws IOException + * @throws TezException + */ + TaskHeartbeatResponse heartbeat(TaskHeartbeatRequest request) throws IOException, TezException; + + /** + * Check whether the container is known by the framework. The state of this container is + * irrelevant + * + * @param containerId the relevant container id + * @return true if the container is known, false if it isn't + */ + boolean isKnownContainer(ContainerId containerId); + + /** + * Inform the framework that a task is alive. This needs to be invoked periodically to avoid the + * task attempt timing out. + * Invocations to heartbeat provides the same keep-alive functionality + * + * @param taskAttemptId the relevant task attempt + */ + void taskAlive(TezTaskAttemptID taskAttemptId); + + /** + * Inform the framework that a container is alive. This needs to be invoked periodically to avoid + * the container attempt timing out. 
+ * Invocations to heartbeat provides the same keep-alive functionality + * + * @param containerId the relevant container id + */ + void containerAlive(ContainerId containerId); + + /** + * Inform the framework that the task has started execution + * + * @param taskAttemptId the relevant task attempt id + * @param containerId the containerId in which the task attempt is running + */ + void taskStartedRemotely(TezTaskAttemptID taskAttemptId, ContainerId containerId); + + /** + * Inform the framework that a task has been killed + * + * @param taskAttemptId the relevant task attempt id + * @param taskAttemptEndReason the reason for the task attempt being killed + * @param diagnostics any diagnostics messages which are relevant to the task attempt + * kill + */ + void taskKilled(TezTaskAttemptID taskAttemptId, TaskAttemptEndReason taskAttemptEndReason, + @Nullable String diagnostics); + + /** + * Inform the framework that a task has failed + * + * @param taskAttemptId the relevant task attempt id + * @param taskAttemptEndReason the reason for the task failure + * @param diagnostics any diagnostics messages which are relevant to the task attempt + * failure + */ + void taskFailed(TezTaskAttemptID taskAttemptId, TaskAttemptEndReason taskAttemptEndReason, + @Nullable String diagnostics); + + /** + * Register to get notifications on updates to the specified vertex. Notifications will be sent + * via {@link org.apache.tez.runtime.api.InputInitializer#onVertexStateUpdated(org.apache.tez.dag.api.event.VertexStateUpdate)} + * </p> + * <p/> + * This method can only be invoked once. Duplicate invocations will result in an error. + * + * @param vertexName the vertex name for which notifications are required. + * @param stateSet the set of states for which notifications are required. 
null implies all + */ + void registerForVertexStateUpdates(String vertexName, @Nullable Set<VertexState> stateSet); + + /** + * Get the name of the currently executing dag + * + * @return the name of the currently executing dag + */ + String getCurrentDagName(); + + /** + * Get an identifier for the executing context of the DAG. + * @return a String identifier for the executing context. + */ + String getCurrentAppIdentifier(); + + /** + * Get the identifier for the currently executing dag. + * @return a numerical identifier for the currently running DAG. This is unique within the currently running application. + */ + int getCurrentDagIdenitifer(); + + /** + * Get the name of the Input vertices for the specified vertex. + * Root Inputs are not returned. + * + * @param vertexName the vertex for which source vertex names will be returned + * @return an Iterable containing the list of input vertices for the specified vertex + */ + Iterable<String> getInputVertexNames(String vertexName); + + /** + * Get the total number of tasks in the given vertex + * + * @param vertexName the relevant vertex name + * @return total number of tasks in this vertex + */ + int getVertexTotalTaskCount(String vertexName); + + /** + * Get the number of completed tasks for a given vertex + * + * @param vertexName the vertex name + * @return the number of completed tasks for the vertex + */ + int getVertexCompletedTaskCount(String vertexName); + + /** + * Get the number of running tasks for a given vertex + * + * @param vertexName the vertex name + * @return the number of running tasks for the vertex + */ + int getVertexRunningTaskCount(String vertexName); + + /** + * Get the start time for the first attempt of the specified task + * + * @param vertexName the vertex to which the task belongs + * @param taskIndex the index of the task + * @return the start time for the first attempt of the task + */ + long getFirstAttemptStartTime(String vertexName, int taskIndex); + + /** + * Get the start time 
for the currently executing DAG + * + * @return time when the current dag started executing + */ + long getDagStartTime(); +} http://git-wip-us.apache.org/repos/asf/tez/blob/fad16425/tez-dag/src/main/java/org/apache/tez/serviceplugins/api/TaskHeartbeatRequest.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/serviceplugins/api/TaskHeartbeatRequest.java b/tez-dag/src/main/java/org/apache/tez/serviceplugins/api/TaskHeartbeatRequest.java new file mode 100644 index 0000000..40b006f --- /dev/null +++ b/tez-dag/src/main/java/org/apache/tez/serviceplugins/api/TaskHeartbeatRequest.java @@ -0,0 +1,82 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.tez.serviceplugins.api; + +import java.util.List; + +import org.apache.tez.dag.records.TezTaskAttemptID; +import org.apache.tez.runtime.api.impl.TezEvent; + +// TODO TEZ-2003 (post) TEZ-2665. Move to the tez-api module +public class TaskHeartbeatRequest { + + // TODO TEZ-2003 (post) TEZ-2666 Ideally containerIdentifier should not be part of the request. + private final String containerIdentifier; + private final TezTaskAttemptID taskAttemptId; + private final List<TezEvent> events; + private final int startIndex; + private final int preRoutedStartIndex; + private final int maxEvents; + + + public TaskHeartbeatRequest(String containerIdentifier, TezTaskAttemptID taskAttemptId, List<TezEvent> events, int startIndex, + int preRoutedStartIndex, + int maxEvents) { + this.containerIdentifier = containerIdentifier; + this.taskAttemptId = taskAttemptId; + this.events = events; + this.startIndex = startIndex; + this.preRoutedStartIndex = preRoutedStartIndex; + this.maxEvents = maxEvents; + } + + public String getContainerIdentifier() { + return containerIdentifier; + } + + public TezTaskAttemptID getTaskAttemptId() { + return taskAttemptId; + } + + public List<TezEvent> getEvents() { + return events; + } + + public int getStartIndex() { + return startIndex; + } + + public int getPreRoutedStartIndex() { + return preRoutedStartIndex; + } + + public int getMaxEvents() { + return maxEvents; + } +} http://git-wip-us.apache.org/repos/asf/tez/blob/fad16425/tez-dag/src/main/java/org/apache/tez/serviceplugins/api/TaskHeartbeatResponse.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/serviceplugins/api/TaskHeartbeatResponse.java b/tez-dag/src/main/java/org/apache/tez/serviceplugins/api/TaskHeartbeatResponse.java new file mode 100644 index 0000000..9145004 --- /dev/null +++ b/tez-dag/src/main/java/org/apache/tez/serviceplugins/api/TaskHeartbeatResponse.java @@ -0,0 +1,65 @@ +/* + * 
Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.tez.serviceplugins.api; + +import java.util.List; + +import org.apache.tez.runtime.api.impl.TezEvent; + +// TODO TEZ-2003 (post) TEZ-2665. 
Move to the tez-api module +public class TaskHeartbeatResponse { + + private final boolean shouldDie; + private final int nextFromEventId; + private final int nextPreRoutedEventId; + private final List<TezEvent> events; + + public TaskHeartbeatResponse(boolean shouldDie, List<TezEvent> events, int nextFromEventId, int nextPreRoutedEventId) { + this.shouldDie = shouldDie; + this.events = events; + this.nextFromEventId = nextFromEventId; + this.nextPreRoutedEventId = nextPreRoutedEventId; + } + + public boolean isShouldDie() { + return shouldDie; + } + + public List<TezEvent> getEvents() { + return events; + } + + public int getNextFromEventId() { + return nextFromEventId; + } + + public int getNextPreRoutedEventId() { + return nextPreRoutedEventId; + } +} http://git-wip-us.apache.org/repos/asf/tez/blob/fad16425/tez-dag/src/test/java/org/apache/tez/dag/app/TestTaskCommunicatorContextImpl.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/TestTaskCommunicatorContextImpl.java b/tez-dag/src/test/java/org/apache/tez/dag/app/TestTaskCommunicatorContextImpl.java index 5222a2d..869bfd5 100644 --- a/tez-dag/src/test/java/org/apache/tez/dag/app/TestTaskCommunicatorContextImpl.java +++ b/tez-dag/src/test/java/org/apache/tez/dag/app/TestTaskCommunicatorContextImpl.java @@ -25,7 +25,7 @@ import static org.mockito.Mockito.verify; import org.apache.hadoop.yarn.api.records.Container; import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.tez.common.ContainerSignatureMatcher; -import org.apache.tez.dag.api.TaskCommunicatorContext; +import org.apache.tez.serviceplugins.api.TaskCommunicatorContext; import org.apache.tez.dag.app.rm.container.AMContainerMap; import org.junit.Test; http://git-wip-us.apache.org/repos/asf/tez/blob/fad16425/tez-dag/src/test/java/org/apache/tez/dag/app/TestTaskCommunicatorManager.java ---------------------------------------------------------------------- diff 
--git a/tez-dag/src/test/java/org/apache/tez/dag/app/TestTaskCommunicatorManager.java b/tez-dag/src/test/java/org/apache/tez/dag/app/TestTaskCommunicatorManager.java index d76a5b3..5323928 100644 --- a/tez-dag/src/test/java/org/apache/tez/dag/app/TestTaskCommunicatorManager.java +++ b/tez-dag/src/test/java/org/apache/tez/dag/app/TestTaskCommunicatorManager.java @@ -49,8 +49,8 @@ import org.apache.hadoop.yarn.event.Event; import org.apache.hadoop.yarn.event.EventHandler; import org.apache.tez.common.TezUtils; import org.apache.tez.dag.api.NamedEntityDescriptor; -import org.apache.tez.dag.api.TaskCommunicator; -import org.apache.tez.dag.api.TaskCommunicatorContext; +import org.apache.tez.serviceplugins.api.TaskCommunicator; +import org.apache.tez.serviceplugins.api.TaskCommunicatorContext; import org.apache.tez.dag.api.TezConstants; import org.apache.tez.dag.api.TezException; import org.apache.tez.dag.api.UserPayload; http://git-wip-us.apache.org/repos/asf/tez/blob/fad16425/tez-dag/src/test/java/org/apache/tez/dag/app/TestTaskCommunicatorManager1.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/TestTaskCommunicatorManager1.java b/tez-dag/src/test/java/org/apache/tez/dag/app/TestTaskCommunicatorManager1.java index 2921a22..0f8afaa 100644 --- a/tez-dag/src/test/java/org/apache/tez/dag/app/TestTaskCommunicatorManager1.java +++ b/tez-dag/src/test/java/org/apache/tez/dag/app/TestTaskCommunicatorManager1.java @@ -54,19 +54,19 @@ import org.apache.tez.common.security.JobTokenIdentifier; import org.apache.tez.common.security.JobTokenSecretManager; import org.apache.tez.common.security.TokenCache; import org.apache.tez.dag.api.NamedEntityDescriptor; -import org.apache.tez.dag.api.TaskCommunicator; +import org.apache.tez.serviceplugins.api.TaskCommunicator; import org.apache.tez.dag.api.TezConfiguration; import org.apache.tez.dag.api.TezConstants; import 
org.apache.tez.dag.api.TezUncheckedException; import org.apache.tez.dag.api.UserPayload; import org.apache.tez.serviceplugins.api.ContainerEndReason; import org.apache.tez.serviceplugins.api.TaskAttemptEndReason; -import org.apache.tez.dag.api.TaskHeartbeatRequest; -import org.apache.tez.dag.api.TaskHeartbeatResponse; +import org.apache.tez.serviceplugins.api.TaskHeartbeatRequest; +import org.apache.tez.serviceplugins.api.TaskHeartbeatResponse; import org.apache.tez.dag.api.TezException; import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.tez.common.TezTaskUmbilicalProtocol; -import org.apache.tez.dag.api.TaskCommunicatorContext; +import org.apache.tez.serviceplugins.api.TaskCommunicatorContext; import org.apache.tez.dag.app.dag.DAG; import org.apache.tez.dag.app.dag.Vertex; import org.apache.tez.dag.app.dag.event.TaskAttemptEvent; http://git-wip-us.apache.org/repos/asf/tez/blob/fad16425/tez-dag/src/test/java/org/apache/tez/dag/app/TestTaskCommunicatorWrapper.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/TestTaskCommunicatorWrapper.java b/tez-dag/src/test/java/org/apache/tez/dag/app/TestTaskCommunicatorWrapper.java index 212bca4..e89cc99 100644 --- a/tez-dag/src/test/java/org/apache/tez/dag/app/TestTaskCommunicatorWrapper.java +++ b/tez-dag/src/test/java/org/apache/tez/dag/app/TestTaskCommunicatorWrapper.java @@ -29,7 +29,7 @@ package org.apache.tez.dag.app; import com.google.common.collect.Sets; -import org.apache.tez.dag.api.TaskCommunicator; +import org.apache.tez.serviceplugins.api.TaskCommunicator; import org.junit.Test; public class TestTaskCommunicatorWrapper { http://git-wip-us.apache.org/repos/asf/tez/blob/fad16425/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskAttempt.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskAttempt.java 
b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskAttempt.java index 4772492..3bb688e 100644 --- a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskAttempt.java +++ b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskAttempt.java @@ -60,7 +60,7 @@ import org.apache.hadoop.yarn.util.SystemClock; import org.apache.log4j.Level; import org.apache.log4j.LogManager; import org.apache.tez.common.MockDNSToSwitchMapping; -import org.apache.tez.dag.api.TaskCommunicator; +import org.apache.tez.serviceplugins.api.TaskCommunicator; import org.apache.tez.dag.api.TaskLocationHint; import org.apache.tez.dag.api.TezConfiguration; import org.apache.tez.dag.api.oldrecords.TaskAttemptState; http://git-wip-us.apache.org/repos/asf/tez/blob/fad16425/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskSchedulerManager.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskSchedulerManager.java b/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskSchedulerManager.java index c649870..c62ff21 100644 --- a/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskSchedulerManager.java +++ b/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskSchedulerManager.java @@ -62,7 +62,6 @@ import org.apache.hadoop.yarn.event.EventHandler; import org.apache.tez.common.ContainerSignatureMatcher; import org.apache.tez.common.TezUtils; import org.apache.tez.dag.api.NamedEntityDescriptor; -import org.apache.tez.dag.api.TaskCommunicator; import org.apache.tez.dag.api.TaskLocationHint; import org.apache.tez.dag.api.TezConfiguration; import org.apache.tez.dag.api.TezConstants; @@ -71,10 +70,7 @@ import org.apache.tez.dag.api.UserPayload; import org.apache.tez.dag.api.client.DAGClientServer; import org.apache.tez.dag.app.AppContext; import org.apache.tez.dag.app.ContainerContext; -import org.apache.tez.dag.app.ContainerHeartbeatHandler; import 
org.apache.tez.dag.app.ServicePluginLifecycleAbstractService; -import org.apache.tez.dag.app.TaskCommunicatorManager; -import org.apache.tez.dag.app.TaskHeartbeatHandler; import org.apache.tez.dag.app.dag.TaskAttempt; import org.apache.tez.dag.app.dag.event.DAGAppMasterEventType; import org.apache.tez.dag.app.dag.event.DAGAppMasterEventUserServiceFatalError; http://git-wip-us.apache.org/repos/asf/tez/blob/fad16425/tez-dag/src/test/java/org/apache/tez/dag/app/rm/container/TestAMContainer.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/rm/container/TestAMContainer.java b/tez-dag/src/test/java/org/apache/tez/dag/app/rm/container/TestAMContainer.java index 8b8b6d7..ed14871 100644 --- a/tez-dag/src/test/java/org/apache/tez/dag/app/rm/container/TestAMContainer.java +++ b/tez-dag/src/test/java/org/apache/tez/dag/app/rm/container/TestAMContainer.java @@ -67,7 +67,7 @@ import org.apache.tez.dag.app.TaskCommunicatorWrapper; import org.apache.tez.serviceplugins.api.ContainerEndReason; import org.apache.tez.serviceplugins.api.ServicePluginException; import org.apache.tez.serviceplugins.api.TaskAttemptEndReason; -import org.apache.tez.dag.api.TaskCommunicator; +import org.apache.tez.serviceplugins.api.TaskCommunicator; import org.apache.tez.dag.app.AppContext; import org.apache.tez.dag.app.ContainerHeartbeatHandler; import org.apache.tez.dag.app.ContainerContext; http://git-wip-us.apache.org/repos/asf/tez/blob/fad16425/tez-dag/src/test/java/org/apache/tez/dag/app/rm/container/TestAMContainerMap.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/rm/container/TestAMContainerMap.java b/tez-dag/src/test/java/org/apache/tez/dag/app/rm/container/TestAMContainerMap.java index e21dda1..2fcd0c8 100644 --- a/tez-dag/src/test/java/org/apache/tez/dag/app/rm/container/TestAMContainerMap.java +++ 
b/tez-dag/src/test/java/org/apache/tez/dag/app/rm/container/TestAMContainerMap.java @@ -31,7 +31,7 @@ import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.api.records.Priority; import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.api.records.Token; -import org.apache.tez.dag.api.TaskCommunicator; +import org.apache.tez.serviceplugins.api.TaskCommunicator; import org.apache.tez.dag.app.AppContext; import org.apache.tez.dag.app.ContainerHeartbeatHandler; import org.apache.tez.dag.app.TaskCommunicatorManagerInterface; http://git-wip-us.apache.org/repos/asf/tez/blob/fad16425/tez-ext-service-tests/src/test/java/org/apache/tez/dag/app/taskcomm/TezTestServiceTaskCommunicatorImpl.java ---------------------------------------------------------------------- diff --git a/tez-ext-service-tests/src/test/java/org/apache/tez/dag/app/taskcomm/TezTestServiceTaskCommunicatorImpl.java b/tez-ext-service-tests/src/test/java/org/apache/tez/dag/app/taskcomm/TezTestServiceTaskCommunicatorImpl.java index 127967a..f199dcf 100644 --- a/tez-ext-service-tests/src/test/java/org/apache/tez/dag/app/taskcomm/TezTestServiceTaskCommunicatorImpl.java +++ b/tez-ext-service-tests/src/test/java/org/apache/tez/dag/app/taskcomm/TezTestServiceTaskCommunicatorImpl.java @@ -31,7 +31,7 @@ import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.LocalResource; import org.apache.tez.serviceplugins.api.ContainerEndReason; import org.apache.tez.serviceplugins.api.TaskAttemptEndReason; -import org.apache.tez.dag.api.TaskCommunicatorContext; +import org.apache.tez.serviceplugins.api.TaskCommunicatorContext; import org.apache.tez.dag.app.TezTaskCommunicatorImpl; import org.apache.tez.dag.app.TezTestServiceCommunicator; import org.apache.tez.dag.records.TezTaskAttemptID; 
http://git-wip-us.apache.org/repos/asf/tez/blob/fad16425/tez-ext-service-tests/src/test/java/org/apache/tez/dag/app/taskcomm/TezTestServiceTaskCommunicatorWithErrors.java ---------------------------------------------------------------------- diff --git a/tez-ext-service-tests/src/test/java/org/apache/tez/dag/app/taskcomm/TezTestServiceTaskCommunicatorWithErrors.java b/tez-ext-service-tests/src/test/java/org/apache/tez/dag/app/taskcomm/TezTestServiceTaskCommunicatorWithErrors.java index 0a3d8d4..90313d4 100644 --- a/tez-ext-service-tests/src/test/java/org/apache/tez/dag/app/taskcomm/TezTestServiceTaskCommunicatorWithErrors.java +++ b/tez-ext-service-tests/src/test/java/org/apache/tez/dag/app/taskcomm/TezTestServiceTaskCommunicatorWithErrors.java @@ -22,8 +22,8 @@ import org.apache.hadoop.net.NetUtils; import org.apache.hadoop.security.Credentials; import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.LocalResource; -import org.apache.tez.dag.api.TaskCommunicator; -import org.apache.tez.dag.api.TaskCommunicatorContext; +import org.apache.tez.serviceplugins.api.TaskCommunicator; +import org.apache.tez.serviceplugins.api.TaskCommunicatorContext; import org.apache.tez.dag.api.event.VertexStateUpdate; import org.apache.tez.dag.records.TezTaskAttemptID; import org.apache.tez.runtime.api.impl.TaskSpec;
