TEZ-2019. Temporarily allow the scheduler and launcher to be specified via configuration. (sseth)
Project: http://git-wip-us.apache.org/repos/asf/tez/repo Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/201add5c Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/201add5c Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/201add5c Branch: refs/heads/TEZ-2003 Commit: 201add5cf2f1754abbc464262fbe71bdf1a33c55 Parents: f06240b Author: Siddharth Seth <[email protected]> Authored: Fri Jan 30 16:02:32 2015 -0800 Committer: Siddharth Seth <[email protected]> Committed: Thu Apr 9 13:32:31 2015 -0700 ---------------------------------------------------------------------- TEZ-2003-CHANGES.txt | 4 +++ .../apache/tez/dag/api/TezConfiguration.java | 6 ++++ .../org/apache/tez/dag/app/DAGAppMaster.java | 30 ++++++++++++++++- .../dag/app/rm/TaskSchedulerEventHandler.java | 34 ++++++++++++++++++-- .../org/apache/tez/runtime/task/TezChild.java | 3 +- 5 files changed, 73 insertions(+), 4 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/tez/blob/201add5c/TEZ-2003-CHANGES.txt ---------------------------------------------------------------------- diff --git a/TEZ-2003-CHANGES.txt b/TEZ-2003-CHANGES.txt new file mode 100644 index 0000000..1822fcb --- /dev/null +++ b/TEZ-2003-CHANGES.txt @@ -0,0 +1,4 @@ +ALL CHANGES: + TEZ-2019. Temporarily allow the scheduler and launcher to be specified via configuration. + +INCOMPATIBLE CHANGES: http://git-wip-us.apache.org/repos/asf/tez/blob/201add5c/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java ---------------------------------------------------------------------- diff --git a/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java b/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java index 7a49ee5..3509509 100644 --- a/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java +++ b/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java @@ -1136,6 +1136,12 @@ public class TezConfiguration extends Configuration { + "tez-ui.webservice.enable"; public static final boolean TEZ_AM_WEBSERVICE_ENABLE_DEFAULT = true; + @ConfigurationScope(Scope.VERTEX) + public static final String TEZ_AM_CONTAINER_LAUNCHER_CLASS = TEZ_AM_PREFIX + "container-launcher.class"; + @ConfigurationScope(Scope.VERTEX) + public static final String TEZ_AM_TASK_SCHEDULER_CLASS = TEZ_AM_PREFIX + "task-scheduler.class"; + + // TODO only validate property here, value can also be validated if necessary public static void validateProperty(String property, Scope usedScope) { Scope validScope = PropertyScope.get(property); http://git-wip-us.apache.org/repos/asf/tez/blob/201add5c/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java b/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java index 2924b25..b857cbc 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java @@ -25,6 +25,8 @@ import java.io.FileInputStream; import java.io.FileNotFoundException; import java.io.IOException; import java.io.PrintWriter; +import java.lang.reflect.Constructor; +import java.lang.reflect.InvocationTargetException; import java.net.URI; import java.net.URISyntaxException; import java.net.URL; @@ -90,6 +92,7 @@ import org.apache.hadoop.yarn.util.ResourceCalculatorProcessTree; import org.apache.hadoop.yarn.util.SystemClock; import org.apache.tez.common.AsyncDispatcher; import org.apache.tez.common.GcTimeUpdater; +import org.apache.tez.common.ReflectionUtils; import org.apache.tez.common.TezCommonUtils; import org.apache.tez.common.TezConverterUtils; import org.apache.tez.common.TezUtilsInternal; @@ -977,9 +980,34 @@ public class DAGAppMaster extends AbstractService { protected ContainerLauncher createContainerLauncher(final AppContext context) throws UnknownHostException { if(isLocal){ + LOG.info("Creating LocalContainerLauncher"); return new LocalContainerLauncher(context, taskAttemptListener, workingDirectory); } else { - return new ContainerLauncherImpl(context); + // TODO: Temporary reflection with specific parameters until a clean interface is defined. + String containerLauncherClassName = getConfig().get(TezConfiguration.TEZ_AM_CONTAINER_LAUNCHER_CLASS); + if (containerLauncherClassName == null) { + LOG.info("Creating Default Container Launcher"); + return new ContainerLauncherImpl(context); + } else { + LOG.info("Creating container launcher : " + containerLauncherClassName); + Class<? extends ContainerLauncher> containerLauncherClazz = (Class<? extends ContainerLauncher>) ReflectionUtils.getClazz( + containerLauncherClassName); + try { + Constructor<? extends ContainerLauncher> ctor = containerLauncherClazz + .getConstructor(AppContext.class, Configuration.class, TaskAttemptListener.class); + ctor.setAccessible(true); + ContainerLauncher instance = ctor.newInstance(context, getConfig(), taskAttemptListener); + return instance; + } catch (NoSuchMethodException e) { + throw new TezUncheckedException(e); + } catch (InvocationTargetException e) { + throw new TezUncheckedException(e); + } catch (InstantiationException e) { + throw new TezUncheckedException(e); + } catch (IllegalAccessException e) { + throw new TezUncheckedException(e); + } + } } } http://git-wip-us.apache.org/repos/asf/tez/blob/201add5c/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerEventHandler.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerEventHandler.java b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerEventHandler.java index 0985d58..b01d7d2 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerEventHandler.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerEventHandler.java @@ -18,6 +18,8 @@ package org.apache.tez.dag.app.rm; +import java.lang.reflect.Constructor; +import java.lang.reflect.InvocationTargetException; import java.net.InetSocketAddress; import java.net.URI; import java.net.URISyntaxException; @@ -44,6 +46,7 @@ import org.apache.hadoop.yarn.api.records.Priority; import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.event.Event; import org.apache.hadoop.yarn.event.EventHandler; +import org.apache.tez.common.ReflectionUtils; import org.apache.tez.dag.api.TezConfiguration; import org.apache.tez.dag.api.TezUncheckedException; import org.apache.tez.dag.api.TaskLocationHint; @@ -331,12 +334,39 @@ public class TaskSchedulerEventHandler extends AbstractService boolean isLocal = getConfig().getBoolean(TezConfiguration.TEZ_LOCAL_MODE, TezConfiguration.TEZ_LOCAL_MODE_DEFAULT); if (isLocal) { + LOG.info("Using TaskScheduler: LocalTaskSchedulerService"); return new LocalTaskSchedulerService(this, this.containerSignatureMatcher, host, port, trackingUrl, appContext); } else { - return new YarnTaskSchedulerService(this, this.containerSignatureMatcher, - host, port, trackingUrl, appContext); + String schedulerClassName = getConfig().get(TezConfiguration.TEZ_AM_TASK_SCHEDULER_CLASS); + if (schedulerClassName == null) { + LOG.info("Using TaskScheduler: YarnTaskSchedulerService"); + return new YarnTaskSchedulerService(this, this.containerSignatureMatcher, + host, port, trackingUrl, appContext); + } else { + LOG.info("Using custom TaskScheduler: " + schedulerClassName); + // TODO Temporary reflection with specific parameters. Remove once there is a clean interface. + Class<? extends TaskSchedulerService> taskSchedulerClazz = + (Class<? extends TaskSchedulerService>) ReflectionUtils.getClazz(schedulerClassName); + try { + Constructor<? extends TaskSchedulerService> ctor = taskSchedulerClazz + .getConstructor(TaskSchedulerAppCallback.class, AppContext.class, String.class, + Integer.class, String.class, Configuration.class); + ctor.setAccessible(true); + TaskSchedulerService taskSchedulerService = + ctor.newInstance(this, appContext, host, port, trackingUrl, getConfig()); + return taskSchedulerService; + } catch (NoSuchMethodException e) { + throw new TezUncheckedException(e); + } catch (InvocationTargetException e) { + throw new TezUncheckedException(e); + } catch (InstantiationException e) { + throw new TezUncheckedException(e); + } catch (IllegalAccessException e) { + throw new TezUncheckedException(e); + } + } } } http://git-wip-us.apache.org/repos/asf/tez/blob/201add5c/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TezChild.java ---------------------------------------------------------------------- diff --git a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TezChild.java b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TezChild.java index bfec349..fd55992 100644 --- a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TezChild.java +++ b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TezChild.java @@ -355,7 +355,8 @@ public class TezChild { } if (!isLocal) { RPC.stopProxy(umbilical); - LogManager.shutdown(); + // TODO Temporary change. Revert. Ideally, move this over to the main method in TezChild if possible. +// LogManager.shutdown(); } } }
