TEZ-2117. Add a manager for ContainerLaunchers running in the AM. (sseth)
Project: http://git-wip-us.apache.org/repos/asf/tez/repo Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/d5e9bafa Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/d5e9bafa Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/d5e9bafa Branch: refs/heads/TEZ-2003 Commit: d5e9bafaf9f394e662ecd70a075525bbbe163de1 Parents: e671f1b Author: Siddharth Seth <[email protected]> Authored: Wed Feb 18 14:45:34 2015 -0800 Committer: Siddharth Seth <[email protected]> Committed: Thu Apr 9 13:32:31 2015 -0700 ---------------------------------------------------------------------- TEZ-2003-CHANGES.txt | 1 + .../org/apache/tez/dag/app/DAGAppMaster.java | 54 ++-------- .../tez/dag/app/launcher/ContainerLauncher.java | 2 +- .../app/launcher/ContainerLauncherRouter.java | 108 +++++++++++++++++++ .../apache/tez/dag/app/MockDAGAppMaster.java | 5 +- 5 files changed, 124 insertions(+), 46 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/tez/blob/d5e9bafa/TEZ-2003-CHANGES.txt ---------------------------------------------------------------------- diff --git a/TEZ-2003-CHANGES.txt b/TEZ-2003-CHANGES.txt index 975ce65..1cd74a4 100644 --- a/TEZ-2003-CHANGES.txt +++ b/TEZ-2003-CHANGES.txt @@ -2,5 +2,6 @@ ALL CHANGES: TEZ-2019. Temporarily allow the scheduler and launcher to be specified via configuration. TEZ-2006. Task communication plane needs to be pluggable. TEZ-2090. Add tests for jobs running in external services. + TEZ-2117. Add a manager for ContainerLaunchers running in the AM. INCOMPATIBLE CHANGES: http://git-wip-us.apache.org/repos/asf/tez/blob/d5e9bafa/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java b/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java index 038d94d..12f2dd6 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java @@ -25,8 +25,6 @@ import java.io.FileInputStream; import java.io.FileNotFoundException; import java.io.IOException; import java.io.PrintWriter; -import java.lang.reflect.Constructor; -import java.lang.reflect.InvocationTargetException; import java.net.URI; import java.net.URISyntaxException; import java.net.URL; @@ -136,9 +134,7 @@ import org.apache.tez.dag.app.dag.event.TaskEventType; import org.apache.tez.dag.app.dag.event.VertexEvent; import org.apache.tez.dag.app.dag.event.VertexEventType; import org.apache.tez.dag.app.dag.impl.DAGImpl; -import org.apache.tez.dag.app.launcher.ContainerLauncher; -import org.apache.tez.dag.app.launcher.ContainerLauncherImpl; -import org.apache.tez.dag.app.launcher.LocalContainerLauncher; +import org.apache.tez.dag.app.launcher.ContainerLauncherRouter; import org.apache.tez.dag.app.rm.AMSchedulerEventType; import org.apache.tez.dag.app.rm.NMCommunicatorEventType; import org.apache.tez.dag.app.rm.TaskSchedulerEventHandler; @@ -221,7 +217,7 @@ public class DAGAppMaster extends AbstractService { private AppContext context; private Configuration amConf; private AsyncDispatcher dispatcher; - private ContainerLauncher containerLauncher; + private ContainerLauncherRouter containerLauncherRouter; private ContainerHeartbeatHandler containerHeartbeatHandler; private TaskHeartbeatHandler taskHeartbeatHandler; private TaskAttemptListener taskAttemptListener; @@ -487,9 +483,9 @@ public class DAGAppMaster extends AbstractService { taskSchedulerEventHandler); addIfServiceDependency(taskSchedulerEventHandler, clientRpcServer); - containerLauncher = createContainerLauncher(context); - addIfService(containerLauncher, true); - dispatcher.register(NMCommunicatorEventType.class, containerLauncher); + this.containerLauncherRouter = createContainerLauncherRouter(conf); + addIfService(containerLauncherRouter, true); + dispatcher.register(NMCommunicatorEventType.class, containerLauncherRouter); historyEventHandler = new HistoryEventHandler(context); addIfService(historyEventHandler, true); @@ -977,38 +973,10 @@ public class DAGAppMaster extends AbstractService { return chh; } - protected ContainerLauncher - createContainerLauncher(final AppContext context) throws UnknownHostException { - if(isLocal){ - LOG.info("Creating LocalContainerLauncher"); - return new LocalContainerLauncher(context, taskAttemptListener, workingDirectory); - } else { - // TODO: Temporary reflection with specific parameters until a clean interface is defined. - String containerLauncherClassName = getConfig().get(TezConfiguration.TEZ_AM_CONTAINER_LAUNCHER_CLASS); - if (containerLauncherClassName == null) { - LOG.info("Creating Default Container Launcher"); - return new ContainerLauncherImpl(context); - } else { - LOG.info("Creating container launcher : " + containerLauncherClassName); - Class<? extends ContainerLauncher> containerLauncherClazz = (Class<? extends ContainerLauncher>) ReflectionUtils.getClazz( - containerLauncherClassName); - try { - Constructor<? extends ContainerLauncher> ctor = containerLauncherClazz - .getConstructor(AppContext.class, Configuration.class, TaskAttemptListener.class); - ctor.setAccessible(true); - ContainerLauncher instance = ctor.newInstance(context, getConfig(), taskAttemptListener); - return instance; - } catch (NoSuchMethodException e) { - throw new TezUncheckedException(e); - } catch (InvocationTargetException e) { - throw new TezUncheckedException(e); - } catch (InstantiationException e) { - throw new TezUncheckedException(e); - } catch (IllegalAccessException e) { - throw new TezUncheckedException(e); - } - } - } + protected ContainerLauncherRouter createContainerLauncherRouter(Configuration conf) throws + UnknownHostException { + return new ContainerLauncherRouter(conf, isLocal, context, taskAttemptListener, workingDirectory); + } public ApplicationId getAppID() { @@ -1031,8 +999,8 @@ public class DAGAppMaster extends AbstractService { return dispatcher; } - public ContainerLauncher getContainerLauncher() { - return containerLauncher; + public ContainerLauncherRouter getContainerLauncherRouter() { + return containerLauncherRouter; } public TaskAttemptListener getTaskAttemptListener() { http://git-wip-us.apache.org/repos/asf/tez/blob/d5e9bafa/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/ContainerLauncher.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/ContainerLauncher.java b/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/ContainerLauncher.java index 35cdeda..196f02b 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/ContainerLauncher.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/ContainerLauncher.java @@ -22,6 +22,6 @@ package org.apache.tez.dag.app.launcher; import org.apache.hadoop.yarn.event.EventHandler; import org.apache.tez.dag.app.rm.NMCommunicatorEvent; -public interface ContainerLauncher +public interface ContainerLauncher extends EventHandler<NMCommunicatorEvent> { } http://git-wip-us.apache.org/repos/asf/tez/blob/d5e9bafa/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/ContainerLauncherRouter.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/ContainerLauncherRouter.java b/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/ContainerLauncherRouter.java new file mode 100644 index 0000000..34001ed --- /dev/null +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/ContainerLauncherRouter.java @@ -0,0 +1,108 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.tez.dag.app.launcher; + +import java.lang.reflect.Constructor; +import java.lang.reflect.InvocationTargetException; +import java.net.UnknownHostException; + +import com.google.common.annotations.VisibleForTesting; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.service.AbstractService; +import org.apache.hadoop.yarn.event.EventHandler; +import org.apache.tez.common.ReflectionUtils; +import org.apache.tez.dag.api.TezConfiguration; +import org.apache.tez.dag.api.TezUncheckedException; +import org.apache.tez.dag.app.AppContext; +import org.apache.tez.dag.app.TaskAttemptListener; +import org.apache.tez.dag.app.rm.NMCommunicatorEvent; + +public class ContainerLauncherRouter extends AbstractService + implements EventHandler<NMCommunicatorEvent> { + + static final Log LOG = LogFactory.getLog(ContainerLauncherImpl.class); + + private final ContainerLauncher containerLauncher; + + @VisibleForTesting + public ContainerLauncherRouter(ContainerLauncher containerLauncher) { + super(ContainerLauncherRouter.class.getName()); + this.containerLauncher = containerLauncher; + } + + // Accepting conf to setup final parameters, if required. + public ContainerLauncherRouter(Configuration conf, boolean isLocal, AppContext context, + TaskAttemptListener taskAttemptListener, + String workingDirectory) throws UnknownHostException { + super(ContainerLauncherRouter.class.getName()); + + if (isLocal) { + LOG.info("Creating LocalContainerLauncher"); + containerLauncher = + new LocalContainerLauncher(context, taskAttemptListener, workingDirectory); + } else { + // TODO: Temporary reflection with specific parameters until a clean interface is defined. + String containerLauncherClassName = + conf.get(TezConfiguration.TEZ_AM_CONTAINER_LAUNCHER_CLASS); + if (containerLauncherClassName == null) { + LOG.info("Creating Default Container Launcher"); + containerLauncher = new ContainerLauncherImpl(context); + } else { + LOG.info("Creating container launcher : " + containerLauncherClassName); + Class<? extends ContainerLauncher> containerLauncherClazz = + (Class<? extends ContainerLauncher>) ReflectionUtils.getClazz( + containerLauncherClassName); + try { + Constructor<? extends ContainerLauncher> ctor = containerLauncherClazz + .getConstructor(AppContext.class, Configuration.class, TaskAttemptListener.class); + ctor.setAccessible(true); + containerLauncher = ctor.newInstance(context, conf, taskAttemptListener); + } catch (NoSuchMethodException e) { + throw new TezUncheckedException(e); + } catch (InvocationTargetException e) { + throw new TezUncheckedException(e); + } catch (InstantiationException e) { + throw new TezUncheckedException(e); + } catch (IllegalAccessException e) { + throw new TezUncheckedException(e); + } + } + + } + } + + @Override + public void serviceInit(Configuration conf) { + ((AbstractService)containerLauncher).init(conf); + } + + @Override + public void serviceStart() { + ((AbstractService)containerLauncher).start(); + } + + @Override + public void serviceStop() { + ((AbstractService)containerLauncher).stop(); + } + + + @Override + public void handle(NMCommunicatorEvent event) { + containerLauncher.handle(event); + } +} http://git-wip-us.apache.org/repos/asf/tez/blob/d5e9bafa/tez-dag/src/test/java/org/apache/tez/dag/app/MockDAGAppMaster.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/MockDAGAppMaster.java b/tez-dag/src/test/java/org/apache/tez/dag/app/MockDAGAppMaster.java index 8101234..4191142 100644 --- a/tez-dag/src/test/java/org/apache/tez/dag/app/MockDAGAppMaster.java +++ b/tez-dag/src/test/java/org/apache/tez/dag/app/MockDAGAppMaster.java @@ -56,6 +56,7 @@ import org.apache.tez.dag.api.TezConfiguration; import org.apache.tez.dag.api.TaskCommunicator; import org.apache.tez.dag.api.TezUncheckedException; import org.apache.tez.dag.app.launcher.ContainerLauncher; +import org.apache.tez.dag.app.launcher.ContainerLauncherRouter; import org.apache.tez.dag.app.rm.NMCommunicatorEvent; import org.apache.tez.dag.app.rm.NMCommunicatorLaunchRequestEvent; import org.apache.tez.dag.app.rm.NMCommunicatorStopRequestEvent; @@ -483,9 +484,9 @@ public class MockDAGAppMaster extends DAGAppMaster { // use mock container launcher for tests @Override - protected ContainerLauncher createContainerLauncher(final AppContext context) + protected ContainerLauncherRouter createContainerLauncherRouter(final Configuration conf) throws UnknownHostException { - return containerLauncher; + return new ContainerLauncherRouter(containerLauncher); } public MockContainerLauncher getContainerLauncher() {
