TEZ-2708. Rename classes and variables post TEZ-2003 changes. (sseth)
Project: http://git-wip-us.apache.org/repos/asf/tez/repo Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/8b278ea8 Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/8b278ea8 Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/8b278ea8 Branch: refs/heads/master Commit: 8b278ea84f4c64e7144c07fa79b38a6a719541d2 Parents: dc0ee01 Author: Siddharth Seth <[email protected]> Authored: Tue Aug 25 16:48:00 2015 -0700 Committer: Siddharth Seth <[email protected]> Committed: Tue Aug 25 16:48:00 2015 -0700 ---------------------------------------------------------------------- tez-dag/findbugs-exclude.xml | 15 +- .../java/org/apache/tez/dag/app/AppContext.java | 4 +- .../dag/app/ContainerLauncherContextImpl.java | 4 +- .../org/apache/tez/dag/app/DAGAppMaster.java | 113 +-- .../apache/tez/dag/app/TaskAttemptListener.java | 46 -- .../dag/app/TaskAttemptListenerImpTezDag.java | 449 ----------- .../dag/app/TaskCommunicatorContextImpl.java | 22 +- .../tez/dag/app/TaskCommunicatorManager.java | 449 +++++++++++ .../app/TaskCommunicatorManagerInterface.java | 46 ++ .../apache/tez/dag/app/dag/impl/DAGImpl.java | 10 +- .../tez/dag/app/dag/impl/TaskAttemptImpl.java | 8 +- .../apache/tez/dag/app/dag/impl/TaskImpl.java | 10 +- .../apache/tez/dag/app/dag/impl/VertexImpl.java | 10 +- .../dag/app/launcher/ContainerLauncherImpl.java | 410 ---------- .../app/launcher/ContainerLauncherManager.java | 217 +++++ .../app/launcher/ContainerLauncherRouter.java | 215 ----- .../app/launcher/LocalContainerLauncher.java | 8 +- .../app/launcher/TezContainerLauncherImpl.java | 410 ++++++++++ .../tez/dag/app/rm/ContainerLauncherEvent.java | 116 +++ .../dag/app/rm/ContainerLauncherEventType.java | 25 + .../rm/ContainerLauncherLaunchRequestEvent.java | 79 ++ .../rm/ContainerLauncherStopRequestEvent.java | 34 + .../tez/dag/app/rm/NMCommunicatorEvent.java | 115 --- .../tez/dag/app/rm/NMCommunicatorEventType.java | 25 - .../rm/NMCommunicatorLaunchRequestEvent.java | 78 -- 
.../app/rm/NMCommunicatorStopRequestEvent.java | 33 - .../dag/app/rm/TaskSchedulerContextImpl.java | 29 +- .../dag/app/rm/TaskSchedulerEventHandler.java | 784 ------------------ .../tez/dag/app/rm/TaskSchedulerManager.java | 786 +++++++++++++++++++ .../dag/app/rm/container/AMContainerImpl.java | 26 +- .../dag/app/rm/container/AMContainerMap.java | 6 +- .../apache/tez/dag/app/MockDAGAppMaster.java | 23 +- .../tez/dag/app/TestMockDAGAppMaster.java | 2 +- .../app/TestTaskAttemptListenerImplTezDag.java | 426 ---------- .../app/TestTaskAttemptListenerImplTezDag2.java | 137 ---- .../app/TestTaskCommunicatorContextImpl.java | 2 +- .../dag/app/TestTaskCommunicatorManager.java | 2 +- .../dag/app/TestTaskCommunicatorManager1.java | 425 ++++++++++ .../dag/app/TestTaskCommunicatorManager2.java | 136 ++++ .../apache/tez/dag/app/dag/impl/TestCommit.java | 6 +- .../tez/dag/app/dag/impl/TestDAGImpl.java | 12 +- .../tez/dag/app/dag/impl/TestDAGRecovery.java | 4 +- .../tez/dag/app/dag/impl/TestTaskAttempt.java | 58 +- .../app/dag/impl/TestTaskAttemptRecovery.java | 4 +- .../tez/dag/app/dag/impl/TestTaskImpl.java | 16 +- .../tez/dag/app/dag/impl/TestTaskRecovery.java | 4 +- .../tez/dag/app/dag/impl/TestVertexImpl.java | 31 +- .../tez/dag/app/dag/impl/TestVertexImpl2.java | 4 +- .../dag/app/dag/impl/TestVertexRecovery.java | 10 +- .../launcher/TestContainerLauncherManager.java | 359 +++++++++ .../launcher/TestContainerLauncherRouter.java | 359 --------- .../tez/dag/app/rm/TestContainerReuse.java | 310 ++++---- .../app/rm/TestTaskSchedulerEventHandler.java | 707 ----------------- .../dag/app/rm/TestTaskSchedulerHelpers.java | 14 +- .../dag/app/rm/TestTaskSchedulerManager.java | 708 +++++++++++++++++ .../dag/app/rm/container/TestAMContainer.java | 24 +- .../app/rm/container/TestAMContainerMap.java | 6 +- .../tez/dag/app/rm/node/TestAMNodeTracker.java | 38 +- .../tez/service/impl/ContainerRunnerImpl.java | 23 +- .../apache/tez/runtime/task/TezTaskRunner.java | 451 ----------- 
.../tez/runtime/task/TestTaskExecution.java | 362 --------- .../tez/runtime/task/TestTaskExecution2.java | 2 +- 62 files changed, 4220 insertions(+), 5027 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/tez/blob/8b278ea8/tez-dag/findbugs-exclude.xml ---------------------------------------------------------------------- diff --git a/tez-dag/findbugs-exclude.xml b/tez-dag/findbugs-exclude.xml index 9d15035..6db3b7c 100644 --- a/tez-dag/findbugs-exclude.xml +++ b/tez-dag/findbugs-exclude.xml @@ -87,13 +87,13 @@ </Match> <Match> - <Class name="~org\.apache\.tez\.dag\.app\.rm\.TaskSchedulerEventHandler"/> + <Class name="~org\.apache\.tez\.dag\.app\.rm\.TaskSchedulerManager"/> <Bug pattern="BC_UNCONFIRMED_CAST"/> </Match> <Match> - <Class name="org.apache.tez.dag.app.launcher.ContainerLauncherRouter" /> - <Method name="handle" params="org.apache.tez.dag.app.rm.NMCommunicatorEvent" returns="void" /> + <Class name="org.apache.tez.dag.app.launcher.ContainerLauncherManager" /> + <Method name="handle" params="org.apache.tez.dag.app.rm.ContainerLauncherEvent" returns="void" /> <Bug pattern="BC_UNCONFIRMED_CAST" /> </Match> @@ -151,7 +151,7 @@ <Field name="context"/> <Field name="currentDAG"/> <Field name="state"/> - <Field name="taskSchedulerEventHandler"/> + <Field name="taskSchedulerManager"/> <Field name="versionMismatch"/> <Field name="versionMismatchDiagnostics"/> <Field name="containers"/> @@ -165,13 +165,6 @@ <Bug pattern="IS2_INCONSISTENT_SYNC"/> </Match> - <!-- TEZ-1955 --> - <Match> - <Class name="org.apache.tez.dag.app.rm.TaskSchedulerEventHandler"/> - <Field name="taskScheduler"/> - <Bug pattern="IS2_INCONSISTENT_SYNC"/> - </Match> - <!-- TEZ-1956 --> <Match> <Class name="org.apache.tez.dag.app.rm.YarnTaskSchedulerService"/> http://git-wip-us.apache.org/repos/asf/tez/blob/8b278ea8/tez-dag/src/main/java/org/apache/tez/dag/app/AppContext.java 
---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/AppContext.java b/tez-dag/src/main/java/org/apache/tez/dag/app/AppContext.java index 516fcef..657267b 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/app/AppContext.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/AppContext.java @@ -31,7 +31,7 @@ import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.event.EventHandler; import org.apache.hadoop.yarn.util.Clock; import org.apache.tez.dag.app.dag.DAG; -import org.apache.tez.dag.app.rm.TaskSchedulerEventHandler; +import org.apache.tez.dag.app.rm.TaskSchedulerManager; import org.apache.tez.dag.app.rm.container.AMContainerMap; import org.apache.tez.dag.app.rm.node.AMNodeTracker; import org.apache.tez.common.security.ACLManager; @@ -88,7 +88,7 @@ public interface AppContext { AMNodeTracker getNodeTracker(); - TaskSchedulerEventHandler getTaskScheduler(); + TaskSchedulerManager getTaskScheduler(); boolean isSession(); http://git-wip-us.apache.org/repos/asf/tez/blob/8b278ea8/tez-dag/src/main/java/org/apache/tez/dag/app/ContainerLauncherContextImpl.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/ContainerLauncherContextImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/ContainerLauncherContextImpl.java index 3a2efc5..20bfd13 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/app/ContainerLauncherContextImpl.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/ContainerLauncherContextImpl.java @@ -33,10 +33,10 @@ import org.apache.tez.dag.history.events.ContainerLaunchedEvent; public class ContainerLauncherContextImpl implements ContainerLauncherContext { private final AppContext context; - private final TaskAttemptListener tal; + private final TaskCommunicatorManagerInterface tal; private final UserPayload initialUserPayload; - public 
ContainerLauncherContextImpl(AppContext appContext, TaskAttemptListener tal, UserPayload initialUserPayload) { + public ContainerLauncherContextImpl(AppContext appContext, TaskCommunicatorManagerInterface tal, UserPayload initialUserPayload) { this.context = appContext; this.tal = tal; this.initialUserPayload = initialUserPayload; http://git-wip-us.apache.org/repos/asf/tez/blob/8b278ea8/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java b/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java index 84b3095..34e7c2a 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java @@ -40,7 +40,6 @@ import java.util.HashMap; import java.util.HashSet; import java.util.Iterator; import java.util.LinkedHashMap; -import java.util.LinkedList; import java.util.List; import java.util.Map; import java.util.Map.Entry; @@ -149,10 +148,10 @@ import org.apache.tez.dag.app.dag.event.TaskEventType; import org.apache.tez.dag.app.dag.event.VertexEvent; import org.apache.tez.dag.app.dag.event.VertexEventType; import org.apache.tez.dag.app.dag.impl.DAGImpl; -import org.apache.tez.dag.app.launcher.ContainerLauncherRouter; +import org.apache.tez.dag.app.launcher.ContainerLauncherManager; import org.apache.tez.dag.app.rm.AMSchedulerEventType; -import org.apache.tez.dag.app.rm.NMCommunicatorEventType; -import org.apache.tez.dag.app.rm.TaskSchedulerEventHandler; +import org.apache.tez.dag.app.rm.ContainerLauncherEventType; +import org.apache.tez.dag.app.rm.TaskSchedulerManager; import org.apache.tez.dag.app.rm.container.AMContainerEventType; import org.apache.tez.dag.app.rm.container.AMContainerMap; import org.apache.tez.dag.app.rm.container.ContainerContextMatcher; @@ -236,16 +235,16 @@ public class DAGAppMaster extends AbstractService { private 
AppContext context; private Configuration amConf; private AsyncDispatcher dispatcher; - private ContainerLauncherRouter containerLauncherRouter; + private ContainerLauncherManager containerLauncherManager; private ContainerHeartbeatHandler containerHeartbeatHandler; private TaskHeartbeatHandler taskHeartbeatHandler; - private TaskAttemptListener taskAttemptListener; + private TaskCommunicatorManagerInterface taskCommunicatorManager; private JobTokenSecretManager jobTokenSecretManager = new JobTokenSecretManager(); private Token<JobTokenIdentifier> sessionToken; private DagEventDispatcher dagEventDispatcher; private VertexEventDispatcher vertexEventDispatcher; - private TaskSchedulerEventHandler taskSchedulerEventHandler; + private TaskSchedulerManager taskSchedulerManager; private WebUIService webUIService; private HistoryEventHandler historyEventHandler; private final Map<String, LocalResource> amResources = new HashMap<String, LocalResource>(); @@ -472,13 +471,13 @@ public class DAGAppMaster extends AbstractService { //service to handle requests to TaskUmbilicalProtocol - taskAttemptListener = createTaskAttemptListener(context, + taskCommunicatorManager = createTaskCommunicatorManager(context, taskHeartbeatHandler, containerHeartbeatHandler, taskCommunicatorDescriptors); - addIfService(taskAttemptListener, true); + addIfService(taskCommunicatorManager, true); containerSignatureMatcher = createContainerSignatureMatcher(); containers = new AMContainerMap(containerHeartbeatHandler, - taskAttemptListener, containerSignatureMatcher, context); + taskCommunicatorManager, containerSignatureMatcher, context); addIfService(containers, true); dispatcher.register(AMContainerEventType.class, containers); @@ -521,29 +520,30 @@ public class DAGAppMaster extends AbstractService { - this.taskSchedulerEventHandler = new TaskSchedulerEventHandler(context, + this.taskSchedulerManager = new TaskSchedulerManager(context, clientRpcServer, dispatcher.getEventHandler(), 
containerSignatureMatcher, webUIService, taskSchedulerDescriptors, isLocal); - addIfService(taskSchedulerEventHandler, true); + addIfService(taskSchedulerManager, true); if (enableWebUIService()) { - addIfServiceDependency(taskSchedulerEventHandler, webUIService); + addIfServiceDependency(taskSchedulerManager, webUIService); } if (isLastAMRetry) { LOG.info("AM will unregister as this is the last attempt" + ", currentAttempt=" + appAttemptID.getAttemptId() + ", maxAttempts=" + maxAppAttempts); - this.taskSchedulerEventHandler.setShouldUnregisterFlag(); + this.taskSchedulerManager.setShouldUnregisterFlag(); } dispatcher.register(AMSchedulerEventType.class, - taskSchedulerEventHandler); - addIfServiceDependency(taskSchedulerEventHandler, clientRpcServer); + taskSchedulerManager); + addIfServiceDependency(taskSchedulerManager, clientRpcServer); - this.containerLauncherRouter = createContainerLauncherRouter(containerLauncherDescriptors, isLocal); - addIfService(containerLauncherRouter, true); - dispatcher.register(NMCommunicatorEventType.class, containerLauncherRouter); + this.containerLauncherManager = createContainerLauncherManager(containerLauncherDescriptors, + isLocal); + addIfService(containerLauncherManager, true); + dispatcher.register(ContainerLauncherEventType.class, containerLauncherManager); historyEventHandler = createHistoryEventHandler(context); addIfService(historyEventHandler, true); @@ -632,8 +632,8 @@ public class DAGAppMaster extends AbstractService { } @VisibleForTesting - protected TaskSchedulerEventHandler getTaskSchedulerEventHandler() { - return taskSchedulerEventHandler; + protected TaskSchedulerManager getTaskSchedulerManager() { + return taskSchedulerManager; } @VisibleForTesting @@ -661,7 +661,7 @@ public class DAGAppMaster extends AbstractService { sendEvent(new DAGEvent(currentDAG.getID(), DAGEventType.INTERNAL_ERROR)); } else { LOG.info("Internal Error. 
Finishing directly as no dag is active."); - this.taskSchedulerEventHandler.setShouldUnregisterFlag(); + this.taskSchedulerManager.setShouldUnregisterFlag(); shutdownHandler.shutdown(); } break; @@ -673,7 +673,7 @@ public class DAGAppMaster extends AbstractService { System.out.println(timeStamp + " Completed Dag: " + finishEvt.getDAGId().toString()); if (!isSession) { LOG.info("Not a session, AM will unregister as DAG has completed"); - this.taskSchedulerEventHandler.setShouldUnregisterFlag(); + this.taskSchedulerManager.setShouldUnregisterFlag(); _updateLoggers(currentDAG, "_post"); setStateOnDAGCompletion(); LOG.info("Shutting down on completion of dag:" + @@ -722,7 +722,7 @@ public class DAGAppMaster extends AbstractService { + finishEvt.getDAGState() + ". Error. Shutting down."); state = DAGAppMasterState.ERROR; - this.taskSchedulerEventHandler.setShouldUnregisterFlag(); + this.taskSchedulerManager.setShouldUnregisterFlag(); shutdownHandler.shutdown(); break; } @@ -741,11 +741,11 @@ public class DAGAppMaster extends AbstractService { // Leaving the taskSchedulerEventHandler here for now. Doesn't generate new events. // However, eventually it needs to be moved out. 
- this.taskSchedulerEventHandler.dagCompleted(); + this.taskSchedulerManager.dagCompleted(); state = DAGAppMasterState.IDLE; } else { LOG.info("Session shutting down now."); - this.taskSchedulerEventHandler.setShouldUnregisterFlag(); + this.taskSchedulerManager.setShouldUnregisterFlag(); if (this.historyEventHandler.hasRecoveryFailed()) { state = DAGAppMasterState.FAILED; } else { @@ -772,8 +772,8 @@ public class DAGAppMaster extends AbstractService { DAGAppMasterEventDagCleanup cleanupEvent = (DAGAppMasterEventDagCleanup) event; LOG.info("Cleaning up DAG: name=" + cleanupEvent.getDag().getName() + ", with id=" + cleanupEvent.getDag().getID()); - containerLauncherRouter.dagComplete(cleanupEvent.getDag()); - taskAttemptListener.dagComplete(cleanupEvent.getDag()); + containerLauncherManager.dagComplete(cleanupEvent.getDag()); + taskCommunicatorManager.dagComplete(cleanupEvent.getDag()); nodes.dagComplete(cleanupEvent.getDag()); containers.dagComplete(cleanupEvent.getDag()); TezTaskAttemptID.clearCache(); @@ -785,9 +785,9 @@ public class DAGAppMaster extends AbstractService { break; case NEW_DAG_SUBMITTED: // Inform sub-components that a new DAG has been submitted. 
- taskSchedulerEventHandler.dagSubmitted(); - containerLauncherRouter.dagSubmitted(); - taskAttemptListener.dagSubmitted(); + taskSchedulerManager.dagSubmitted(); + containerLauncherManager.dagSubmitted(); + taskCommunicatorManager.dagSubmitted(); break; default: throw new TezUncheckedException( @@ -917,7 +917,7 @@ public class DAGAppMaster extends AbstractService { // create single dag DAGImpl newDag = new DAGImpl(dagId, amConf, dagPB, dispatcher.getEventHandler(), - taskAttemptListener, dagCredentials, clock, + taskCommunicatorManager, dagCredentials, clock, appMasterUgi.getShortUserName(), taskHeartbeatHandler, context); @@ -1048,13 +1048,13 @@ public class DAGAppMaster extends AbstractService { } } - protected TaskAttemptListener createTaskAttemptListener(AppContext context, - TaskHeartbeatHandler thh, - ContainerHeartbeatHandler chh, - List<NamedEntityDescriptor> entityDescriptors) { - TaskAttemptListener lis = - new TaskAttemptListenerImpTezDag(context, thh, chh, entityDescriptors); - return lis; + protected TaskCommunicatorManagerInterface createTaskCommunicatorManager(AppContext context, + TaskHeartbeatHandler thh, + ContainerHeartbeatHandler chh, + List<NamedEntityDescriptor> entityDescriptors) { + TaskCommunicatorManagerInterface tcm = + new TaskCommunicatorManager(context, thh, chh, entityDescriptors); + return tcm; } protected TaskHeartbeatHandler createTaskHeartbeatHandler(AppContext context, @@ -1074,10 +1074,11 @@ public class DAGAppMaster extends AbstractService { return chh; } - protected ContainerLauncherRouter createContainerLauncherRouter(List<NamedEntityDescriptor> containerLauncherDescriptors, - boolean isLocal) throws + protected ContainerLauncherManager createContainerLauncherManager( + List<NamedEntityDescriptor> containerLauncherDescriptors, + boolean isLocal) throws UnknownHostException { - return new ContainerLauncherRouter(context, taskAttemptListener, workingDirectory, + return new ContainerLauncherManager(context, 
taskCommunicatorManager, workingDirectory, containerLauncherDescriptors, isLocal); } @@ -1101,12 +1102,12 @@ public class DAGAppMaster extends AbstractService { return dispatcher; } - public ContainerLauncherRouter getContainerLauncherRouter() { - return containerLauncherRouter; + public ContainerLauncherManager getContainerLauncherManager() { + return containerLauncherManager; } - public TaskAttemptListener getTaskAttemptListener() { - return taskAttemptListener; + public TaskCommunicatorManagerInterface getTaskCommunicatorManager() { + return taskCommunicatorManager; } public ContainerId getAppContainerId() { @@ -1213,7 +1214,7 @@ public class DAGAppMaster extends AbstractService { public void shutdownTezAM() throws TezException { sessionStopped.set(true); synchronized (this) { - this.taskSchedulerEventHandler.setShouldUnregisterFlag(); + this.taskSchedulerManager.setShouldUnregisterFlag(); if (currentDAG != null && !currentDAG.isComplete()) { //send a DAG_KILL message @@ -1473,8 +1474,8 @@ public class DAGAppMaster extends AbstractService { } @Override - public TaskSchedulerEventHandler getTaskScheduler() { - return taskSchedulerEventHandler; + public TaskSchedulerManager getTaskScheduler() { + return taskSchedulerManager; } @Override @@ -1574,7 +1575,7 @@ public class DAGAppMaster extends AbstractService { throw new TezUncheckedException( "Cannot get ApplicationACLs before all services have started"); } - return taskSchedulerEventHandler.getApplicationAcls(); + return taskSchedulerManager.getApplicationAcls(); } @Override @@ -1805,7 +1806,7 @@ public class DAGAppMaster extends AbstractService { if (versionMismatch) { // Short-circuit and return as no DAG should not be run - this.taskSchedulerEventHandler.setShouldUnregisterFlag(); + this.taskSchedulerManager.setShouldUnregisterFlag(); shutdownHandler.shutdown(); return; } @@ -1825,7 +1826,7 @@ public class DAGAppMaster extends AbstractService { LOG.error("Error occurred when trying to recover data from previous 
attempt." + " Shutting down AM", e); this.state = DAGAppMasterState.ERROR; - this.taskSchedulerEventHandler.setShouldUnregisterFlag(); + this.taskSchedulerManager.setShouldUnregisterFlag(); shutdownHandler.shutdown(); return; } @@ -1929,7 +1930,7 @@ public class DAGAppMaster extends AbstractService { private void initiateStop() { - taskSchedulerEventHandler.initiateStop(); + taskSchedulerManager.initiateStop(); } @Override @@ -1954,8 +1955,8 @@ public class DAGAppMaster extends AbstractService { LOG.debug("Checking whether tez scratch data dir should be deleted, deleteTezScratchData=" + deleteTezScratchData); } - if (deleteTezScratchData && this.taskSchedulerEventHandler != null - && this.taskSchedulerEventHandler.hasUnregistered()) { + if (deleteTezScratchData && this.taskSchedulerManager != null + && this.taskSchedulerManager.hasUnregistered()) { // Delete tez scratch data dir if (this.tezSystemStagingDir != null) { try { @@ -2208,7 +2209,7 @@ public class DAGAppMaster extends AbstractService { // Notify TaskScheduler that a SIGTERM has been received so that it // unregisters quickly with proper status LOG.info("DAGAppMaster received a signal. Signaling TaskScheduler"); - appMaster.taskSchedulerEventHandler.setSignalled(true); + appMaster.taskSchedulerManager.setSignalled(true); } if (EnumSet.of(DAGAppMasterState.NEW, DAGAppMasterState.INITED, http://git-wip-us.apache.org/repos/asf/tez/blob/8b278ea8/tez-dag/src/main/java/org/apache/tez/dag/app/TaskAttemptListener.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/TaskAttemptListener.java b/tez-dag/src/main/java/org/apache/tez/dag/app/TaskAttemptListener.java deleted file mode 100644 index 761bdb0..0000000 --- a/tez-dag/src/main/java/org/apache/tez/dag/app/TaskAttemptListener.java +++ /dev/null @@ -1,46 +0,0 @@ -/** -* Licensed to the Apache Software Foundation (ASF) under one -* or more contributor license agreements. 
See the NOTICE file -* distributed with this work for additional information -* regarding copyright ownership. The ASF licenses this file -* to you under the Apache License, Version 2.0 (the -* "License"); you may not use this file except in compliance -* with the License. You may obtain a copy of the License at -* -* http://www.apache.org/licenses/LICENSE-2.0 -* -* Unless required by applicable law or agreed to in writing, software -* distributed under the License is distributed on an "AS IS" BASIS, -* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -* See the License for the specific language governing permissions and -* limitations under the License. -*/ - -package org.apache.tez.dag.app; - -import org.apache.hadoop.yarn.api.records.ContainerId; -import org.apache.tez.serviceplugins.api.ContainerEndReason; -import org.apache.tez.serviceplugins.api.TaskAttemptEndReason; -import org.apache.tez.dag.app.dag.DAG; -import org.apache.tez.dag.api.TaskCommunicator; -import org.apache.tez.dag.app.rm.container.AMContainerTask; -import org.apache.tez.dag.records.TezTaskAttemptID; -/** - * This class listens for changes to the state of a Task. 
- */ -public interface TaskAttemptListener { - - void registerRunningContainer(ContainerId containerId, int taskCommId); - - void registerTaskAttempt(AMContainerTask amContainerTask, ContainerId containerId, int taskCommId); - - void unregisterRunningContainer(ContainerId containerId, int taskCommId, ContainerEndReason endReason, String diagnostics); - - void unregisterTaskAttempt(TezTaskAttemptID attemptID, int taskCommId, TaskAttemptEndReason endReason, String diagnostics); - - void dagComplete(DAG dag); - - void dagSubmitted(); - - TaskCommunicator getTaskCommunicator(int taskCommIndex); -} http://git-wip-us.apache.org/repos/asf/tez/blob/8b278ea8/tez-dag/src/main/java/org/apache/tez/dag/app/TaskAttemptListenerImpTezDag.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/TaskAttemptListenerImpTezDag.java b/tez-dag/src/main/java/org/apache/tez/dag/app/TaskAttemptListenerImpTezDag.java deleted file mode 100644 index 185193f..0000000 --- a/tez-dag/src/main/java/org/apache/tez/dag/app/TaskAttemptListenerImpTezDag.java +++ /dev/null @@ -1,449 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with this - * work for additional information regarding copyright ownership. The ASF - * licenses this file to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT - * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the - * License for the specific language governing permissions and limitations under - * the License. 
- */ - -package org.apache.tez.dag.app; - -import java.io.IOException; -import java.lang.reflect.Constructor; -import java.lang.reflect.InvocationTargetException; -import java.util.ArrayList; -import java.util.Collections; -import java.util.List; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ConcurrentMap; - -import com.google.common.annotations.VisibleForTesting; -import com.google.common.base.Preconditions; -import org.apache.commons.collections4.ListUtils; -import org.apache.tez.dag.api.NamedEntityDescriptor; -import org.apache.tez.dag.api.TezConstants; -import org.apache.tez.dag.api.UserPayload; -import org.apache.tez.dag.app.dag.event.TaskAttemptEventType; -import org.apache.tez.serviceplugins.api.ContainerEndReason; -import org.apache.tez.dag.app.dag.event.TaskAttemptEvent; -import org.apache.tez.dag.app.dag.event.TaskAttemptEventStatusUpdate; -import org.apache.tez.runtime.api.events.TaskStatusUpdateEvent; -import org.apache.tez.runtime.api.impl.EventType; -import org.apache.tez.dag.api.event.VertexStateUpdate; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import org.apache.hadoop.classification.InterfaceAudience; -import org.apache.hadoop.service.AbstractService; -import org.apache.hadoop.yarn.api.records.NodeId; -import org.apache.tez.common.ReflectionUtils; -import org.apache.tez.common.TezUtilsInternal; -import org.apache.tez.dag.api.TaskCommunicator; -import org.apache.tez.dag.api.TaskCommunicatorContext; -import org.apache.tez.serviceplugins.api.TaskAttemptEndReason; -import org.apache.tez.dag.api.TaskHeartbeatResponse; -import org.apache.tez.dag.api.TezException; -import org.apache.tez.dag.api.TezUncheckedException; -import org.apache.hadoop.yarn.api.records.ContainerId; -import org.apache.hadoop.yarn.util.ConverterUtils; -import org.apache.tez.dag.api.TaskHeartbeatRequest; -import org.apache.tez.dag.app.dag.DAG; -import org.apache.tez.dag.app.dag.Task; -import 
org.apache.tez.dag.app.dag.event.TaskAttemptEventAttemptFailed; -import org.apache.tez.dag.app.dag.event.TaskAttemptEventAttemptKilled; -import org.apache.tez.dag.app.dag.event.TaskAttemptEventStartedRemotely; -import org.apache.tez.dag.app.dag.event.VertexEventRouteEvent; -import org.apache.tez.dag.app.rm.container.AMContainerTask; -import org.apache.tez.dag.records.TezTaskAttemptID; -import org.apache.tez.dag.records.TezVertexID; -import org.apache.tez.runtime.api.impl.TezEvent; - - -@SuppressWarnings("unchecked") -@InterfaceAudience.Private -public class TaskAttemptListenerImpTezDag extends AbstractService implements - TaskAttemptListener { - - private static final Logger LOG = LoggerFactory - .getLogger(TaskAttemptListenerImpTezDag.class); - - private final AppContext context; - private final TaskCommunicator[] taskCommunicators; - private final TaskCommunicatorContext[] taskCommunicatorContexts; - protected final ServicePluginLifecycleAbstractService []taskCommunicatorServiceWrappers; - - protected final TaskHeartbeatHandler taskHeartbeatHandler; - protected final ContainerHeartbeatHandler containerHeartbeatHandler; - - private final TaskHeartbeatResponse RESPONSE_SHOULD_DIE = new TaskHeartbeatResponse(true, null, 0, 0); - - private final ConcurrentMap<TezTaskAttemptID, ContainerId> registeredAttempts = - new ConcurrentHashMap<TezTaskAttemptID, ContainerId>(); - private final ConcurrentMap<ContainerId, ContainerInfo> registeredContainers = - new ConcurrentHashMap<ContainerId, ContainerInfo>(); - - // Defined primarily to work around ConcurrentMaps not accepting null values - private static final class ContainerInfo { - TezTaskAttemptID taskAttemptId; - ContainerInfo(TezTaskAttemptID taskAttemptId) { - this.taskAttemptId = taskAttemptId; - } - } - - private static final ContainerInfo NULL_CONTAINER_INFO = new ContainerInfo(null); - - - public TaskAttemptListenerImpTezDag(AppContext context, - TaskHeartbeatHandler thh, ContainerHeartbeatHandler chh, - 
List<NamedEntityDescriptor> taskCommunicatorDescriptors) { - super(TaskAttemptListenerImpTezDag.class.getName()); - this.context = context; - this.taskHeartbeatHandler = thh; - this.containerHeartbeatHandler = chh; - Preconditions.checkArgument( - taskCommunicatorDescriptors != null && !taskCommunicatorDescriptors.isEmpty(), - "TaskCommunicators must be specified"); - this.taskCommunicators = new TaskCommunicator[taskCommunicatorDescriptors.size()]; - this.taskCommunicatorContexts = new TaskCommunicatorContext[taskCommunicatorDescriptors.size()]; - this.taskCommunicatorServiceWrappers = new ServicePluginLifecycleAbstractService[taskCommunicatorDescriptors.size()]; - for (int i = 0 ; i < taskCommunicatorDescriptors.size() ; i++) { - UserPayload userPayload = taskCommunicatorDescriptors.get(i).getUserPayload(); - taskCommunicatorContexts[i] = new TaskCommunicatorContextImpl(context, this, userPayload, i); - taskCommunicators[i] = createTaskCommunicator(taskCommunicatorDescriptors.get(i), i); - taskCommunicatorServiceWrappers[i] = new ServicePluginLifecycleAbstractService(taskCommunicators[i]); - } - // TODO TEZ-2118 Start using taskCommunicator indices properly - } - - @Override - public void serviceStart() { - // TODO Why is init tied to serviceStart - for (int i = 0 ; i < taskCommunicators.length ; i++) { - taskCommunicatorServiceWrappers[i].init(getConfig()); - taskCommunicatorServiceWrappers[i].start(); - } - } - - @Override - public void serviceStop() { - for (int i = 0 ; i < taskCommunicators.length ; i++) { - taskCommunicatorServiceWrappers[i].stop(); - } - } - - @VisibleForTesting - TaskCommunicator createTaskCommunicator(NamedEntityDescriptor taskCommDescriptor, - int taskCommIndex) { - if (taskCommDescriptor.getEntityName().equals(TezConstants.getTezYarnServicePluginName())) { - return createDefaultTaskCommunicator(taskCommunicatorContexts[taskCommIndex]); - } else if (taskCommDescriptor.getEntityName() - .equals(TezConstants.getTezUberServicePluginName())) 
{ - return createUberTaskCommunicator(taskCommunicatorContexts[taskCommIndex]); - } else { - return createCustomTaskCommunicator(taskCommunicatorContexts[taskCommIndex], - taskCommDescriptor); - } - } - - @VisibleForTesting - TaskCommunicator createDefaultTaskCommunicator(TaskCommunicatorContext taskCommunicatorContext) { - LOG.info("Using Default Task Communicator"); - return new TezTaskCommunicatorImpl(taskCommunicatorContext); - } - - @VisibleForTesting - TaskCommunicator createUberTaskCommunicator(TaskCommunicatorContext taskCommunicatorContext) { - LOG.info("Using Default Local Task Communicator"); - return new TezLocalTaskCommunicatorImpl(taskCommunicatorContext); - } - - @VisibleForTesting - TaskCommunicator createCustomTaskCommunicator(TaskCommunicatorContext taskCommunicatorContext, - NamedEntityDescriptor taskCommDescriptor) { - LOG.info("Using TaskCommunicator {}:{} " + taskCommDescriptor.getEntityName(), - taskCommDescriptor.getClassName()); - Class<? extends TaskCommunicator> taskCommClazz = - (Class<? extends TaskCommunicator>) ReflectionUtils - .getClazz(taskCommDescriptor.getClassName()); - try { - Constructor<? 
extends TaskCommunicator> ctor = - taskCommClazz.getConstructor(TaskCommunicatorContext.class); - return ctor.newInstance(taskCommunicatorContext); - } catch (NoSuchMethodException | InvocationTargetException | InstantiationException | IllegalAccessException e) { - throw new TezUncheckedException(e); - } - } - - public TaskHeartbeatResponse heartbeat(TaskHeartbeatRequest request) - throws IOException, TezException { - ContainerId containerId = ConverterUtils.toContainerId(request - .getContainerIdentifier()); - if (LOG.isDebugEnabled()) { - LOG.debug("Received heartbeat from container" - + ", request=" + request); - } - - if (!registeredContainers.containsKey(containerId)) { - LOG.warn("Received task heartbeat from unknown container with id: " + containerId + - ", asking it to die"); - return RESPONSE_SHOULD_DIE; - } - - // A heartbeat can come in anytime. The AM may have made a decision to kill a running task/container - // meanwhile. If the decision is processed through the pipeline before the heartbeat is processed, - // the heartbeat will be dropped. Otherwise the heartbeat will be processed - and the system - // know how to handle this - via FailedInputEvents for example (relevant only if the heartbeat has events). - // So - avoiding synchronization. - - pingContainerHeartbeatHandler(containerId); - TaskAttemptEventInfo eventInfo = new TaskAttemptEventInfo(0, null, 0); - TezTaskAttemptID taskAttemptID = request.getTaskAttemptId(); - if (taskAttemptID != null) { - ContainerId containerIdFromMap = registeredAttempts.get(taskAttemptID); - if (containerIdFromMap == null || !containerIdFromMap.equals(containerId)) { - // This can happen when a task heartbeats. Meanwhile the container is unregistered. - // The information will eventually make it through to the plugin via a corresponding unregister. - // There's a race in that case between the unregister making it through, and this method returning. - // TODO TEZ-2003 (post) TEZ-2666. 
An exception back is likely a better approach than sending a shouldDie = true, - // so that the plugin can handle the scenario. Alternately augment the response with error codes. - // Error codes would be better than exceptions. - LOG.info("Attempt: " + taskAttemptID + " is not recognized for heartbeats"); - return RESPONSE_SHOULD_DIE; - } - - List<TezEvent> inEvents = request.getEvents(); - if (LOG.isDebugEnabled()) { - LOG.debug("Ping from " + taskAttemptID.toString() + - " events: " + (inEvents != null ? inEvents.size() : -1)); - } - - long currTime = context.getClock().getTime(); - List<TezEvent> otherEvents = new ArrayList<TezEvent>(); - // route TASK_STATUS_UPDATE_EVENT directly to TaskAttempt and route other events - // (DATA_MOVEMENT_EVENT, TASK_ATTEMPT_COMPLETED_EVENT, TASK_ATTEMPT_FAILED_EVENT) - // to VertexImpl to ensure the events ordering - // 1. DataMovementEvent is logged as RecoveryEvent before TaskAttemptFinishedEvent - // 2. TaskStatusEvent is handled before TaskAttemptFinishedEvent - for (TezEvent tezEvent : ListUtils.emptyIfNull(inEvents)) { - // for now, set the event time on the AM when it is received. - // this avoids any time disparity between machines. 
- tezEvent.setEventReceivedTime(currTime); - final EventType eventType = tezEvent.getEventType(); - if (eventType == EventType.TASK_STATUS_UPDATE_EVENT) { - TaskAttemptEvent taskAttemptEvent = new TaskAttemptEventStatusUpdate(taskAttemptID, - (TaskStatusUpdateEvent) tezEvent.getEvent()); - context.getEventHandler().handle(taskAttemptEvent); - } else { - otherEvents.add(tezEvent); - } - } - if(!otherEvents.isEmpty()) { - TezVertexID vertexId = taskAttemptID.getTaskID().getVertexID(); - context.getEventHandler().handle( - new VertexEventRouteEvent(vertexId, Collections.unmodifiableList(otherEvents))); - } - taskHeartbeatHandler.pinged(taskAttemptID); - eventInfo = context - .getCurrentDAG() - .getVertex(taskAttemptID.getTaskID().getVertexID()) - .getTaskAttemptTezEvents(taskAttemptID, request.getStartIndex(), request.getPreRoutedStartIndex(), - request.getMaxEvents()); - } - return new TaskHeartbeatResponse(false, eventInfo.getEvents(), eventInfo.getNextFromEventId(), eventInfo.getNextPreRoutedFromEventId()); - } - public void taskAlive(TezTaskAttemptID taskAttemptId) { - taskHeartbeatHandler.pinged(taskAttemptId); - } - - public void containerAlive(ContainerId containerId) { - pingContainerHeartbeatHandler(containerId); - } - - public void taskStartedRemotely(TezTaskAttemptID taskAttemptID, ContainerId containerId) { - context.getEventHandler() - .handle(new TaskAttemptEventStartedRemotely(taskAttemptID, containerId, null)); - pingContainerHeartbeatHandler(containerId); - } - - public void taskKilled(TezTaskAttemptID taskAttemptId, TaskAttemptEndReason taskAttemptEndReason, - String diagnostics) { - // Regular flow via TaskAttempt will take care of un-registering from the heartbeat handler, - // and messages from the scheduler will release the container. - // TODO TEZ-2003 (post) TEZ-2671 Maybe consider un-registering here itself, since the task is not active anymore, - // instead of waiting for the unregister to flow through the Container. 
- // Fix along the same lines as TEZ-2124 by introducing an explict context. - context.getEventHandler().handle(new TaskAttemptEventAttemptKilled(taskAttemptId, - diagnostics, TezUtilsInternal.fromTaskAttemptEndReason( - taskAttemptEndReason))); - } - - public void taskFailed(TezTaskAttemptID taskAttemptId, TaskAttemptEndReason taskAttemptEndReason, - String diagnostics) { - // Regular flow via TaskAttempt will take care of un-registering from the heartbeat handler, - // and messages from the scheduler will release the container. - // TODO TEZ-2003 (post) TEZ-2671 Maybe consider un-registering here itself, since the task is not active anymore, - // instead of waiting for the unregister to flow through the Container. - // Fix along the same lines as TEZ-2124 by introducing an explict context. - context.getEventHandler().handle(new TaskAttemptEventAttemptFailed(taskAttemptId, - TaskAttemptEventType.TA_FAILED, diagnostics, TezUtilsInternal.fromTaskAttemptEndReason( - taskAttemptEndReason))); - } - - public void vertexStateUpdateNotificationReceived(VertexStateUpdate event, int taskCommIndex) throws - Exception { - taskCommunicators[taskCommIndex].onVertexStateUpdated(event); - } - - - /** - * Child checking whether it can commit. - * <p/> - * <br/> - * Repeatedly polls the ApplicationMaster whether it - * {@link Task#canCommit(TezTaskAttemptID)} This is * a legacy from the - * centralized commit protocol handling by the JobTracker. - */ -// @Override - public boolean canCommit(TezTaskAttemptID taskAttemptId) throws IOException { - LOG.info("Commit go/no-go request from " + taskAttemptId.toString()); - // An attempt is asking if it can commit its output. This can be decided - // only by the task which is managing the multiple attempts. So redirect the - // request there. 
- taskHeartbeatHandler.progressing(taskAttemptId); - pingContainerHeartbeatHandler(taskAttemptId); - - DAG job = context.getCurrentDAG(); - Task task = - job.getVertex(taskAttemptId.getTaskID().getVertexID()). - getTask(taskAttemptId.getTaskID()); - return task.canCommit(taskAttemptId); - } - - // The TaskAttemptListener register / unregister methods in this class are not thread safe. - // The Tez framework should not invoke these methods from multiple threads. - @Override - public void dagComplete(DAG dag) { - // TODO TEZ-2335. Cleanup TaskHeartbeat handler structures. - // TODO TEZ-2345. Also cleanup attemptInfo map, so that any tasks which heartbeat are told to die. - // Container structures remain unchanged - since they could be re-used across restarts. - // This becomes more relevant when task kills without container kills are allowed. - - // TODO TEZ-2336. Send a signal to containers indicating DAG completion. - - // Inform all communicators of the dagCompletion. - for (int i = 0 ; i < taskCommunicators.length ; i++) { - ((TaskCommunicatorContextImpl)taskCommunicatorContexts[i]).dagCompleteStart(dag); - taskCommunicators[i].dagComplete(dag.getName()); - ((TaskCommunicatorContextImpl)taskCommunicatorContexts[i]).dagCompleteEnd(); - } - - } - - @Override - public void dagSubmitted() { - // Nothing to do right now. Indicates that a new DAG has been submitted and - // the context has updated information. 
- } - - @Override - public void registerRunningContainer(ContainerId containerId, int taskCommId) { - if (LOG.isDebugEnabled()) { - LOG.debug("ContainerId: " + containerId + " registered with TaskAttemptListener"); - } - ContainerInfo oldInfo = registeredContainers.put(containerId, NULL_CONTAINER_INFO); - if (oldInfo != null) { - throw new TezUncheckedException( - "Multiple registrations for containerId: " + containerId); - } - NodeId nodeId = context.getAllContainers().get(containerId).getContainer().getNodeId(); - taskCommunicators[taskCommId].registerRunningContainer(containerId, nodeId.getHost(), - nodeId.getPort()); - } - - @Override - public void unregisterRunningContainer(ContainerId containerId, int taskCommId, ContainerEndReason endReason, String diagnostics) { - if (LOG.isDebugEnabled()) { - LOG.debug("Unregistering Container from TaskAttemptListener: " + containerId); - } - ContainerInfo containerInfo = registeredContainers.remove(containerId); - if (containerInfo.taskAttemptId != null) { - registeredAttempts.remove(containerInfo.taskAttemptId); - } - taskCommunicators[taskCommId].registerContainerEnd(containerId, endReason, diagnostics); - } - - @Override - public void registerTaskAttempt(AMContainerTask amContainerTask, - ContainerId containerId, int taskCommId) { - ContainerInfo containerInfo = registeredContainers.get(containerId); - if (containerInfo == null) { - throw new TezUncheckedException("Registering task attempt: " - + amContainerTask.getTask().getTaskAttemptID() + " to unknown container: " + containerId); - } - if (containerInfo.taskAttemptId != null) { - throw new TezUncheckedException("Registering task attempt: " - + amContainerTask.getTask().getTaskAttemptID() + " to container: " + containerId - + " with existing assignment to: " + - containerInfo.taskAttemptId); - } - - // Explicitly putting in a new entry so that synchronization is not required on the existing element in the map. 
- registeredContainers.put(containerId, new ContainerInfo(amContainerTask.getTask().getTaskAttemptID())); - - ContainerId containerIdFromMap = registeredAttempts.put( - amContainerTask.getTask().getTaskAttemptID(), containerId); - if (containerIdFromMap != null) { - throw new TezUncheckedException("Registering task attempt: " - + amContainerTask.getTask().getTaskAttemptID() + " to container: " + containerId - + " when already assigned to: " + containerIdFromMap); - } - taskCommunicators[taskCommId].registerRunningTaskAttempt(containerId, amContainerTask.getTask(), - amContainerTask.getAdditionalResources(), amContainerTask.getCredentials(), - amContainerTask.haveCredentialsChanged(), amContainerTask.getPriority()); - } - - @Override - public void unregisterTaskAttempt(TezTaskAttemptID attemptId, int taskCommId, TaskAttemptEndReason endReason, String diagnostics) { - ContainerId containerId = registeredAttempts.remove(attemptId); - if (containerId == null) { - LOG.warn("Unregister task attempt: " + attemptId + " from unknown container"); - return; - } - ContainerInfo containerInfo = registeredContainers.get(containerId); - if (containerInfo == null) { - LOG.warn("Unregister task attempt: " + attemptId + - " from non-registered container: " + containerId); - return; - } - // Explicitly putting in a new entry so that synchronization is not required on the existing element in the map. 
- registeredContainers.put(containerId, NULL_CONTAINER_INFO); - taskCommunicators[taskCommId].unregisterRunningTaskAttempt(attemptId, endReason, diagnostics); - } - - @Override - public TaskCommunicator getTaskCommunicator(int taskCommIndex) { - return taskCommunicators[taskCommIndex]; - } - - private void pingContainerHeartbeatHandler(ContainerId containerId) { - containerHeartbeatHandler.pinged(containerId); - } - - private void pingContainerHeartbeatHandler(TezTaskAttemptID taskAttemptId) { - ContainerId containerId = registeredAttempts.get(taskAttemptId); - if (containerId != null) { - containerHeartbeatHandler.pinged(containerId); - } else { - LOG.warn("Handling communication from attempt: " + taskAttemptId - + ", ContainerId not known for this attempt"); - } - } -} http://git-wip-us.apache.org/repos/asf/tez/blob/8b278ea8/tez-dag/src/main/java/org/apache/tez/dag/app/TaskCommunicatorContextImpl.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/TaskCommunicatorContextImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/TaskCommunicatorContextImpl.java index 9d57ac3..071b008 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/app/TaskCommunicatorContextImpl.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/TaskCommunicatorContextImpl.java @@ -47,7 +47,7 @@ public class TaskCommunicatorContextImpl implements TaskCommunicatorContext, Ver // TODO TEZ-2003 (post) TEZ-2669 Propagate errors baack to the AM with proper error reporting private final AppContext context; - private final TaskAttemptListenerImpTezDag taskAttemptListener; + private final TaskCommunicatorManager taskCommunicatorManager; private final int taskCommunicatorIndex; private final ReentrantReadWriteLock.ReadLock dagChangedReadLock; private final ReentrantReadWriteLock.WriteLock dagChangedWriteLock; @@ -56,11 +56,11 @@ public class TaskCommunicatorContextImpl implements TaskCommunicatorContext, Ver private 
DAG dag; public TaskCommunicatorContextImpl(AppContext appContext, - TaskAttemptListenerImpTezDag taskAttemptListener, + TaskCommunicatorManager taskCommunicatorManager, UserPayload userPayload, int taskCommunicatorIndex) { this.context = appContext; - this.taskAttemptListener = taskAttemptListener; + this.taskCommunicatorManager = taskCommunicatorManager; this.userPayload = userPayload; this.taskCommunicatorIndex = taskCommunicatorIndex; @@ -86,13 +86,13 @@ public class TaskCommunicatorContextImpl implements TaskCommunicatorContext, Ver @Override public boolean canCommit(TezTaskAttemptID taskAttemptId) throws IOException { - return taskAttemptListener.canCommit(taskAttemptId); + return taskCommunicatorManager.canCommit(taskAttemptId); } @Override public TaskHeartbeatResponse heartbeat(TaskHeartbeatRequest request) throws IOException, TezException { - return taskAttemptListener.heartbeat(request); + return taskCommunicatorManager.heartbeat(request); } @Override @@ -108,31 +108,31 @@ public class TaskCommunicatorContextImpl implements TaskCommunicatorContext, Ver @Override public void taskAlive(TezTaskAttemptID taskAttemptId) { - taskAttemptListener.taskAlive(taskAttemptId); + taskCommunicatorManager.taskAlive(taskAttemptId); } @Override public void containerAlive(ContainerId containerId) { if (isKnownContainer(containerId)) { - taskAttemptListener.containerAlive(containerId); + taskCommunicatorManager.containerAlive(containerId); } } @Override public void taskStartedRemotely(TezTaskAttemptID taskAttemptId, ContainerId containerId) { - taskAttemptListener.taskStartedRemotely(taskAttemptId, containerId); + taskCommunicatorManager.taskStartedRemotely(taskAttemptId, containerId); } @Override public void taskKilled(TezTaskAttemptID taskAttemptId, TaskAttemptEndReason taskAttemptEndReason, @Nullable String diagnostics) { - taskAttemptListener.taskKilled(taskAttemptId, taskAttemptEndReason, diagnostics); + taskCommunicatorManager.taskKilled(taskAttemptId, 
taskAttemptEndReason, diagnostics); } @Override public void taskFailed(TezTaskAttemptID taskAttemptId, TaskAttemptEndReason taskAttemptEndReason, @Nullable String diagnostics) { - taskAttemptListener.taskFailed(taskAttemptId, taskAttemptEndReason, diagnostics); + taskCommunicatorManager.taskFailed(taskAttemptId, taskAttemptEndReason, diagnostics); } @@ -196,7 +196,7 @@ public class TaskCommunicatorContextImpl implements TaskCommunicatorContext, Ver @Override public void onStateUpdated(VertexStateUpdate event) { try { - taskAttemptListener.vertexStateUpdateNotificationReceived(event, taskCommunicatorIndex); + taskCommunicatorManager.vertexStateUpdateNotificationReceived(event, taskCommunicatorIndex); } catch (Exception e) { throw new TezUncheckedException(e); } http://git-wip-us.apache.org/repos/asf/tez/blob/8b278ea8/tez-dag/src/main/java/org/apache/tez/dag/app/TaskCommunicatorManager.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/TaskCommunicatorManager.java b/tez-dag/src/main/java/org/apache/tez/dag/app/TaskCommunicatorManager.java new file mode 100644 index 0000000..42df259 --- /dev/null +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/TaskCommunicatorManager.java @@ -0,0 +1,449 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
See the + * License for the specific language governing permissions and limitations under + * the License. + */ + +package org.apache.tez.dag.app; + +import java.io.IOException; +import java.lang.reflect.Constructor; +import java.lang.reflect.InvocationTargetException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; + +import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Preconditions; +import org.apache.commons.collections4.ListUtils; +import org.apache.tez.dag.api.NamedEntityDescriptor; +import org.apache.tez.dag.api.TezConstants; +import org.apache.tez.dag.api.UserPayload; +import org.apache.tez.dag.app.dag.event.TaskAttemptEventType; +import org.apache.tez.serviceplugins.api.ContainerEndReason; +import org.apache.tez.dag.app.dag.event.TaskAttemptEvent; +import org.apache.tez.dag.app.dag.event.TaskAttemptEventStatusUpdate; +import org.apache.tez.runtime.api.events.TaskStatusUpdateEvent; +import org.apache.tez.runtime.api.impl.EventType; +import org.apache.tez.dag.api.event.VertexStateUpdate; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.service.AbstractService; +import org.apache.hadoop.yarn.api.records.NodeId; +import org.apache.tez.common.ReflectionUtils; +import org.apache.tez.common.TezUtilsInternal; +import org.apache.tez.dag.api.TaskCommunicator; +import org.apache.tez.dag.api.TaskCommunicatorContext; +import org.apache.tez.serviceplugins.api.TaskAttemptEndReason; +import org.apache.tez.dag.api.TaskHeartbeatResponse; +import org.apache.tez.dag.api.TezException; +import org.apache.tez.dag.api.TezUncheckedException; +import org.apache.hadoop.yarn.api.records.ContainerId; +import org.apache.hadoop.yarn.util.ConverterUtils; +import org.apache.tez.dag.api.TaskHeartbeatRequest; +import 
org.apache.tez.dag.app.dag.DAG; +import org.apache.tez.dag.app.dag.Task; +import org.apache.tez.dag.app.dag.event.TaskAttemptEventAttemptFailed; +import org.apache.tez.dag.app.dag.event.TaskAttemptEventAttemptKilled; +import org.apache.tez.dag.app.dag.event.TaskAttemptEventStartedRemotely; +import org.apache.tez.dag.app.dag.event.VertexEventRouteEvent; +import org.apache.tez.dag.app.rm.container.AMContainerTask; +import org.apache.tez.dag.records.TezTaskAttemptID; +import org.apache.tez.dag.records.TezVertexID; +import org.apache.tez.runtime.api.impl.TezEvent; + + +@SuppressWarnings("unchecked") [email protected] +public class TaskCommunicatorManager extends AbstractService implements + TaskCommunicatorManagerInterface { + + private static final Logger LOG = LoggerFactory + .getLogger(TaskCommunicatorManager.class); + + private final AppContext context; + private final TaskCommunicator[] taskCommunicators; + private final TaskCommunicatorContext[] taskCommunicatorContexts; + protected final ServicePluginLifecycleAbstractService []taskCommunicatorServiceWrappers; + + protected final TaskHeartbeatHandler taskHeartbeatHandler; + protected final ContainerHeartbeatHandler containerHeartbeatHandler; + + private final TaskHeartbeatResponse RESPONSE_SHOULD_DIE = new TaskHeartbeatResponse(true, null, 0, 0); + + private final ConcurrentMap<TezTaskAttemptID, ContainerId> registeredAttempts = + new ConcurrentHashMap<TezTaskAttemptID, ContainerId>(); + private final ConcurrentMap<ContainerId, ContainerInfo> registeredContainers = + new ConcurrentHashMap<ContainerId, ContainerInfo>(); + + // Defined primarily to work around ConcurrentMaps not accepting null values + private static final class ContainerInfo { + TezTaskAttemptID taskAttemptId; + ContainerInfo(TezTaskAttemptID taskAttemptId) { + this.taskAttemptId = taskAttemptId; + } + } + + private static final ContainerInfo NULL_CONTAINER_INFO = new ContainerInfo(null); + + + public TaskCommunicatorManager(AppContext context, + 
TaskHeartbeatHandler thh, ContainerHeartbeatHandler chh, + List<NamedEntityDescriptor> taskCommunicatorDescriptors) { + super(TaskCommunicatorManager.class.getName()); + this.context = context; + this.taskHeartbeatHandler = thh; + this.containerHeartbeatHandler = chh; + Preconditions.checkArgument( + taskCommunicatorDescriptors != null && !taskCommunicatorDescriptors.isEmpty(), + "TaskCommunicators must be specified"); + this.taskCommunicators = new TaskCommunicator[taskCommunicatorDescriptors.size()]; + this.taskCommunicatorContexts = new TaskCommunicatorContext[taskCommunicatorDescriptors.size()]; + this.taskCommunicatorServiceWrappers = new ServicePluginLifecycleAbstractService[taskCommunicatorDescriptors.size()]; + for (int i = 0 ; i < taskCommunicatorDescriptors.size() ; i++) { + UserPayload userPayload = taskCommunicatorDescriptors.get(i).getUserPayload(); + taskCommunicatorContexts[i] = new TaskCommunicatorContextImpl(context, this, userPayload, i); + taskCommunicators[i] = createTaskCommunicator(taskCommunicatorDescriptors.get(i), i); + taskCommunicatorServiceWrappers[i] = new ServicePluginLifecycleAbstractService(taskCommunicators[i]); + } + // TODO TEZ-2118 Start using taskCommunicator indices properly + } + + @Override + public void serviceStart() { + // TODO Why is init tied to serviceStart + for (int i = 0 ; i < taskCommunicators.length ; i++) { + taskCommunicatorServiceWrappers[i].init(getConfig()); + taskCommunicatorServiceWrappers[i].start(); + } + } + + @Override + public void serviceStop() { + for (int i = 0 ; i < taskCommunicators.length ; i++) { + taskCommunicatorServiceWrappers[i].stop(); + } + } + + @VisibleForTesting + TaskCommunicator createTaskCommunicator(NamedEntityDescriptor taskCommDescriptor, + int taskCommIndex) { + if (taskCommDescriptor.getEntityName().equals(TezConstants.getTezYarnServicePluginName())) { + return createDefaultTaskCommunicator(taskCommunicatorContexts[taskCommIndex]); + } else if (taskCommDescriptor.getEntityName() 
+ .equals(TezConstants.getTezUberServicePluginName())) { + return createUberTaskCommunicator(taskCommunicatorContexts[taskCommIndex]); + } else { + return createCustomTaskCommunicator(taskCommunicatorContexts[taskCommIndex], + taskCommDescriptor); + } + } + + @VisibleForTesting + TaskCommunicator createDefaultTaskCommunicator(TaskCommunicatorContext taskCommunicatorContext) { + LOG.info("Using Default Task Communicator"); + return new TezTaskCommunicatorImpl(taskCommunicatorContext); + } + + @VisibleForTesting + TaskCommunicator createUberTaskCommunicator(TaskCommunicatorContext taskCommunicatorContext) { + LOG.info("Using Default Local Task Communicator"); + return new TezLocalTaskCommunicatorImpl(taskCommunicatorContext); + } + + @VisibleForTesting + TaskCommunicator createCustomTaskCommunicator(TaskCommunicatorContext taskCommunicatorContext, + NamedEntityDescriptor taskCommDescriptor) { + LOG.info("Using TaskCommunicator {}:{} " + taskCommDescriptor.getEntityName(), + taskCommDescriptor.getClassName()); + Class<? extends TaskCommunicator> taskCommClazz = + (Class<? extends TaskCommunicator>) ReflectionUtils + .getClazz(taskCommDescriptor.getClassName()); + try { + Constructor<? 
extends TaskCommunicator> ctor = + taskCommClazz.getConstructor(TaskCommunicatorContext.class); + return ctor.newInstance(taskCommunicatorContext); + } catch (NoSuchMethodException | InvocationTargetException | InstantiationException | IllegalAccessException e) { + throw new TezUncheckedException(e); + } + } + + public TaskHeartbeatResponse heartbeat(TaskHeartbeatRequest request) + throws IOException, TezException { + ContainerId containerId = ConverterUtils.toContainerId(request + .getContainerIdentifier()); + if (LOG.isDebugEnabled()) { + LOG.debug("Received heartbeat from container" + + ", request=" + request); + } + + if (!registeredContainers.containsKey(containerId)) { + LOG.warn("Received task heartbeat from unknown container with id: " + containerId + + ", asking it to die"); + return RESPONSE_SHOULD_DIE; + } + + // A heartbeat can come in anytime. The AM may have made a decision to kill a running task/container + // meanwhile. If the decision is processed through the pipeline before the heartbeat is processed, + // the heartbeat will be dropped. Otherwise the heartbeat will be processed - and the system + // know how to handle this - via FailedInputEvents for example (relevant only if the heartbeat has events). + // So - avoiding synchronization. + + pingContainerHeartbeatHandler(containerId); + TaskAttemptEventInfo eventInfo = new TaskAttemptEventInfo(0, null, 0); + TezTaskAttemptID taskAttemptID = request.getTaskAttemptId(); + if (taskAttemptID != null) { + ContainerId containerIdFromMap = registeredAttempts.get(taskAttemptID); + if (containerIdFromMap == null || !containerIdFromMap.equals(containerId)) { + // This can happen when a task heartbeats. Meanwhile the container is unregistered. + // The information will eventually make it through to the plugin via a corresponding unregister. + // There's a race in that case between the unregister making it through, and this method returning. + // TODO TEZ-2003 (post) TEZ-2666. 
An exception back is likely a better approach than sending a shouldDie = true, + // so that the plugin can handle the scenario. Alternately augment the response with error codes. + // Error codes would be better than exceptions. + LOG.info("Attempt: " + taskAttemptID + " is not recognized for heartbeats"); + return RESPONSE_SHOULD_DIE; + } + + List<TezEvent> inEvents = request.getEvents(); + if (LOG.isDebugEnabled()) { + LOG.debug("Ping from " + taskAttemptID.toString() + + " events: " + (inEvents != null ? inEvents.size() : -1)); + } + + long currTime = context.getClock().getTime(); + List<TezEvent> otherEvents = new ArrayList<TezEvent>(); + // route TASK_STATUS_UPDATE_EVENT directly to TaskAttempt and route other events + // (DATA_MOVEMENT_EVENT, TASK_ATTEMPT_COMPLETED_EVENT, TASK_ATTEMPT_FAILED_EVENT) + // to VertexImpl to ensure the events ordering + // 1. DataMovementEvent is logged as RecoveryEvent before TaskAttemptFinishedEvent + // 2. TaskStatusEvent is handled before TaskAttemptFinishedEvent + for (TezEvent tezEvent : ListUtils.emptyIfNull(inEvents)) { + // for now, set the event time on the AM when it is received. + // this avoids any time disparity between machines. 
+ tezEvent.setEventReceivedTime(currTime); + final EventType eventType = tezEvent.getEventType(); + if (eventType == EventType.TASK_STATUS_UPDATE_EVENT) { + TaskAttemptEvent taskAttemptEvent = new TaskAttemptEventStatusUpdate(taskAttemptID, + (TaskStatusUpdateEvent) tezEvent.getEvent()); + context.getEventHandler().handle(taskAttemptEvent); + } else { + otherEvents.add(tezEvent); + } + } + if(!otherEvents.isEmpty()) { + TezVertexID vertexId = taskAttemptID.getTaskID().getVertexID(); + context.getEventHandler().handle( + new VertexEventRouteEvent(vertexId, Collections.unmodifiableList(otherEvents))); + } + taskHeartbeatHandler.pinged(taskAttemptID); + eventInfo = context + .getCurrentDAG() + .getVertex(taskAttemptID.getTaskID().getVertexID()) + .getTaskAttemptTezEvents(taskAttemptID, request.getStartIndex(), request.getPreRoutedStartIndex(), + request.getMaxEvents()); + } + return new TaskHeartbeatResponse(false, eventInfo.getEvents(), eventInfo.getNextFromEventId(), eventInfo.getNextPreRoutedFromEventId()); + } + public void taskAlive(TezTaskAttemptID taskAttemptId) { + taskHeartbeatHandler.pinged(taskAttemptId); + } + + public void containerAlive(ContainerId containerId) { + pingContainerHeartbeatHandler(containerId); + } + + public void taskStartedRemotely(TezTaskAttemptID taskAttemptID, ContainerId containerId) { + context.getEventHandler() + .handle(new TaskAttemptEventStartedRemotely(taskAttemptID, containerId, null)); + pingContainerHeartbeatHandler(containerId); + } + + public void taskKilled(TezTaskAttemptID taskAttemptId, TaskAttemptEndReason taskAttemptEndReason, + String diagnostics) { + // Regular flow via TaskAttempt will take care of un-registering from the heartbeat handler, + // and messages from the scheduler will release the container. + // TODO TEZ-2003 (post) TEZ-2671 Maybe consider un-registering here itself, since the task is not active anymore, + // instead of waiting for the unregister to flow through the Container. 
+ // Fix along the same lines as TEZ-2124 by introducing an explict context. + context.getEventHandler().handle(new TaskAttemptEventAttemptKilled(taskAttemptId, + diagnostics, TezUtilsInternal.fromTaskAttemptEndReason( + taskAttemptEndReason))); + } + + public void taskFailed(TezTaskAttemptID taskAttemptId, TaskAttemptEndReason taskAttemptEndReason, + String diagnostics) { + // Regular flow via TaskAttempt will take care of un-registering from the heartbeat handler, + // and messages from the scheduler will release the container. + // TODO TEZ-2003 (post) TEZ-2671 Maybe consider un-registering here itself, since the task is not active anymore, + // instead of waiting for the unregister to flow through the Container. + // Fix along the same lines as TEZ-2124 by introducing an explict context. + context.getEventHandler().handle(new TaskAttemptEventAttemptFailed(taskAttemptId, + TaskAttemptEventType.TA_FAILED, diagnostics, TezUtilsInternal.fromTaskAttemptEndReason( + taskAttemptEndReason))); + } + + public void vertexStateUpdateNotificationReceived(VertexStateUpdate event, int taskCommIndex) throws + Exception { + taskCommunicators[taskCommIndex].onVertexStateUpdated(event); + } + + + /** + * Child checking whether it can commit. + * <p/> + * <br/> + * Repeatedly polls the ApplicationMaster whether it + * {@link Task#canCommit(TezTaskAttemptID)} This is * a legacy from the + * centralized commit protocol handling by the JobTracker. + */ +// @Override + public boolean canCommit(TezTaskAttemptID taskAttemptId) throws IOException { + LOG.info("Commit go/no-go request from " + taskAttemptId.toString()); + // An attempt is asking if it can commit its output. This can be decided + // only by the task which is managing the multiple attempts. So redirect the + // request there. 
+ taskHeartbeatHandler.progressing(taskAttemptId); + pingContainerHeartbeatHandler(taskAttemptId); + + DAG job = context.getCurrentDAG(); + Task task = + job.getVertex(taskAttemptId.getTaskID().getVertexID()). + getTask(taskAttemptId.getTaskID()); + return task.canCommit(taskAttemptId); + } + + // The TaskAttemptListener register / unregister methods in this class are not thread safe. + // The Tez framework should not invoke these methods from multiple threads. + @Override + public void dagComplete(DAG dag) { + // TODO TEZ-2335. Cleanup TaskHeartbeat handler structures. + // TODO TEZ-2345. Also cleanup attemptInfo map, so that any tasks which heartbeat are told to die. + // Container structures remain unchanged - since they could be re-used across restarts. + // This becomes more relevant when task kills without container kills are allowed. + + // TODO TEZ-2336. Send a signal to containers indicating DAG completion. + + // Inform all communicators of the dagCompletion. + for (int i = 0 ; i < taskCommunicators.length ; i++) { + ((TaskCommunicatorContextImpl)taskCommunicatorContexts[i]).dagCompleteStart(dag); + taskCommunicators[i].dagComplete(dag.getName()); + ((TaskCommunicatorContextImpl)taskCommunicatorContexts[i]).dagCompleteEnd(); + } + + } + + @Override + public void dagSubmitted() { + // Nothing to do right now. Indicates that a new DAG has been submitted and + // the context has updated information. 
+ } + + @Override + public void registerRunningContainer(ContainerId containerId, int taskCommId) { + if (LOG.isDebugEnabled()) { + LOG.debug("ContainerId: " + containerId + " registered with TaskAttemptListener"); + } + ContainerInfo oldInfo = registeredContainers.put(containerId, NULL_CONTAINER_INFO); + if (oldInfo != null) { + throw new TezUncheckedException( + "Multiple registrations for containerId: " + containerId); + } + NodeId nodeId = context.getAllContainers().get(containerId).getContainer().getNodeId(); + taskCommunicators[taskCommId].registerRunningContainer(containerId, nodeId.getHost(), + nodeId.getPort()); + } + + @Override + public void unregisterRunningContainer(ContainerId containerId, int taskCommId, ContainerEndReason endReason, String diagnostics) { + if (LOG.isDebugEnabled()) { + LOG.debug("Unregistering Container from TaskAttemptListener: " + containerId); + } + ContainerInfo containerInfo = registeredContainers.remove(containerId); + if (containerInfo.taskAttemptId != null) { + registeredAttempts.remove(containerInfo.taskAttemptId); + } + taskCommunicators[taskCommId].registerContainerEnd(containerId, endReason, diagnostics); + } + + @Override + public void registerTaskAttempt(AMContainerTask amContainerTask, + ContainerId containerId, int taskCommId) { + ContainerInfo containerInfo = registeredContainers.get(containerId); + if (containerInfo == null) { + throw new TezUncheckedException("Registering task attempt: " + + amContainerTask.getTask().getTaskAttemptID() + " to unknown container: " + containerId); + } + if (containerInfo.taskAttemptId != null) { + throw new TezUncheckedException("Registering task attempt: " + + amContainerTask.getTask().getTaskAttemptID() + " to container: " + containerId + + " with existing assignment to: " + + containerInfo.taskAttemptId); + } + + // Explicitly putting in a new entry so that synchronization is not required on the existing element in the map. 
+ registeredContainers.put(containerId, new ContainerInfo(amContainerTask.getTask().getTaskAttemptID())); + + ContainerId containerIdFromMap = registeredAttempts.put( + amContainerTask.getTask().getTaskAttemptID(), containerId); + if (containerIdFromMap != null) { + throw new TezUncheckedException("Registering task attempt: " + + amContainerTask.getTask().getTaskAttemptID() + " to container: " + containerId + + " when already assigned to: " + containerIdFromMap); + } + taskCommunicators[taskCommId].registerRunningTaskAttempt(containerId, amContainerTask.getTask(), + amContainerTask.getAdditionalResources(), amContainerTask.getCredentials(), + amContainerTask.haveCredentialsChanged(), amContainerTask.getPriority()); + } + + @Override + public void unregisterTaskAttempt(TezTaskAttemptID attemptId, int taskCommId, TaskAttemptEndReason endReason, String diagnostics) { + ContainerId containerId = registeredAttempts.remove(attemptId); + if (containerId == null) { + LOG.warn("Unregister task attempt: " + attemptId + " from unknown container"); + return; + } + ContainerInfo containerInfo = registeredContainers.get(containerId); + if (containerInfo == null) { + LOG.warn("Unregister task attempt: " + attemptId + + " from non-registered container: " + containerId); + return; + } + // Explicitly putting in a new entry so that synchronization is not required on the existing element in the map. 
+ registeredContainers.put(containerId, NULL_CONTAINER_INFO); + taskCommunicators[taskCommId].unregisterRunningTaskAttempt(attemptId, endReason, diagnostics); + } + + @Override + public TaskCommunicator getTaskCommunicator(int taskCommIndex) { + return taskCommunicators[taskCommIndex]; + } + + private void pingContainerHeartbeatHandler(ContainerId containerId) { + containerHeartbeatHandler.pinged(containerId); + } + + private void pingContainerHeartbeatHandler(TezTaskAttemptID taskAttemptId) { + ContainerId containerId = registeredAttempts.get(taskAttemptId); + if (containerId != null) { + containerHeartbeatHandler.pinged(containerId); + } else { + LOG.warn("Handling communication from attempt: " + taskAttemptId + + ", ContainerId not known for this attempt"); + } + } +} http://git-wip-us.apache.org/repos/asf/tez/blob/8b278ea8/tez-dag/src/main/java/org/apache/tez/dag/app/TaskCommunicatorManagerInterface.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/TaskCommunicatorManagerInterface.java b/tez-dag/src/main/java/org/apache/tez/dag/app/TaskCommunicatorManagerInterface.java new file mode 100644 index 0000000..8d060a2 --- /dev/null +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/TaskCommunicatorManagerInterface.java @@ -0,0 +1,46 @@ +/** +* Licensed to the Apache Software Foundation (ASF) under one +* or more contributor license agreements. See the NOTICE file +* distributed with this work for additional information +* regarding copyright ownership. The ASF licenses this file +* to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance +* with the License. 
You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License. +*/ + +package org.apache.tez.dag.app; + +import org.apache.hadoop.yarn.api.records.ContainerId; +import org.apache.tez.serviceplugins.api.ContainerEndReason; +import org.apache.tez.serviceplugins.api.TaskAttemptEndReason; +import org.apache.tez.dag.app.dag.DAG; +import org.apache.tez.dag.api.TaskCommunicator; +import org.apache.tez.dag.app.rm.container.AMContainerTask; +import org.apache.tez.dag.records.TezTaskAttemptID; +/** + * Registers and unregisters running containers and task attempts against the {@link TaskCommunicator} selected by a taskCommId, exposes those communicators by index, and is notified of DAG submission and completion. + */ +public interface TaskCommunicatorManagerInterface { + + void registerRunningContainer(ContainerId containerId, int taskCommId); + + void registerTaskAttempt(AMContainerTask amContainerTask, ContainerId containerId, int taskCommId); + + void unregisterRunningContainer(ContainerId containerId, int taskCommId, ContainerEndReason endReason, String diagnostics); + + void unregisterTaskAttempt(TezTaskAttemptID attemptID, int taskCommId, TaskAttemptEndReason endReason, String diagnostics); + + void dagComplete(DAG dag); + + void dagSubmitted(); + + TaskCommunicator getTaskCommunicator(int taskCommIndex); +} http://git-wip-us.apache.org/repos/asf/tez/blob/8b278ea8/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java index 6b474ff..756ed28 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java +++
b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java @@ -82,7 +82,7 @@ import org.apache.tez.dag.api.records.DAGProtos.PlanKeyValuePair; import org.apache.tez.dag.api.records.DAGProtos.PlanVertexGroupInfo; import org.apache.tez.dag.api.records.DAGProtos.VertexPlan; import org.apache.tez.dag.app.AppContext; -import org.apache.tez.dag.app.TaskAttemptListener; +import org.apache.tez.dag.app.TaskCommunicatorManagerInterface; import org.apache.tez.dag.app.TaskHeartbeatHandler; import org.apache.tez.dag.app.dag.DAG; import org.apache.tez.dag.app.dag.DAGReport; @@ -160,7 +160,7 @@ public class DAGImpl implements org.apache.tez.dag.app.dag.DAG, private final Lock readLock; private final Lock writeLock; private final String dagName; - private final TaskAttemptListener taskAttemptListener; + private final TaskCommunicatorManagerInterface taskCommunicatorManagerInterface; private final TaskHeartbeatHandler taskHeartbeatHandler; private final Object tasksSyncHandle = new Object(); @@ -489,7 +489,7 @@ public class DAGImpl implements org.apache.tez.dag.app.dag.DAG, Configuration amConf, DAGPlan jobPlan, EventHandler eventHandler, - TaskAttemptListener taskAttemptListener, + TaskCommunicatorManagerInterface taskCommunicatorManagerInterface, Credentials dagCredentials, Clock clock, String appUserName, @@ -511,7 +511,7 @@ public class DAGImpl implements org.apache.tez.dag.app.dag.DAG, this.clock = clock; this.appContext = appContext; - this.taskAttemptListener = taskAttemptListener; + this.taskCommunicatorManagerInterface = taskCommunicatorManagerInterface; this.taskHeartbeatHandler = thh; this.eventHandler = eventHandler; ReadWriteLock readWriteLock = new ReentrantReadWriteLock(); @@ -1538,7 +1538,7 @@ public class DAGImpl implements org.apache.tez.dag.app.dag.DAG, VertexImpl v = new VertexImpl( vertexId, vertexPlan, vertexName, dag.dagConf, - dag.eventHandler, dag.taskAttemptListener, + dag.eventHandler, dag.taskCommunicatorManagerInterface, dag.clock, 
dag.taskHeartbeatHandler, !dag.commitAllOutputsOnSuccess, dag.appContext, vertexLocationHint, dag.vertexGroups, dag.taskSpecificLaunchCmdOption, dag.entityUpdateTracker); http://git-wip-us.apache.org/repos/asf/tez/blob/8b278ea8/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java index 6957b1d..3f2e3a4 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java @@ -61,7 +61,7 @@ import org.apache.tez.dag.api.oldrecords.TaskAttemptReport; import org.apache.tez.dag.api.oldrecords.TaskAttemptState; import org.apache.tez.dag.app.AppContext; import org.apache.tez.dag.app.ContainerContext; -import org.apache.tez.dag.app.TaskAttemptListener; +import org.apache.tez.dag.app.TaskCommunicatorManagerInterface; import org.apache.tez.dag.app.TaskHeartbeatHandler; import org.apache.tez.dag.app.dag.Task; import org.apache.tez.dag.app.dag.TaskAttempt; @@ -403,17 +403,17 @@ public class TaskAttemptImpl implements TaskAttempt, @SuppressWarnings("rawtypes") public TaskAttemptImpl(TezTaskID taskId, int attemptNumber, EventHandler eventHandler, - TaskAttemptListener taskAttemptListener, Configuration conf, Clock clock, + TaskCommunicatorManagerInterface taskCommunicatorManagerInterface, Configuration conf, Clock clock, TaskHeartbeatHandler taskHeartbeatHandler, AppContext appContext, boolean isRescheduled, Resource resource, ContainerContext containerContext, boolean leafVertex, Task task) { - this(taskId, attemptNumber, eventHandler, taskAttemptListener, conf, clock, + this(taskId, attemptNumber, eventHandler, taskCommunicatorManagerInterface, conf, clock, taskHeartbeatHandler, appContext, isRescheduled, resource, 
containerContext, leafVertex, task, null); } public TaskAttemptImpl(TezTaskID taskId, int attemptNumber, EventHandler eventHandler, - TaskAttemptListener taskAttemptListener, Configuration conf, Clock clock, + TaskCommunicatorManagerInterface taskCommunicatorManagerInterface, Configuration conf, Clock clock, TaskHeartbeatHandler taskHeartbeatHandler, AppContext appContext, boolean isRescheduled, Resource resource, ContainerContext containerContext, boolean leafVertex, http://git-wip-us.apache.org/repos/asf/tez/blob/8b278ea8/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java index 1b55295..ea14483 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java @@ -56,7 +56,7 @@ import org.apache.tez.dag.api.oldrecords.TaskReport; import org.apache.tez.dag.api.oldrecords.TaskState; import org.apache.tez.dag.app.AppContext; import org.apache.tez.dag.app.ContainerContext; -import org.apache.tez.dag.app.TaskAttemptListener; +import org.apache.tez.dag.app.TaskCommunicatorManagerInterface; import org.apache.tez.dag.app.TaskHeartbeatHandler; import org.apache.tez.dag.app.dag.StateChangeNotifier; import org.apache.tez.dag.app.dag.Task; @@ -115,7 +115,7 @@ public class TaskImpl implements Task, EventHandler<TaskEvent> { .getProperty("line.separator"); protected final Configuration conf; - protected final TaskAttemptListener taskAttemptListener; + protected final TaskCommunicatorManagerInterface taskCommunicatorManagerInterface; protected final TaskHeartbeatHandler taskHeartbeatHandler; protected final EventHandler eventHandler; private final TezTaskID taskId; @@ -341,7 +341,7 @@ public class TaskImpl implements Task, EventHandler<TaskEvent> { public 
TaskImpl(TezVertexID vertexId, int taskIndex, EventHandler eventHandler, Configuration conf, - TaskAttemptListener taskAttemptListener, + TaskCommunicatorManagerInterface taskCommunicatorManagerInterface, Clock clock, TaskHeartbeatHandler thh, AppContext appContext, boolean leafVertex, Resource resource, ContainerContext containerContext, @@ -357,7 +357,7 @@ public class TaskImpl implements Task, EventHandler<TaskEvent> { maxFailedAttempts = this.conf.getInt(TezConfiguration.TEZ_AM_TASK_MAX_FAILED_ATTEMPTS, TezConfiguration.TEZ_AM_TASK_MAX_FAILED_ATTEMPTS_DEFAULT); taskId = TezTaskID.getInstance(vertexId, taskIndex); - this.taskAttemptListener = taskAttemptListener; + this.taskCommunicatorManagerInterface = taskCommunicatorManagerInterface; this.taskHeartbeatHandler = thh; this.eventHandler = eventHandler; this.appContext = appContext; @@ -817,7 +817,7 @@ public class TaskImpl implements Task, EventHandler<TaskEvent> { TaskAttemptImpl createAttempt(int attemptNumber, TezTaskAttemptID schedulingCausalTA) { return new TaskAttemptImpl(getTaskId(), attemptNumber, eventHandler, - taskAttemptListener, conf, clock, taskHeartbeatHandler, appContext, + taskCommunicatorManagerInterface, conf, clock, taskHeartbeatHandler, appContext, (failedAttempts > 0), taskResource, containerContext, leafVertex, this, schedulingCausalTA); } http://git-wip-us.apache.org/repos/asf/tez/blob/8b278ea8/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java index 3cc439f..a1dcf6c 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java @@ -94,7 +94,7 @@ import org.apache.tez.dag.api.records.DAGProtos.VertexPlan; import org.apache.tez.dag.app.AppContext; import 
org.apache.tez.dag.app.ContainerContext; import org.apache.tez.dag.app.TaskAttemptEventInfo; -import org.apache.tez.dag.app.TaskAttemptListener; +import org.apache.tez.dag.app.TaskCommunicatorManagerInterface; import org.apache.tez.dag.app.TaskHeartbeatHandler; import org.apache.tez.dag.app.dag.DAG; import org.apache.tez.dag.app.dag.RootInputInitializerManager; @@ -212,7 +212,7 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex, EventHandl private final Lock readLock; private final Lock writeLock; - private final TaskAttemptListener taskAttemptListener; + private final TaskCommunicatorManagerInterface taskCommunicatorManagerInterface; private final TaskHeartbeatHandler taskHeartbeatHandler; private final Object tasksSyncHandle = new Object(); @@ -890,7 +890,7 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex, EventHandl public VertexImpl(TezVertexID vertexId, VertexPlan vertexPlan, String vertexName, Configuration dagConf, EventHandler eventHandler, - TaskAttemptListener taskAttemptListener, Clock clock, + TaskCommunicatorManagerInterface taskCommunicatorManagerInterface, Clock clock, TaskHeartbeatHandler thh, boolean commitVertexOutputs, AppContext appContext, VertexLocationHint vertexLocationHint, Map<String, VertexGroupInfo> dagVertexGroups, TaskSpecificLaunchCmdOption taskSpecificLaunchCmdOption, @@ -911,7 +911,7 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex, EventHandl this.appContext = appContext; this.commitVertexOutputs = commitVertexOutputs; - this.taskAttemptListener = taskAttemptListener; + this.taskCommunicatorManagerInterface = taskCommunicatorManagerInterface; this.taskHeartbeatHandler = thh; this.eventHandler = eventHandler; ReadWriteLock readWriteLock = new ReentrantReadWriteLock(); @@ -2331,7 +2331,7 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex, EventHandl return new TaskImpl(this.getVertexId(), taskIndex, this.eventHandler, vertexConf, - 
this.taskAttemptListener, + this.taskCommunicatorManagerInterface, this.clock, this.taskHeartbeatHandler, this.appContext,
