Repository: tez Updated Branches: refs/heads/TEZ-2003 a169d8ae9 -> 0026ebecd
TEZ-2626. Fix log lines with DEBUG in messages, consolidate TEZ-2003 TODOs. (sseth) Project: http://git-wip-us.apache.org/repos/asf/tez/repo Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/0026ebec Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/0026ebec Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/0026ebec Branch: refs/heads/TEZ-2003 Commit: 0026ebecd42542d2e0a5fe28617acae4bd02d1e1 Parents: a169d8a Author: Siddharth Seth <[email protected]> Authored: Thu Jul 30 13:39:40 2015 -0700 Committer: Siddharth Seth <[email protected]> Committed: Thu Jul 30 13:39:40 2015 -0700 ---------------------------------------------------------------------- TEZ-2003-CHANGES.txt | 1 + pom.xml | 2 +- .../serviceplugins/api/ContainerLauncher.java | 4 --- .../tez/serviceplugins/api/TaskScheduler.java | 5 ++++ .../api/TaskSchedulerContext.java | 6 ++-- .../apache/tez/dag/api/TaskCommunicator.java | 29 +++++++------------- .../tez/dag/api/TaskCommunicatorContext.java | 24 ++++++++-------- .../tez/dag/api/TaskHeartbeatRequest.java | 6 ++-- .../tez/dag/api/TaskHeartbeatResponse.java | 2 +- .../dag/app/TaskAttemptListenerImpTezDag.java | 7 ++--- .../dag/app/TaskCommunicatorContextImpl.java | 2 +- .../tez/dag/app/TezTaskCommunicatorImpl.java | 3 +- .../apache/tez/dag/app/dag/impl/DAGImpl.java | 2 +- .../tez/dag/app/dag/impl/TaskAttemptImpl.java | 2 +- .../dag/app/rm/TaskSchedulerEventHandler.java | 6 ---- .../rm/container/AMContainerEventAssignTA.java | 2 -- tez-ext-service-tests/pom.xml | 1 - .../TezTestServiceContainerLauncher.java | 4 +-- .../TezTestServiceNoOpContainerLauncher.java | 2 +- .../rm/TezTestServiceTaskSchedulerService.java | 6 ++-- .../tez/service/impl/ContainerRunnerImpl.java | 6 ++-- .../tez/tests/TestExternalTezServices.java | 2 -- .../internals/api/TaskReporterInterface.java | 4 +-- 23 files changed, 51 insertions(+), 77 deletions(-) ---------------------------------------------------------------------- 
http://git-wip-us.apache.org/repos/asf/tez/blob/0026ebec/TEZ-2003-CHANGES.txt ---------------------------------------------------------------------- diff --git a/TEZ-2003-CHANGES.txt b/TEZ-2003-CHANGES.txt index 9b3967a..c7a3dcc 100644 --- a/TEZ-2003-CHANGES.txt +++ b/TEZ-2003-CHANGES.txt @@ -41,5 +41,6 @@ ALL CHANGES: TEZ-2653. Change service contexts to expose a user specified payload instead of the AM configuration. TEZ-2441. Add tests for TezTaskRunner2. TEZ-2657. Add tests for client side changes - specifying plugins, etc. + TEZ-2626. Fix log lines with DEBUG in messages, consolidate TEZ-2003 TODOs. INCOMPATIBLE CHANGES: http://git-wip-us.apache.org/repos/asf/tez/blob/0026ebec/pom.xml ---------------------------------------------------------------------- diff --git a/pom.xml b/pom.xml index 1cffe6d..9d669f2 100644 --- a/pom.xml +++ b/pom.xml @@ -667,10 +667,10 @@ <module>tez-examples</module> <module>tez-tests</module> <module>tez-dag</module> + <module>tez-ext-service-tests</module> <module>tez-ui</module> <module>tez-plugins</module> <module>tez-tools</module> - <module>tez-ext-service-tests</module> <module>tez-dist</module> <module>docs</module> </modules> http://git-wip-us.apache.org/repos/asf/tez/blob/0026ebec/tez-api/src/main/java/org/apache/tez/serviceplugins/api/ContainerLauncher.java ---------------------------------------------------------------------- diff --git a/tez-api/src/main/java/org/apache/tez/serviceplugins/api/ContainerLauncher.java b/tez-api/src/main/java/org/apache/tez/serviceplugins/api/ContainerLauncher.java index 8337dcb..7f58f77 100644 --- a/tez-api/src/main/java/org/apache/tez/serviceplugins/api/ContainerLauncher.java +++ b/tez-api/src/main/java/org/apache/tez/serviceplugins/api/ContainerLauncher.java @@ -16,7 +16,6 @@ package org.apache.tez.serviceplugins.api; import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceStability; -import org.apache.hadoop.service.AbstractService; 
import org.apache.tez.common.ServicePluginLifecycle; /** @@ -30,9 +29,6 @@ public abstract class ContainerLauncher implements ServicePluginLifecycle { private final ContainerLauncherContext containerLauncherContext; - // TODO TEZ-2003 Simplify this by moving away from AbstractService. Potentially Guava AbstractService. - // A serviceInit(Configuration) is not likely to be very useful, and will expose unnecessary internal - // configuration to the services if populated with the AM Configuration public ContainerLauncher(ContainerLauncherContext containerLauncherContext) { this.containerLauncherContext = containerLauncherContext; } http://git-wip-us.apache.org/repos/asf/tez/blob/0026ebec/tez-api/src/main/java/org/apache/tez/serviceplugins/api/TaskScheduler.java ---------------------------------------------------------------------- diff --git a/tez-api/src/main/java/org/apache/tez/serviceplugins/api/TaskScheduler.java b/tez-api/src/main/java/org/apache/tez/serviceplugins/api/TaskScheduler.java index a5b054f..9ff2bd5 100644 --- a/tez-api/src/main/java/org/apache/tez/serviceplugins/api/TaskScheduler.java +++ b/tez-api/src/main/java/org/apache/tez/serviceplugins/api/TaskScheduler.java @@ -27,6 +27,11 @@ import org.apache.tez.common.ServicePluginLifecycle; @InterfaceStability.Unstable public abstract class TaskScheduler implements ServicePluginLifecycle { + // TODO TEZ-2003 (post) TEZ-2668 + // - Should setRegister / unregister be part of APIs when not YARN specific ? + // - Include vertex / task information in the request so that the scheduler can make decisions + // around prioritizing tasks in the same vertex when others exist at the same priority. 
+ private final TaskSchedulerContext taskSchedulerContext; public TaskScheduler(TaskSchedulerContext taskSchedulerContext) { http://git-wip-us.apache.org/repos/asf/tez/blob/0026ebec/tez-api/src/main/java/org/apache/tez/serviceplugins/api/TaskSchedulerContext.java ---------------------------------------------------------------------- diff --git a/tez-api/src/main/java/org/apache/tez/serviceplugins/api/TaskSchedulerContext.java b/tez-api/src/main/java/org/apache/tez/serviceplugins/api/TaskSchedulerContext.java index 6f37641..dbbf75c 100644 --- a/tez-api/src/main/java/org/apache/tez/serviceplugins/api/TaskSchedulerContext.java +++ b/tez-api/src/main/java/org/apache/tez/serviceplugins/api/TaskSchedulerContext.java @@ -53,7 +53,10 @@ public interface TaskSchedulerContext { IDLE, RUNNING_APP, COMPLETED } - // TODO Post TEZ-2003. Remove references to YARN constructs like Container, ContainerStatus, NodeReport + // TODO TEZ-2003 (post) TEZ-2664. Remove references to YARN constructs like Container, ContainerStatus, NodeReport + // TODO TEZ-2003 (post) TEZ-2668 Enhancements to TaskScheduler interfaces + // - setApplicationRegistrationData may not be relevant to non YARN clusters + // - getAppFinalStatus may not be relevant to non YARN clusters // upcall to app must be outside locks public void taskAllocated(Object task, Object appCookie, @@ -78,7 +81,6 @@ public interface TaskSchedulerContext { public float getProgress(); public void preemptContainer(ContainerId containerId); - // TODO Post TEZ-2003. Another method which is primarily relevant to YARN clusters for unregistration. 
public AppFinalStatus getFinalAppStatus(); http://git-wip-us.apache.org/repos/asf/tez/blob/0026ebec/tez-dag/src/main/java/org/apache/tez/dag/api/TaskCommunicator.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/api/TaskCommunicator.java b/tez-dag/src/main/java/org/apache/tez/dag/api/TaskCommunicator.java index f221414..794d390 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/api/TaskCommunicator.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/api/TaskCommunicator.java @@ -27,9 +27,18 @@ import org.apache.tez.serviceplugins.api.TaskAttemptEndReason; import org.apache.tez.dag.records.TezTaskAttemptID; import org.apache.tez.runtime.api.impl.TaskSpec; -// TODO TEZ-2003 Move this into the tez-api module +// TODO TEZ-2003 (post) TEZ-2665. Move to the tez-api module +// TODO TEZ-2003 (post) TEZ-2664. Ideally, don't expose YARN containerId; instead expose a Tez specific construct. public abstract class TaskCommunicator implements ServicePluginLifecycle { + // TODO TEZ-2003 (post) TEZ-2666 Enhancements to interface + // - registerContainerEnd should provide the end reason / possible rename + // - get rid of getAddress + // - Add methods to support task preemption + // - Add a dagStarted notification, along with a payload + // - taskSpec breakup into a clean interface + // - Add methods to report task / container completion + private final TaskCommunicatorContext taskCommunicatorContext; public TaskCommunicator(TaskCommunicatorContext taskCommunicatorContext) { @@ -52,36 +61,20 @@ public abstract class TaskCommunicator implements ServicePluginLifecycle { public void shutdown() throws Exception { } - // TODO Post TEZ-2003 Move this into the API module. Moving this requires abstractions for - // TaskSpec and related classes. (assuming that's efficient for execution) - // TODO TEZ-2003 Ideally, don't expose YARN containerId; instead expose a Tez specific construct. 
- // TODO When talking to an external service, this plugin implementer may need access to a host:port public abstract void registerRunningContainer(ContainerId containerId, String hostname, int port); - // TODO TEZ-2003 Ideally, don't expose YARN containerId; instead expose a Tez specific construct. public abstract void registerContainerEnd(ContainerId containerId, ContainerEndReason endReason); - // TODO TEZ-2003 Provide additional inforamtion like reason for container end / Task end. - // Was it caused by preemption - or as a result of a general task completion / container completion - - // TODO TEZ-2003 TaskSpec breakup into a clean interface - // TODO TEZ-2003 Add support for priority public abstract void registerRunningTaskAttempt(ContainerId containerId, TaskSpec taskSpec, Map<String, LocalResource> additionalResources, Credentials credentials, boolean credentialsChanged, int priority); - // TODO TEZ-2003. Are additional APIs required to mark a container as completed ? - for completeness. - - // TODO TEZ-2003 Remove reference to TaskAttemptID public abstract void unregisterRunningTaskAttempt(TezTaskAttemptID taskAttemptID, TaskAttemptEndReason endReason); - // TODO TEZ-2003 This doesn't necessarily belong here. A server may not start within the AM. public abstract InetSocketAddress getAddress(); - // TODO Eventually. Add methods here to support preemption of tasks. - /** * Receive notifications on vertex state changes. * <p/> @@ -108,8 +101,6 @@ public abstract class TaskCommunicator implements ServicePluginLifecycle { * After this, the contents returned from querying the context may change at any point - due to * the next dag being submitted. */ - // TODO TEZ-2003 This is extremely difficult to use. Add the dagStarted notification, and potentially - // throw exceptions between a dagComplete and dagStart invocation. 
public abstract void dagComplete(String dagName); /** http://git-wip-us.apache.org/repos/asf/tez/blob/0026ebec/tez-dag/src/main/java/org/apache/tez/dag/api/TaskCommunicatorContext.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/api/TaskCommunicatorContext.java b/tez-dag/src/main/java/org/apache/tez/dag/api/TaskCommunicatorContext.java index a1e94a3..8073f6a 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/api/TaskCommunicatorContext.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/api/TaskCommunicatorContext.java @@ -28,23 +28,27 @@ import org.apache.tez.dag.records.TezTaskAttemptID; // Do not make calls into this from within a held lock. -// TODO TEZ-2003 Move this into the tez-api module +// TODO TEZ-2003 (post) TEZ-2665. Move to the tez-api module public interface TaskCommunicatorContext { - // TODO TEZ-2003 Add signalling back into this to indicate errors - e.g. Container unregsitered, task no longer running, etc. - - // TODO TEZ-2003 Maybe add book-keeping as a helper library, instead of each impl tracking container to task etc. + // TODO TEZ-2003 (post) TEZ-2666 Enhancements to API + // - Consolidate usage of IDs + // - Split the heartbeat API to a liveness check and a status update + // - Rename and consolidate TaskHeartbeatResponse and TaskHeartbeatRequest + // - Fix taskStarted needs to be invoked before launching the actual task. + // - Potentially add methods to report availability stats to the scheduler + // - Report taskSuccess via a method instead of the heartbeat + // - Add methods to signal container / task state changes + // - Maybe add book-keeping as a helper library, instead of each impl tracking container to task etc. 
+ // - Handling of containers / tasks which no longer exist in the system (formalized interface instead of a shouldDie notification) UserPayload getInitialUserPayload(); ApplicationAttemptId getApplicationAttemptId(); Credentials getCredentials(); - // TODO TEZ-2003 Move to vertex, taskIndex, version boolean canCommit(TezTaskAttemptID taskAttemptId) throws IOException; - // TODO TEZ-2003 Split the heartbeat API to a liveness check and a status update - // KKK Rename this API TaskHeartbeatResponse heartbeat(TaskHeartbeatRequest request) throws IOException, TezException; boolean isKnownContainer(ContainerId containerId); @@ -53,13 +57,10 @@ public interface TaskCommunicatorContext { void containerAlive(ContainerId containerId); - // TODO TEZ-2003 Move to vertex, taskIndex, version. Rename to taskAttempt* void taskStartedRemotely(TezTaskAttemptID taskAttemptId, ContainerId containerId); - // TODO TEZ-2003 Move to vertex, taskIndex, version. Rename to taskAttempt* void taskKilled(TezTaskAttemptID taskAttemptId, TaskAttemptEndReason taskAttemptEndReason, @Nullable String diagnostics); - // TODO TEZ-2003 Move to vertex, taskIndex, version. Rename to taskAttempt* void taskFailed(TezTaskAttemptID taskAttemptId, TaskAttemptEndReason taskAttemptEndReason, @Nullable String diagnostics); /** @@ -72,9 +73,6 @@ public interface TaskCommunicatorContext { * @param stateSet the set of states for which notifications are required. null implies all */ void registerForVertexStateUpdates(String vertexName, @Nullable Set<VertexState> stateSet); - // TODO TEZ-2003 API. Should a method exist for task succeeded. - - // TODO Eventually Add methods to report availability stats to the scheduler. 
/** * Get the name of the currently executing dag http://git-wip-us.apache.org/repos/asf/tez/blob/0026ebec/tez-dag/src/main/java/org/apache/tez/dag/api/TaskHeartbeatRequest.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/api/TaskHeartbeatRequest.java b/tez-dag/src/main/java/org/apache/tez/dag/api/TaskHeartbeatRequest.java index b5ff991..d0c22d3 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/api/TaskHeartbeatRequest.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/api/TaskHeartbeatRequest.java @@ -19,13 +19,11 @@ import java.util.List; import org.apache.tez.dag.records.TezTaskAttemptID; import org.apache.tez.runtime.api.impl.TezEvent; -// TODO TEZ-2003 Move this into the tez-api module +// TODO TEZ-2003 (post) TEZ-2665. Move to the tez-api module public class TaskHeartbeatRequest { - // TODO TEZ-2003 Ideally containerIdentifier should not be part of the request. - // Replace with a task lookup - vertex name + task index + // TODO TEZ-2003 (post) TEZ-2666 Ideally containerIdentifier should not be part of the request. 
private final String containerIdentifier; - // TODO TEZ-2003 Get rid of the task attemptId reference if possible private final TezTaskAttemptID taskAttemptId; private final List<TezEvent> events; private final int startIndex; http://git-wip-us.apache.org/repos/asf/tez/blob/0026ebec/tez-dag/src/main/java/org/apache/tez/dag/api/TaskHeartbeatResponse.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/api/TaskHeartbeatResponse.java b/tez-dag/src/main/java/org/apache/tez/dag/api/TaskHeartbeatResponse.java index 7f063c4..dcf89ff 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/api/TaskHeartbeatResponse.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/api/TaskHeartbeatResponse.java @@ -18,7 +18,7 @@ import java.util.List; import org.apache.tez.runtime.api.impl.TezEvent; -// TODO TEZ-2003 Move this into the tez-api module +// TODO TEZ-2003 (post) TEZ-2665. Move to the tez-api module public class TaskHeartbeatResponse { private final boolean shouldDie; http://git-wip-us.apache.org/repos/asf/tez/blob/0026ebec/tez-dag/src/main/java/org/apache/tez/dag/app/TaskAttemptListenerImpTezDag.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/TaskAttemptListenerImpTezDag.java b/tez-dag/src/main/java/org/apache/tez/dag/app/TaskAttemptListenerImpTezDag.java index d9c3f96..462befe 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/app/TaskAttemptListenerImpTezDag.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/TaskAttemptListenerImpTezDag.java @@ -163,7 +163,6 @@ public class TaskAttemptListenerImpTezDag extends AbstractService implements LOG.info("Using Default Local Task Communicator"); return new TezLocalTaskCommunicatorImpl(taskCommunicatorContexts[taskCommIndex]); } else { - // TODO TEZ-2003. 
Use the payload LOG.info("Using TaskCommunicator {}:{} " + taskCommDescriptor.getEntityName(), taskCommDescriptor.getClassName()); Class<? extends TaskCommunicator> taskCommClazz = (Class<? extends TaskCommunicator>) ReflectionUtils .getClazz(taskCommDescriptor.getClassName()); @@ -218,7 +217,7 @@ public class TaskAttemptListenerImpTezDag extends AbstractService implements // This can happen when a task heartbeats. Meanwhile the container is unregistered. // The information will eventually make it through to the plugin via a corresponding unregister. // There's a race in that case between the unregister making it through, and this method returning. - // TODO TEZ-2003. An exception back is likely a better approach than sending a shouldDie = true, + // TODO TEZ-2003 (post) TEZ-2666. An exception back is likely a better approach than sending a shouldDie = true, // so that the plugin can handle the scenario. Alternately augment the response with error codes. // Error codes would be better than exceptions. LOG.info("Attempt: " + taskAttemptID + " is not recognized for heartbeats"); @@ -279,7 +278,7 @@ public class TaskAttemptListenerImpTezDag extends AbstractService implements String diagnostics) { // Regular flow via TaskAttempt will take care of un-registering from the heartbeat handler, // and messages from the scheduler will release the container. - // TODO TEZ-2003 Maybe consider un-registering here itself, since the task is not active anymore, + // TODO TEZ-2003 (post) TEZ-2671 Maybe consider un-registering here itself, since the task is not active anymore, // instead of waiting for the unregister to flow through the Container. // Fix along the same lines as TEZ-2124 by introducing an explict context. 
context.getEventHandler().handle(new TaskAttemptEventAttemptKilled(taskAttemptId, @@ -291,7 +290,7 @@ public class TaskAttemptListenerImpTezDag extends AbstractService implements String diagnostics) { // Regular flow via TaskAttempt will take care of un-registering from the heartbeat handler, // and messages from the scheduler will release the container. - // TODO TEZ-2003 Maybe consider un-registering here itself, since the task is not active anymore, + // TODO TEZ-2003 (post) TEZ-2671 Maybe consider un-registering here itself, since the task is not active anymore, // instead of waiting for the unregister to flow through the Container. // Fix along the same lines as TEZ-2124 by introducing an explict context. context.getEventHandler().handle(new TaskAttemptEventAttemptFailed(taskAttemptId, http://git-wip-us.apache.org/repos/asf/tez/blob/0026ebec/tez-dag/src/main/java/org/apache/tez/dag/app/TaskCommunicatorContextImpl.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/TaskCommunicatorContextImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/TaskCommunicatorContextImpl.java index cc315b7..0f10305 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/app/TaskCommunicatorContextImpl.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/TaskCommunicatorContextImpl.java @@ -43,6 +43,7 @@ import org.apache.tez.dag.records.TezTaskAttemptID; @InterfaceAudience.Private public class TaskCommunicatorContextImpl implements TaskCommunicatorContext, VertexStateUpdateListener { + // TODO TEZ-2003 (post) TEZ-2669 Propagate errors back to the AM with proper error reporting private final AppContext context; private final TaskAttemptListenerImpTezDag taskAttemptListener; @@ -188,7 +189,6 @@ public class TaskCommunicatorContextImpl implements TaskCommunicatorContext, Ver try { taskAttemptListener.vertexStateUpdateNotificationReceived(event, taskCommunicatorIndex); } catch (Exception e) { - // 
TODO TEZ-2003 This needs to be propagated to the DAG as a user error. throw new TezUncheckedException(e); } } http://git-wip-us.apache.org/repos/asf/tez/blob/0026ebec/tez-dag/src/main/java/org/apache/tez/dag/app/TezTaskCommunicatorImpl.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/TezTaskCommunicatorImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/TezTaskCommunicatorImpl.java index 2a5c80e..fb6d5e7 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/app/TezTaskCommunicatorImpl.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/TezTaskCommunicatorImpl.java @@ -463,9 +463,8 @@ public class TezTaskCommunicatorImpl extends TaskCommunicator { // Holder for Task information, which eventually will likely be VertexImplm taskIndex, attemptIndex + // TODO TEZ-2003. TEZ-2670. Remove this class. protected static class TaskAttempt { - // TODO TEZ-2003 Change this to work with VertexName, int id, int version - // TODO TEZ-2003 Avoid constructing this unit all over the place private TezTaskAttemptID taskAttemptId; TaskAttempt(TezTaskAttemptID taskAttemptId) { http://git-wip-us.apache.org/repos/asf/tez/blob/0026ebec/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java index 233a5d1..dcf6e20 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java @@ -1447,7 +1447,7 @@ public class DAGImpl implements org.apache.tez.dag.app.dag.DAG, // check task resources, only check it in non-local mode if (!appContext.isLocal()) { for (Vertex v : vertexMap.values()) { - // TODO TEZ-2003 (post) Ideally, this should be per source. 
+ // TODO TEZ-2003 (post) TEZ-2624 Ideally, this should be per source. if (v.getTaskResource().compareTo(appContext.getClusterInfo().getMaxContainerCapability()) > 0) { String msg = "Vertex's TaskResource is beyond the cluster container capability," + "Vertex=" + v.getLogIdentifier() +", Requested TaskResource=" + v.getTaskResource() http://git-wip-us.apache.org/repos/asf/tez/blob/0026ebec/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java index b006f30..dbf2818 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java @@ -182,7 +182,7 @@ public class TaskAttemptImpl implements TaskAttempt, private final StateMachine<TaskAttemptStateInternal, TaskAttemptEventType, TaskAttemptEvent> stateMachine; - // TODO TEZ-2003 We may need some additional state management for STATUS_UPDATES, FAILED, KILLED coming in before + // TODO TEZ-2003 (post) TEZ-2667 We may need some additional state management for STATUS_UPDATES, FAILED, KILLED coming in before // TASK_STARTED_REMOTELY. 
In case of a PUSH it's more intuitive to send TASK_STARTED_REMOTELY after communicating // with the listening service and getting a response, which in turn can trigger STATUS_UPDATES / FAILED / KILLED http://git-wip-us.apache.org/repos/asf/tez/blob/0026ebec/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerEventHandler.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerEventHandler.java b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerEventHandler.java index 4c2e631..c86f638 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerEventHandler.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerEventHandler.java @@ -502,7 +502,6 @@ public class TaskSchedulerEventHandler extends AbstractService implements taskSchedulerServiceWrappers[i].start(); if (shouldUnregisterFlag.get()) { // Flag may have been set earlier when task scheduler was not initialized - // TODO TEZ-2003 Should setRegister / unregister be part of APIs when not YARN specific ? // External services could need to talk to some other entity. taskSchedulers[i].setShouldUnregister(); } @@ -564,8 +563,6 @@ public class TaskSchedulerEventHandler extends AbstractService implements appCallbackExecutor.awaitTermination(1000l, TimeUnit.MILLISECONDS); } - // TODO TEZ-2003 Consolidate TaskSchedulerAppCallback methods once these methods are moved into context - // TaskSchedulerAppCallback methods with schedulerId, where relevant public synchronized void taskAllocated(int schedulerId, Object task, Object appCookie, @@ -651,7 +648,6 @@ public class TaskSchedulerEventHandler extends AbstractService implements Resource maxContainerCapability, Map<ApplicationAccessType, String> appAcls, ByteBuffer clientAMSecretKey) { - // TODO TEZ-2003 (post) Ideally clusterInfo should be available per source rather than a global view. 
this.appContext.getClusterInfo().setMaxContainerCapability( maxContainerCapability); this.appAcls = appAcls; @@ -751,7 +747,6 @@ public class TaskSchedulerEventHandler extends AbstractService implements this.shouldUnregisterFlag.set(true); for (int i = 0 ; i < taskSchedulers.length ; i++) { if (this.taskSchedulers[i] != null) { - // TODO TEZ-2003 registration required for all schedulers ? this.taskSchedulers[i].setShouldUnregister(); } } @@ -764,7 +759,6 @@ public class TaskSchedulerEventHandler extends AbstractService implements public boolean hasUnregistered() { boolean result = true; for (int i = 0 ; i < taskSchedulers.length ; i++) { - // TODO TEZ-2003 registration required for all schedulers ? result |= this.taskSchedulers[i].hasUnregistered(); if (result == false) { return result; http://git-wip-us.apache.org/repos/asf/tez/blob/0026ebec/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerEventAssignTA.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerEventAssignTA.java b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerEventAssignTA.java index 0398882..682cd02 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerEventAssignTA.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerEventAssignTA.java @@ -27,8 +27,6 @@ import org.apache.tez.runtime.api.impl.TaskSpec; public class AMContainerEventAssignTA extends AMContainerEvent { - // TODO TEZ-2003. Add the task priority to this event. - private final TezTaskAttemptID attemptId; // TODO Maybe have tht TAL pull the remoteTask from the TaskAttempt itself ? 
private final TaskSpec remoteTaskSpec; http://git-wip-us.apache.org/repos/asf/tez/blob/0026ebec/tez-ext-service-tests/pom.xml ---------------------------------------------------------------------- diff --git a/tez-ext-service-tests/pom.xml b/tez-ext-service-tests/pom.xml index 907e129..f95f4ca 100644 --- a/tez-ext-service-tests/pom.xml +++ b/tez-ext-service-tests/pom.xml @@ -23,7 +23,6 @@ <version>0.8.0-TEZ-2003-SNAPSHOT</version> </parent> - <!-- TODO TEZ-2003 Merge this into the tez-tests module --> <artifactId>tez-ext-service-tests</artifactId> <dependencies> http://git-wip-us.apache.org/repos/asf/tez/blob/0026ebec/tez-ext-service-tests/src/test/java/org/apache/tez/dag/app/launcher/TezTestServiceContainerLauncher.java ---------------------------------------------------------------------- diff --git a/tez-ext-service-tests/src/test/java/org/apache/tez/dag/app/launcher/TezTestServiceContainerLauncher.java b/tez-ext-service-tests/src/test/java/org/apache/tez/dag/app/launcher/TezTestServiceContainerLauncher.java index f31a07b..845a27b 100644 --- a/tez-ext-service-tests/src/test/java/org/apache/tez/dag/app/launcher/TezTestServiceContainerLauncher.java +++ b/tez-ext-service-tests/src/test/java/org/apache/tez/dag/app/launcher/TezTestServiceContainerLauncher.java @@ -37,8 +37,6 @@ import org.apache.tez.test.service.rpc.TezTestServiceProtocolProtos.RunContainer import org.slf4j.Logger; import org.slf4j.LoggerFactory; -// TODO TEZ-2003 look for all LOG.*(DBG and LOG.*(DEBUG messages - public class TezTestServiceContainerLauncher extends ContainerLauncher { // TODO Support interruptability of tasks which haven't yet been launched. 
@@ -119,7 +117,7 @@ public class TezTestServiceContainerLauncher extends ContainerLauncher { @Override public void stopContainer(ContainerStopRequest stopRequest) { - LOG.info("DEBUG: Ignoring STOP_REQUEST for event: " + stopRequest); + LOG.info("Ignoring stopContainer for event: " + stopRequest); // that the container is actually done (normally received from RM) // TODO Sending this out for an un-launched container is invalid getContext().containerStopRequested(stopRequest.getContainerId()); http://git-wip-us.apache.org/repos/asf/tez/blob/0026ebec/tez-ext-service-tests/src/test/java/org/apache/tez/dag/app/launcher/TezTestServiceNoOpContainerLauncher.java ---------------------------------------------------------------------- diff --git a/tez-ext-service-tests/src/test/java/org/apache/tez/dag/app/launcher/TezTestServiceNoOpContainerLauncher.java b/tez-ext-service-tests/src/test/java/org/apache/tez/dag/app/launcher/TezTestServiceNoOpContainerLauncher.java index 7b42296..d265736 100644 --- a/tez-ext-service-tests/src/test/java/org/apache/tez/dag/app/launcher/TezTestServiceNoOpContainerLauncher.java +++ b/tez-ext-service-tests/src/test/java/org/apache/tez/dag/app/launcher/TezTestServiceNoOpContainerLauncher.java @@ -39,7 +39,7 @@ public class TezTestServiceNoOpContainerLauncher extends ContainerLauncher { @Override public void stopContainer(ContainerStopRequest stopRequest) { - LOG.info("DEBUG: Ignoring STOP_REQUEST {}", stopRequest); + LOG.info("Ignoring stopRequest {}", stopRequest); getContext().containerStopRequested(stopRequest.getContainerId()); } } http://git-wip-us.apache.org/repos/asf/tez/blob/0026ebec/tez-ext-service-tests/src/test/java/org/apache/tez/dag/app/rm/TezTestServiceTaskSchedulerService.java ---------------------------------------------------------------------- diff --git a/tez-ext-service-tests/src/test/java/org/apache/tez/dag/app/rm/TezTestServiceTaskSchedulerService.java 
b/tez-ext-service-tests/src/test/java/org/apache/tez/dag/app/rm/TezTestServiceTaskSchedulerService.java index 0d87995..17f8a87 100644 --- a/tez-ext-service-tests/src/test/java/org/apache/tez/dag/app/rm/TezTestServiceTaskSchedulerService.java +++ b/tez-ext-service-tests/src/test/java/org/apache/tez/dag/app/rm/TezTestServiceTaskSchedulerService.java @@ -151,12 +151,12 @@ public class TezTestServiceTaskSchedulerService extends TaskScheduler { @Override public void blacklistNode(NodeId nodeId) { - LOG.info("DEBUG: BlacklistNode not supported"); + LOG.info("BlacklistNode not supported"); } @Override public void unblacklistNode(NodeId nodeId) { - LOG.info("DEBUG: unBlacklistNode not supported"); + LOG.info("unBlacklistNode not supported"); } @Override @@ -195,7 +195,7 @@ public class TezTestServiceTaskSchedulerService extends TaskScheduler { @Override public Object deallocateContainer(ContainerId containerId) { - LOG.info("DEBUG: Ignoring deallocateContainer for containerId: " + containerId); + LOG.info("Ignoring deallocateContainer for containerId: " + containerId); return null; } http://git-wip-us.apache.org/repos/asf/tez/blob/0026ebec/tez-ext-service-tests/src/test/java/org/apache/tez/service/impl/ContainerRunnerImpl.java ---------------------------------------------------------------------- diff --git a/tez-ext-service-tests/src/test/java/org/apache/tez/service/impl/ContainerRunnerImpl.java b/tez-ext-service-tests/src/test/java/org/apache/tez/service/impl/ContainerRunnerImpl.java index f3fc442..472a43c 100644 --- a/tez-ext-service-tests/src/test/java/org/apache/tez/service/impl/ContainerRunnerImpl.java +++ b/tez-ext-service-tests/src/test/java/org/apache/tez/service/impl/ContainerRunnerImpl.java @@ -173,7 +173,7 @@ public class ContainerRunnerImpl extends AbstractService implements ContainerRun throw new TezException(e); } } - LOG.info("DEBUG: Dirs are: " + Arrays.toString(localDirs)); + LOG.info("Dirs for {} are {}", request.getContainerIdString(), 
Arrays.toString(localDirs)); // Setup workingDir. This is otherwise setup as Environment.PWD @@ -193,7 +193,7 @@ public class ContainerRunnerImpl extends AbstractService implements ContainerRun Token<JobTokenIdentifier> jobToken = TokenCache.getSessionToken(credentials); // TODO Unregistering does not happen at the moment, since there's no signals on when an app completes. - LOG.info("DEBUG: Registering request with the ShuffleHandler"); + LOG.info("Registering request with the ShuffleHandler for containerId {}", request.getContainerIdString()); ShuffleHandler.get().registerApplication(request.getApplicationIdString(), jobToken, request.getUser()); @@ -255,7 +255,7 @@ public class ContainerRunnerImpl extends AbstractService implements ContainerRun Token<JobTokenIdentifier> jobToken = TokenCache.getSessionToken(credentials); // TODO Unregistering does not happen at the moment, since there's no signals on when an app completes. - LOG.info("DEBUG: Registering request with the ShuffleHandler"); + LOG.info("Registering request with the ShuffleHandler for containerId {}", request.getContainerIdString()); ShuffleHandler.get().registerApplication(request.getApplicationIdString(), jobToken, request.getUser()); TaskRunnerCallable callable = new TaskRunnerCallable(request, new Configuration(getConfig()), new ExecutionContextImpl(localAddress.get().getHostName()), env, localDirs, http://git-wip-us.apache.org/repos/asf/tez/blob/0026ebec/tez-ext-service-tests/src/test/java/org/apache/tez/tests/TestExternalTezServices.java ---------------------------------------------------------------------- diff --git a/tez-ext-service-tests/src/test/java/org/apache/tez/tests/TestExternalTezServices.java b/tez-ext-service-tests/src/test/java/org/apache/tez/tests/TestExternalTezServices.java index 2c52ae3..3701455 100644 --- a/tez-ext-service-tests/src/test/java/org/apache/tez/tests/TestExternalTezServices.java +++ 
b/tez-ext-service-tests/src/test/java/org/apache/tez/tests/TestExternalTezServices.java @@ -130,8 +130,6 @@ public class TestExternalTezServices { confForJobs.set(entry.getKey(), entry.getValue()); } - // TODO TEZ-2003 Once per vertex configuration is possible, run separate tests for push vs pull (regular threaded execution) - Path stagingDirPath = new Path("/tmp/tez-staging-dir"); remoteFs.mkdirs(stagingDirPath); // This is currently configured to push tasks into the Service, and then use the standard RPC http://git-wip-us.apache.org/repos/asf/tez/blob/0026ebec/tez-runtime-internals/src/main/java/org/apache/tez/runtime/internals/api/TaskReporterInterface.java ---------------------------------------------------------------------- diff --git a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/internals/api/TaskReporterInterface.java b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/internals/api/TaskReporterInterface.java index 47a61ab..9a5a3ab 100644 --- a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/internals/api/TaskReporterInterface.java +++ b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/internals/api/TaskReporterInterface.java @@ -26,8 +26,6 @@ import org.apache.tez.runtime.task.ErrorReporter; public interface TaskReporterInterface { - // TODO TEZ-2003 Consolidate private API usage if making this public - void registerTask(RuntimeTask task, ErrorReporter errorReporter); void unregisterTask(TezTaskAttemptID taskAttemptId); @@ -43,4 +41,4 @@ public interface TaskReporterInterface { void shutdown(); -} +} \ No newline at end of file
