TEZ-2122. Setup pluggable components at AM/Vertex level. (sseth)
Project: http://git-wip-us.apache.org/repos/asf/tez/repo Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/b0568218 Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/b0568218 Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/b0568218 Branch: refs/heads/TEZ-2003 Commit: b0568218d45e757fdc2bc2e56396e47fbe9697fc Parents: e025255 Author: Siddharth Seth <[email protected]> Authored: Thu Feb 19 14:59:18 2015 -0800 Committer: Siddharth Seth <[email protected]> Committed: Thu Aug 6 01:24:03 2015 -0700 ---------------------------------------------------------------------- TEZ-2003-CHANGES.txt | 1 + .../apache/tez/dag/api/TezConfiguration.java | 29 +++- .../org/apache/tez/dag/api/TezConstants.java | 3 + .../java/org/apache/tez/dag/app/AppContext.java | 4 + .../org/apache/tez/dag/app/DAGAppMaster.java | 121 +++++++++++++- .../dag/app/TaskAttemptListenerImpTezDag.java | 77 +++++---- .../java/org/apache/tez/dag/app/dag/Vertex.java | 4 + .../tez/dag/app/dag/impl/TaskAttemptImpl.java | 8 +- .../apache/tez/dag/app/dag/impl/VertexImpl.java | 47 ++++++ .../app/launcher/ContainerLauncherRouter.java | 93 +++++++---- .../app/rm/AMSchedulerEventTALaunchRequest.java | 22 ++- .../dag/app/rm/TaskSchedulerEventHandler.java | 163 +++++++++++-------- .../apache/tez/dag/app/MockDAGAppMaster.java | 5 +- .../app/TestTaskAttemptListenerImplTezDag.java | 10 +- .../tez/dag/app/rm/TestContainerReuse.java | 2 +- .../app/rm/TestTaskSchedulerEventHandler.java | 12 +- .../dag/app/rm/TestTaskSchedulerHelpers.java | 18 +- .../tez/tests/TestExternalTezServices.java | 19 ++- 18 files changed, 458 insertions(+), 180 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/tez/blob/b0568218/TEZ-2003-CHANGES.txt ---------------------------------------------------------------------- diff --git a/TEZ-2003-CHANGES.txt b/TEZ-2003-CHANGES.txt index 1cd74a4..4bfe08f 100644 --- a/TEZ-2003-CHANGES.txt +++ b/TEZ-2003-CHANGES.txt @@ -3,5 +3,6 @@ ALL CHANGES: TEZ-2006. Task communication plane needs to be pluggable. TEZ-2090. Add tests for jobs running in external services. TEZ-2117. Add a manager for ContainerLaunchers running in the AM. + TEZ-2122. Setup pluggable components at AM/Vertex level. INCOMPATIBLE CHANGES: http://git-wip-us.apache.org/repos/asf/tez/blob/b0568218/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java ---------------------------------------------------------------------- diff --git a/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java b/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java index 34fdb15..39a4c77 100644 --- a/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java +++ b/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java @@ -1215,13 +1215,36 @@ public class TezConfiguration extends Configuration { + "tez-ui.webservice.enable"; public static final boolean TEZ_AM_WEBSERVICE_ENABLE_DEFAULT = true; + /** defaults container-launcher for the specific vertex */ @ConfigurationScope(Scope.VERTEX) - public static final String TEZ_AM_CONTAINER_LAUNCHER_CLASS = TEZ_AM_PREFIX + "container-launcher.class"; + public static final String TEZ_AM_VERTEX_CONTAINER_LAUNCHER_NAME = TEZ_AM_PREFIX + "vertex.container-launcher.name"; + /** defaults task-scheduler for the specific vertex */ @ConfigurationScope(Scope.VERTEX) - public static final String TEZ_AM_TASK_SCHEDULER_CLASS = TEZ_AM_PREFIX + "task-scheduler.class"; + public static final String TEZ_AM_VERTEX_TASK_SCHEDULER_NAME = TEZ_AM_PREFIX + "vertex.task-scheduler.name"; + /** defaults task-communicator for the specific vertex */ @ConfigurationScope(Scope.VERTEX) - public static final String TEZ_AM_TASK_COMMUNICATOR_CLASS = TEZ_AM_PREFIX + "task-communicator.class"; + public static final String TEZ_AM_VERTEX_TASK_COMMUNICATOR_NAME = TEZ_AM_PREFIX + "vertex.task-communicator.name"; + /** Comma separated list of named container-launcher classes running in the AM. + * The format for each entry is NAME:CLASSNAME, except for tez default which is specified as Tez + * e.g. Tez, ExtService:org.apache.ExtLauncherClasss + * */ + @ConfigurationScope(Scope.AM) + public static final String TEZ_AM_CONTAINER_LAUNCHERS = TEZ_AM_PREFIX + "container-launchers"; + + /** Comma separated list of task-schedulers classes running in the AM. + * The format for each entry is NAME:CLASSNAME, except for tez default which is specified as Tez + * e.g. Tez, ExtService:org.apache.ExtSchedulerClasss + */ + @ConfigurationScope(Scope.AM) + public static final String TEZ_AM_TASK_SCHEDULERS = TEZ_AM_PREFIX + "task-schedulers"; + + /** Comma separated list of task-communicators classes running in the AM. + * The format for each entry is NAME:CLASSNAME, except for tez default which is specified as Tez + * e.g. Tez, ExtService:org.apache.ExtTaskCommClass + * */ + @ConfigurationScope(Scope.AM) + public static final String TEZ_AM_TASK_COMMUNICATORS = TEZ_AM_PREFIX + "task-communicators"; // TODO only validate property here, value can also be validated if necessary public static void validateProperty(String property, Scope usedScope) { http://git-wip-us.apache.org/repos/asf/tez/blob/b0568218/tez-api/src/main/java/org/apache/tez/dag/api/TezConstants.java ---------------------------------------------------------------------- diff --git a/tez-api/src/main/java/org/apache/tez/dag/api/TezConstants.java b/tez-api/src/main/java/org/apache/tez/dag/api/TezConstants.java index bc4208f..3b07c59 100644 --- a/tez-api/src/main/java/org/apache/tez/dag/api/TezConstants.java +++ b/tez-api/src/main/java/org/apache/tez/dag/api/TezConstants.java @@ -102,4 +102,7 @@ public class TezConstants { /// Version-related Environment variables public static final String TEZ_CLIENT_VERSION_ENV = "TEZ_CLIENT_VERSION"; + + public static final String TEZ_AM_SERVICE_PLUGINS_NAME_DEFAULT = "Tez"; + public static final String TEZ_AM_SERVICE_PLUGINS_LOCAL_MODE_NAME_DEFAULT = "TezLocal"; } http://git-wip-us.apache.org/repos/asf/tez/blob/b0568218/tez-dag/src/main/java/org/apache/tez/dag/app/AppContext.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/AppContext.java b/tez-dag/src/main/java/org/apache/tez/dag/app/AppContext.java index bf3e318..1ccb10b 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/app/AppContext.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/AppContext.java @@ -114,4 +114,8 @@ public interface AppContext { boolean isAMInCompletionState(); Credentials getAppCredentials(); + + public Integer getTaskCommunicatorIdentifier(String name); + public Integer getTaskScheduerIdentifier(String name); + public Integer getContainerLauncherIdentifier(String name); } http://git-wip-us.apache.org/repos/asf/tez/blob/b0568218/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java b/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java index 824dfbc..431a8b2 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java @@ -56,6 +56,8 @@ import java.util.concurrent.locks.ReentrantReadWriteLock; import java.util.regex.Matcher; import java.util.regex.Pattern; +import com.google.common.collect.BiMap; +import com.google.common.collect.HashBiMap; import org.apache.commons.cli.CommandLine; import org.apache.commons.cli.GnuParser; import org.apache.commons.cli.Options; @@ -270,7 +272,12 @@ public class DAGAppMaster extends AbstractService { private ExecutorService rawExecutor; private ListeningExecutorService execService; - + + // TODO May not need to be a bidi map + private final BiMap<String, Integer> taskSchedulers = HashBiMap.create(); + private final BiMap<String, Integer> containerLaunchers = HashBiMap.create(); + private final BiMap<String, Integer> taskCommunicators = HashBiMap.create(); + /** * set of already executed dag names. */ @@ -374,6 +381,29 @@ public class DAGAppMaster extends AbstractService { this.isLocal = conf.getBoolean(TezConfiguration.TEZ_LOCAL_MODE, TezConfiguration.TEZ_LOCAL_MODE_DEFAULT); + String tezDefaultClassIdentifier = + isLocal ? TezConstants.TEZ_AM_SERVICE_PLUGINS_LOCAL_MODE_NAME_DEFAULT : + TezConstants.TEZ_AM_SERVICE_PLUGINS_NAME_DEFAULT; + + String[] taskSchedulerClassIdentifiers = parsePlugins(taskSchedulers, + conf.getTrimmedStrings(TezConfiguration.TEZ_AM_TASK_SCHEDULERS, + tezDefaultClassIdentifier), + TezConfiguration.TEZ_AM_TASK_SCHEDULERS); + + String[] containerLauncherClassIdentifiers = parsePlugins(containerLaunchers, + conf.getTrimmedStrings(TezConfiguration.TEZ_AM_CONTAINER_LAUNCHERS, + tezDefaultClassIdentifier), + TezConfiguration.TEZ_AM_CONTAINER_LAUNCHERS); + + String[] taskCommunicatorClassIdentifiers = parsePlugins(taskCommunicators, + conf.getTrimmedStrings(TezConfiguration.TEZ_AM_TASK_COMMUNICATORS, + tezDefaultClassIdentifier), + TezConfiguration.TEZ_AM_TASK_COMMUNICATORS); + + LOG.info(buildPluginComponentLog(taskSchedulerClassIdentifiers, taskSchedulers, "TaskSchedulers")); + LOG.info(buildPluginComponentLog(containerLauncherClassIdentifiers, containerLaunchers, "ContainerLaunchers")); + LOG.info(buildPluginComponentLog(taskCommunicatorClassIdentifiers, taskCommunicators, "TaskCommunicators")); + boolean disableVersionCheck = conf.getBoolean( TezConfiguration.TEZ_AM_DISABLE_CLIENT_VERSION_CHECK, TezConfiguration.TEZ_AM_DISABLE_CLIENT_VERSION_CHECK_DEFAULT); @@ -439,7 +469,7 @@ public class DAGAppMaster extends AbstractService { //service to handle requests to TaskUmbilicalProtocol taskAttemptListener = createTaskAttemptListener(context, - taskHeartbeatHandler, containerHeartbeatHandler); + taskHeartbeatHandler, containerHeartbeatHandler, taskCommunicatorClassIdentifiers); addIfService(taskAttemptListener, true); containerSignatureMatcher = createContainerSignatureMatcher(); @@ -486,7 +516,8 @@ public class DAGAppMaster extends AbstractService { } this.taskSchedulerEventHandler = new TaskSchedulerEventHandler(context, - clientRpcServer, dispatcher.getEventHandler(), containerSignatureMatcher, webUIService); + clientRpcServer, dispatcher.getEventHandler(), containerSignatureMatcher, webUIService, + taskSchedulerClassIdentifiers); addIfService(taskSchedulerEventHandler, true); if (enableWebUIService()) { @@ -504,7 +535,7 @@ public class DAGAppMaster extends AbstractService { taskSchedulerEventHandler); addIfServiceDependency(taskSchedulerEventHandler, clientRpcServer); - this.containerLauncherRouter = createContainerLauncherRouter(conf); + this.containerLauncherRouter = createContainerLauncherRouter(conf, containerLauncherClassIdentifiers); addIfService(containerLauncherRouter, true); dispatcher.register(NMCommunicatorEventType.class, containerLauncherRouter); @@ -1012,9 +1043,9 @@ public class DAGAppMaster extends AbstractService { } protected TaskAttemptListener createTaskAttemptListener(AppContext context, - TaskHeartbeatHandler thh, ContainerHeartbeatHandler chh) { + TaskHeartbeatHandler thh, ContainerHeartbeatHandler chh, String[] taskCommunicatorClasses) { TaskAttemptListener lis = - new TaskAttemptListenerImpTezDag(context, thh, chh,jobTokenSecretManager); + new TaskAttemptListenerImpTezDag(context, thh, chh,jobTokenSecretManager, taskCommunicatorClasses); return lis; } @@ -1035,9 +1066,9 @@ public class DAGAppMaster extends AbstractService { return chh; } - protected ContainerLauncherRouter createContainerLauncherRouter(Configuration conf) throws + protected ContainerLauncherRouter createContainerLauncherRouter(Configuration conf, String []containerLauncherClasses) throws UnknownHostException { - return new ContainerLauncherRouter(conf, isLocal, context, taskAttemptListener, workingDirectory); + return new ContainerLauncherRouter(conf, context, taskAttemptListener, workingDirectory, containerLauncherClasses); } @@ -1499,6 +1530,21 @@ public class DAGAppMaster extends AbstractService { } @Override + public Integer getTaskCommunicatorIdentifier(String name) { + return taskCommunicators.get(name); + } + + @Override + public Integer getTaskScheduerIdentifier(String name) { + return taskSchedulers.get(name); + } + + @Override + public Integer getContainerLauncherIdentifier(String name) { + return taskCommunicators.get(name); + } + + @Override public Map<ApplicationAccessType, String> getApplicationACLs() { if (getServiceState() != STATE.STARTED) { throw new TezUncheckedException( @@ -2291,4 +2337,63 @@ public class DAGAppMaster extends AbstractService { return amConf.getBoolean(TezConfiguration.TEZ_AM_WEBSERVICE_ENABLE, TezConfiguration.TEZ_AM_WEBSERVICE_ENABLE_DEFAULT); } + + // Tez default classnames are populated as TezConfiguration.TEZ_AM_SERVICE_PLUGINS_DEFAULT + private String[] parsePlugins(BiMap<String, Integer> pluginMap, String[] pluginStrings, + String context) { + Preconditions.checkState(pluginStrings != null && pluginStrings.length > 0, + "Plugin strings should not be null or empty: " + context); + + String[] classNames = new String[pluginStrings.length]; + + int index = 0; + for (String pluginString : pluginStrings) { + + String className; + String identifierString; + + Preconditions.checkState(pluginString != null && !pluginString.isEmpty(), + "Plugin string: " + pluginString + " should not be null or empty"); + if (pluginString.equals(TezConstants.TEZ_AM_SERVICE_PLUGINS_NAME_DEFAULT) || + pluginString.equals(TezConstants.TEZ_AM_SERVICE_PLUGINS_LOCAL_MODE_NAME_DEFAULT)) { + // Kind of ugly, but Tez internal routing is encoded via a String instead of classnames. + // Individual components - TaskComm, Scheduler, Launcher deal with actual classname translation, + // and avoid reflection. + identifierString = pluginString; + className = pluginString; + } else { + String[] parts = pluginString.split(":"); + Preconditions.checkState( + parts.length == 2 && parts[0] != null && !parts[0].isEmpty() && parts[1] != null && + !parts[1].isEmpty(), + "Invalid configuration string for " + context + ": " + pluginString); + Preconditions.checkState( + !parts[0].equals(TezConstants.TEZ_AM_SERVICE_PLUGINS_NAME_DEFAULT) && + !parts[0].equals(TezConstants.TEZ_AM_SERVICE_PLUGINS_LOCAL_MODE_NAME_DEFAULT), + "Identifier cannot be " + TezConstants.TEZ_AM_SERVICE_PLUGINS_NAME_DEFAULT + " or " + + TezConstants.TEZ_AM_SERVICE_PLUGINS_LOCAL_MODE_NAME_DEFAULT + " for " + + pluginString); + identifierString = parts[0]; + className = parts[1]; + } + pluginMap.put(identifierString, index); + classNames[index] = className; + } + return classNames; + } + + String buildPluginComponentLog(String[] classIdentifiers, BiMap<String, Integer> map, + String component) { + StringBuilder sb = new StringBuilder(); + sb.append("AM Level configured ").append(component).append(": "); + for (int i = 0; i < classIdentifiers.length; i++) { + sb.append("[").append(i).append(":").append(map.inverse().get(i)).append(":") + .append(taskSchedulers.inverse().get(i)).append( + "]"); + if (i != classIdentifiers.length - 1) { + sb.append(","); + } + } + return sb.toString(); + } } http://git-wip-us.apache.org/repos/asf/tez/blob/b0568218/tez-dag/src/main/java/org/apache/tez/dag/app/TaskAttemptListenerImpTezDag.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/TaskAttemptListenerImpTezDag.java b/tez-dag/src/main/java/org/apache/tez/dag/app/TaskAttemptListenerImpTezDag.java index ff50907..3d9abdf 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/app/TaskAttemptListenerImpTezDag.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/TaskAttemptListenerImpTezDag.java @@ -37,7 +37,6 @@ import org.apache.tez.runtime.api.impl.EventType; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.hadoop.classification.InterfaceAudience; -import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.security.Credentials; import org.apache.hadoop.service.AbstractService; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; @@ -46,7 +45,7 @@ import org.apache.tez.common.ReflectionUtils; import org.apache.tez.dag.api.TaskCommunicator; import org.apache.tez.dag.api.TaskCommunicatorContext; import org.apache.tez.dag.api.TaskHeartbeatResponse; -import org.apache.tez.dag.api.TezConfiguration; +import org.apache.tez.dag.api.TezConstants; import org.apache.tez.dag.api.TezException; import org.apache.tez.dag.api.TezUncheckedException; import org.apache.hadoop.yarn.api.records.ContainerId; @@ -56,7 +55,6 @@ import org.apache.tez.dag.app.dag.DAG; import org.apache.tez.dag.app.dag.Task; import org.apache.tez.dag.app.dag.event.TaskAttemptEventStartedRemotely; import org.apache.tez.dag.app.dag.event.VertexEventRouteEvent; -import org.apache.tez.dag.app.rm.TaskSchedulerService; import org.apache.tez.dag.app.rm.container.AMContainerTask; import org.apache.tez.dag.records.TezTaskAttemptID; import org.apache.tez.dag.records.TezVertexID; @@ -73,7 +71,7 @@ public class TaskAttemptListenerImpTezDag extends AbstractService implements .getLogger(TaskAttemptListenerImpTezDag.class); private final AppContext context; - private TaskCommunicator taskCommunicator; + private final TaskCommunicator[] taskCommunicators; protected final TaskHeartbeatHandler taskHeartbeatHandler; protected final ContainerHeartbeatHandler containerHeartbeatHandler; @@ -99,28 +97,52 @@ public class TaskAttemptListenerImpTezDag extends AbstractService implements public TaskAttemptListenerImpTezDag(AppContext context, TaskHeartbeatHandler thh, ContainerHeartbeatHandler chh, // TODO TEZ-2003 pre-merge. Remove reference to JobTokenSecretManager. - JobTokenSecretManager jobTokenSecretManager) { + JobTokenSecretManager jobTokenSecretManager, + String [] taskCommunicatorClassIdentifiers) { super(TaskAttemptListenerImpTezDag.class.getName()); this.context = context; this.taskHeartbeatHandler = thh; this.containerHeartbeatHandler = chh; - this.taskCommunicator = new TezTaskCommunicatorImpl(this); + if (taskCommunicatorClassIdentifiers == null || taskCommunicatorClassIdentifiers.length == 0) { + taskCommunicatorClassIdentifiers = new String[] {TezConstants.TEZ_AM_SERVICE_PLUGINS_NAME_DEFAULT}; + } + this.taskCommunicators = new TaskCommunicator[taskCommunicatorClassIdentifiers.length]; + for (int i = 0 ; i < taskCommunicatorClassIdentifiers.length ; i++) { + taskCommunicators[i] = createTaskCommunicator(taskCommunicatorClassIdentifiers[i]); + } + // TODO TEZ-2118 Start using taskCommunicator indices properly + } + + @Override + public void serviceStart() { + // TODO Why is init tied to serviceStart + for (int i = 0 ; i < taskCommunicators.length ; i++) { + taskCommunicators[i].init(getConfig()); + taskCommunicators[i].start(); + } } @Override - public void serviceInit(Configuration conf) { - String taskCommClassName = conf.get(TezConfiguration.TEZ_AM_TASK_COMMUNICATOR_CLASS); - if (taskCommClassName == null) { + public void serviceStop() { + for (int i = 0 ; i < taskCommunicators.length ; i++) { + taskCommunicators[i].stop(); + } + } + + private TaskCommunicator createTaskCommunicator(String taskCommClassIdentifier) { + if (taskCommClassIdentifier.equals(TezConstants.TEZ_AM_SERVICE_PLUGINS_NAME_DEFAULT) || + taskCommClassIdentifier + .equals(TezConstants.TEZ_AM_SERVICE_PLUGINS_LOCAL_MODE_NAME_DEFAULT)) { LOG.info("Using Default Task Communicator"); - this.taskCommunicator = new TezTaskCommunicatorImpl(this); + return new TezTaskCommunicatorImpl(this); } else { - LOG.info("Using TaskCommunicator: " + taskCommClassName); + LOG.info("Using TaskCommunicator: " + taskCommClassIdentifier); Class<? extends TaskCommunicator> taskCommClazz = (Class<? extends TaskCommunicator>) ReflectionUtils - .getClazz(taskCommClassName); + .getClazz(taskCommClassIdentifier); try { Constructor<? extends TaskCommunicator> ctor = taskCommClazz.getConstructor(TaskCommunicatorContext.class); ctor.setAccessible(true); - this.taskCommunicator = ctor.newInstance(this); + return ctor.newInstance(this); } catch (NoSuchMethodException e) { throw new TezUncheckedException(e); } catch (InvocationTargetException e) { @@ -134,20 +156,6 @@ public class TaskAttemptListenerImpTezDag extends AbstractService implements } @Override - public void serviceStart() { - taskCommunicator.init(getConfig()); - taskCommunicator.start(); - } - - @Override - public void serviceStop() { - if (taskCommunicator != null) { - taskCommunicator.stop(); - taskCommunicator = null; - } - } - - @Override public ApplicationAttemptId getApplicationAttemptId() { return context.getApplicationAttemptId(); } @@ -235,7 +243,8 @@ public class TaskAttemptListenerImpTezDag extends AbstractService implements @Override public void taskStartedRemotely(TezTaskAttemptID taskAttemptID, ContainerId containerId) { - context.getEventHandler().handle(new TaskAttemptEventStartedRemotely(taskAttemptID, containerId, null)); + context.getEventHandler() + .handle(new TaskAttemptEventStartedRemotely(taskAttemptID, containerId, null)); pingContainerHeartbeatHandler(containerId); } @@ -265,7 +274,7 @@ public class TaskAttemptListenerImpTezDag extends AbstractService implements @Override public InetSocketAddress getAddress() { - return taskCommunicator.getAddress(); + return taskCommunicators[0].getAddress(); } // The TaskAttemptListener register / unregister methods in this class are not thread safe. @@ -297,7 +306,7 @@ public class TaskAttemptListenerImpTezDag extends AbstractService implements "Multiple registrations for containerId: " + containerId); } NodeId nodeId = context.getAllContainers().get(containerId).getContainer().getNodeId(); - taskCommunicator.registerRunningContainer(containerId, nodeId.getHost(), nodeId.getPort()); + taskCommunicators[0].registerRunningContainer(containerId, nodeId.getHost(), nodeId.getPort()); } @Override @@ -309,7 +318,7 @@ public class TaskAttemptListenerImpTezDag extends AbstractService implements if (containerInfo.taskAttemptId != null) { registeredAttempts.remove(containerInfo.taskAttemptId); } - taskCommunicator.registerContainerEnd(containerId); + taskCommunicators[0].registerContainerEnd(containerId); } @Override @@ -344,7 +353,7 @@ public class TaskAttemptListenerImpTezDag extends AbstractService implements + amContainerTask.getTask().getTaskAttemptID() + " to container: " + containerId + " when already assigned to: " + containerIdFromMap); } - taskCommunicator.registerRunningTaskAttempt(containerId, amContainerTask.getTask(), + taskCommunicators[0].registerRunningTaskAttempt(containerId, amContainerTask.getTask(), amContainerTask.getAdditionalResources(), amContainerTask.getCredentials(), amContainerTask.haveCredentialsChanged()); } @@ -364,7 +373,7 @@ public class TaskAttemptListenerImpTezDag extends AbstractService implements } // Explicitly putting in a new entry so that synchronization is not required on the existing element in the map. registeredContainers.put(containerId, NULL_CONTAINER_INFO); - taskCommunicator.unregisterRunningTaskAttempt(attemptId); + taskCommunicators[0].unregisterRunningTaskAttempt(attemptId); } private void pingContainerHeartbeatHandler(ContainerId containerId) { @@ -383,6 +392,6 @@ public class TaskAttemptListenerImpTezDag extends AbstractService implements public TaskCommunicator getTaskCommunicator() { - return taskCommunicator; + return taskCommunicators[0]; } } http://git-wip-us.apache.org/repos/asf/tez/blob/b0568218/tez-dag/src/main/java/org/apache/tez/dag/app/dag/Vertex.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/Vertex.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/Vertex.java index ab7941e..552da11 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/Vertex.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/Vertex.java @@ -173,4 +173,8 @@ public interface Vertex extends Comparable<Vertex> { public int getKilledTaskAttemptCount(); public Configuration getConf(); + + public int getTaskSchedulerIdentifier(); + public int getContainerLauncherIdentifier(); + public int getTaskCommunicatorIdentifier(); } http://git-wip-us.apache.org/repos/asf/tez/blob/b0568218/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java index ebf7c58..806b977 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java @@ -1065,9 +1065,15 @@ public class TaskAttemptImpl implements TaskAttempt, priority = (scheduleEvent.getPriorityHighLimit() + scheduleEvent.getPriorityLowLimit()) / 2; } + // TODO Jira post TEZ-2003 getVertex implementation is very inefficient. This should be via references, instead of locked table lookups. + Vertex vertex = ta.getVertex(); AMSchedulerEventTALaunchRequest launchRequestEvent = new AMSchedulerEventTALaunchRequest( ta.attemptId, ta.taskResource, remoteTaskSpec, ta, locationHint, - priority, ta.containerContext); + priority, ta.containerContext, + vertex.getTaskSchedulerIdentifier(), + vertex.getContainerLauncherIdentifier(), + vertex.getTaskCommunicatorIdentifier()); + ta.sendEvent(launchRequestEvent); return TaskAttemptStateInternal.START_WAIT; } http://git-wip-us.apache.org/repos/asf/tez/blob/b0568218/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java index 1fcfe7e..0583a0b 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java @@ -74,6 +74,7 @@ import org.apache.tez.dag.api.ProcessorDescriptor; import org.apache.tez.dag.api.RootInputLeafOutput; import org.apache.tez.dag.api.Scope; import org.apache.tez.dag.api.TezConfiguration; +import org.apache.tez.dag.api.TezConstants; import org.apache.tez.dag.api.TezUncheckedException; import org.apache.tez.dag.api.VertexLocationHint; import org.apache.tez.dag.api.TaskLocationHint; @@ -232,6 +233,10 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex, EventHandl private final boolean isSpeculationEnabled; + private final int taskSchedulerIdentifier; + private final int containerLauncherIdentifier; + private final int taskCommunicatorIdentifier; + //fields initialized in init @VisibleForTesting @@ -986,6 +991,33 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex, EventHandl // This "this leak" is okay because the retained pointer is in an // instance variable. + boolean isLocal = vertexConf.getBoolean(TezConfiguration.TEZ_LOCAL_MODE, + TezConfiguration.TEZ_LOCAL_MODE_DEFAULT); + + String tezDefaultComponentName = + isLocal ? TezConstants.TEZ_AM_SERVICE_PLUGINS_LOCAL_MODE_NAME_DEFAULT : + TezConstants.TEZ_AM_SERVICE_PLUGINS_NAME_DEFAULT; + String taskSchedulerName = + vertexConf.get(TezConfiguration.TEZ_AM_VERTEX_TASK_SCHEDULER_NAME, tezDefaultComponentName); + String taskCommName = vertexConf + .get(TezConfiguration.TEZ_AM_VERTEX_TASK_COMMUNICATOR_NAME, tezDefaultComponentName); + String containerLauncherName = vertexConf + .get(TezConfiguration.TEZ_AM_VERTEX_CONTAINER_LAUNCHER_NAME, tezDefaultComponentName); + taskSchedulerIdentifier = appContext.getTaskScheduerIdentifier(taskSchedulerName); + taskCommunicatorIdentifier = appContext.getTaskCommunicatorIdentifier(taskCommName); + containerLauncherIdentifier = appContext.getContainerLauncherIdentifier(containerLauncherName); + + Preconditions.checkNotNull(taskSchedulerIdentifier, "Unknown taskScheduler: " + taskSchedulerName); + Preconditions.checkNotNull(taskCommunicatorIdentifier, "Unknown taskCommunicator: " + containerLauncherName); + Preconditions.checkNotNull(containerLauncherIdentifier, "Unknown containerLauncher: " + taskCommName); + + StringBuilder sb = new StringBuilder(); + sb.append("Running vertex: ").append(logIdentifier).append(" : ") + .append("TaskScheduler=").append(taskSchedulerIdentifier).append(":").append(taskSchedulerName) + .append(", ContainerLauncher=").append(containerLauncherIdentifier).append(":").append(containerLauncherName) + .append(", TaskCommunicator=").append(taskCommunicatorIdentifier).append(":").append(taskCommName); + LOG.info(sb.toString()); + stateMachine = new StateMachineTez<VertexState, VertexEventType, VertexEvent, VertexImpl>( stateMachineFactory.make(this), this); augmentStateMachine(); @@ -996,6 +1028,21 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex, EventHandl return vertexConf; } + @Override + public int getTaskSchedulerIdentifier() { + return this.taskSchedulerIdentifier; + } + + @Override + public int getContainerLauncherIdentifier() { + return this.containerLauncherIdentifier; + } + + @Override + public int getTaskCommunicatorIdentifier() { + return this.taskCommunicatorIdentifier; + } + private boolean isSpeculationEnabled() { return isSpeculationEnabled; } http://git-wip-us.apache.org/repos/asf/tez/blob/b0568218/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/ContainerLauncherRouter.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/ContainerLauncherRouter.java b/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/ContainerLauncherRouter.java index 34001ed..621e4a8 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/ContainerLauncherRouter.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/ContainerLauncherRouter.java @@ -26,6 +26,7 @@ import org.apache.hadoop.service.AbstractService; import org.apache.hadoop.yarn.event.EventHandler; import org.apache.tez.common.ReflectionUtils; import org.apache.tez.dag.api.TezConfiguration; +import org.apache.tez.dag.api.TezConstants; import org.apache.tez.dag.api.TezUncheckedException; import org.apache.tez.dag.app.AppContext; import org.apache.tez.dag.app.TaskAttemptListener; @@ -36,73 +37,93 @@ public class ContainerLauncherRouter extends AbstractService static final Log LOG = LogFactory.getLog(ContainerLauncherImpl.class); - private final ContainerLauncher containerLauncher; + private final ContainerLauncher containerLaunchers[]; @VisibleForTesting public ContainerLauncherRouter(ContainerLauncher containerLauncher) { super(ContainerLauncherRouter.class.getName()); - this.containerLauncher = containerLauncher; + containerLaunchers = new ContainerLauncher[] {containerLauncher}; } // Accepting conf to setup final parameters, if required. - public ContainerLauncherRouter(Configuration conf, boolean isLocal, AppContext context, + public ContainerLauncherRouter(Configuration conf, AppContext context, TaskAttemptListener taskAttemptListener, - String workingDirectory) throws UnknownHostException { + String workingDirectory, + String[] containerLauncherClassIdentifiers) throws UnknownHostException { super(ContainerLauncherRouter.class.getName()); - if (isLocal) { + if (containerLauncherClassIdentifiers == null || containerLauncherClassIdentifiers.length == 0) { + containerLauncherClassIdentifiers = new String[] {TezConstants.TEZ_AM_SERVICE_PLUGINS_NAME_DEFAULT}; + } + containerLaunchers = new ContainerLauncher[containerLauncherClassIdentifiers.length]; + + for (int i = 0; i < containerLauncherClassIdentifiers.length; i++) { + containerLaunchers[i] = createContainerLauncher(containerLauncherClassIdentifiers[i], context, + taskAttemptListener, workingDirectory, conf); + } + } + + private ContainerLauncher createContainerLauncher(String containerLauncherClassIdentifier, + AppContext context, + TaskAttemptListener taskAttemptListener, + String workingDirectory, + Configuration conf) throws + UnknownHostException { + if (containerLauncherClassIdentifier.equals(TezConstants.TEZ_AM_SERVICE_PLUGINS_NAME_DEFAULT)) { + LOG.info("Creating DefaultContainerLauncher"); + return new ContainerLauncherImpl(context); + } else if (containerLauncherClassIdentifier + .equals(TezConstants.TEZ_AM_SERVICE_PLUGINS_LOCAL_MODE_NAME_DEFAULT)) { LOG.info("Creating LocalContainerLauncher"); - containerLauncher = + return new LocalContainerLauncher(context, taskAttemptListener, workingDirectory); } else { - // TODO: Temporary reflection with specific parameters until a clean interface is defined. - String containerLauncherClassName = - conf.get(TezConfiguration.TEZ_AM_CONTAINER_LAUNCHER_CLASS); - if (containerLauncherClassName == null) { - LOG.info("Creating Default Container Launcher"); - containerLauncher = new ContainerLauncherImpl(context); - } else { - LOG.info("Creating container launcher : " + containerLauncherClassName); - Class<? extends ContainerLauncher> containerLauncherClazz = - (Class<? extends ContainerLauncher>) ReflectionUtils.getClazz( - containerLauncherClassName); - try { - Constructor<? extends ContainerLauncher> ctor = containerLauncherClazz - .getConstructor(AppContext.class, Configuration.class, TaskAttemptListener.class); - ctor.setAccessible(true); - containerLauncher = ctor.newInstance(context, conf, taskAttemptListener); - } catch (NoSuchMethodException e) { - throw new TezUncheckedException(e); - } catch (InvocationTargetException e) { - throw new TezUncheckedException(e); - } catch (InstantiationException e) { - throw new TezUncheckedException(e); - } catch (IllegalAccessException e) { - throw new TezUncheckedException(e); - } + LOG.info("Creating container launcher : " + containerLauncherClassIdentifier); + Class<? extends ContainerLauncher> containerLauncherClazz = + (Class<? extends ContainerLauncher>) ReflectionUtils.getClazz( + containerLauncherClassIdentifier); + try { + Constructor<? extends ContainerLauncher> ctor = containerLauncherClazz + .getConstructor(AppContext.class, Configuration.class, TaskAttemptListener.class); + ctor.setAccessible(true); + return ctor.newInstance(context, conf, taskAttemptListener); + } catch (NoSuchMethodException e) { + throw new TezUncheckedException(e); + } catch (InvocationTargetException e) { + throw new TezUncheckedException(e); + } catch (InstantiationException e) { + throw new TezUncheckedException(e); + } catch (IllegalAccessException e) { + throw new TezUncheckedException(e); } - } + // TODO TEZ-2118 Handle routing to multiple launchers } @Override public void serviceInit(Configuration conf) { - ((AbstractService)containerLauncher).init(conf); + for (int i = 0 ; i < containerLaunchers.length ; i++) { + ((AbstractService) containerLaunchers[i]).init(conf); + } } @Override public void serviceStart() { - ((AbstractService)containerLauncher).start(); + for (int i = 0 ; i < containerLaunchers.length ; i++) { + ((AbstractService) containerLaunchers[i]).start(); + } } @Override public void serviceStop() { - ((AbstractService)containerLauncher).stop(); + for (int i = 0 ; i < containerLaunchers.length ; i++) { + ((AbstractService) containerLaunchers[i]).stop(); + } } @Override public void handle(NMCommunicatorEvent event) { - containerLauncher.handle(event); + containerLaunchers[0].handle(event); } } http://git-wip-us.apache.org/repos/asf/tez/blob/b0568218/tez-dag/src/main/java/org/apache/tez/dag/app/rm/AMSchedulerEventTALaunchRequest.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/AMSchedulerEventTALaunchRequest.java b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/AMSchedulerEventTALaunchRequest.java index 5c4d43c..c59193c 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/AMSchedulerEventTALaunchRequest.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/AMSchedulerEventTALaunchRequest.java @@ -38,11 +38,16 @@ public class AMSchedulerEventTALaunchRequest extends AMSchedulerEvent { private final TaskSpec remoteTaskSpec; private final TaskAttempt taskAttempt; + private final int schedulerId; + private final int launcherId; + private final int taskCommId; + public AMSchedulerEventTALaunchRequest(TezTaskAttemptID attemptId, Resource capability, TaskSpec remoteTaskSpec, TaskAttempt ta, TaskLocationHint locationHint, int priority, - ContainerContext containerContext) { + ContainerContext containerContext, + int schedulerId, int launcherId, int taskCommId) { super(AMSchedulerEventType.S_TA_LAUNCH_REQUEST); this.attemptId = attemptId; this.capability = capability; @@ -51,6 +56,9 @@ public class AMSchedulerEventTALaunchRequest extends AMSchedulerEvent { this.locationHint = locationHint; this.priority = priority; this.containerContext = containerContext; + this.schedulerId = schedulerId; + this.launcherId = launcherId; + this.taskCommId = taskCommId; } public TezTaskAttemptID getAttemptID() { @@ -81,6 +89,18 @@ public class AMSchedulerEventTALaunchRequest extends AMSchedulerEvent { return this.containerContext; } + public int getSchedulerId() { + return schedulerId; + } + + public int getLauncherId() { + return launcherId; + } + + public int getTaskCommId() { + return taskCommId; + } + // Parameter replacement: @taskid@ will not be usable // ProfileTaskRange not available along with ContainerReUse http://git-wip-us.apache.org/repos/asf/tez/blob/b0568218/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerEventHandler.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerEventHandler.java b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerEventHandler.java index a77b53a..cb109ae 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerEventHandler.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerEventHandler.java @@ -46,6 +46,7 @@ import org.apache.hadoop.yarn.event.Event; import org.apache.hadoop.yarn.event.EventHandler; import org.apache.tez.common.ReflectionUtils; import org.apache.tez.dag.api.TezConfiguration; +import org.apache.tez.dag.api.TezConstants; import org.apache.tez.dag.api.TezUncheckedException; import org.apache.tez.dag.api.TaskLocationHint; import org.apache.tez.dag.api.TaskLocationHint.TaskBasedLocationAffinity; @@ -92,7 +93,6 @@ public class TaskSchedulerEventHandler extends AbstractService @SuppressWarnings("rawtypes") private final EventHandler eventHandler; private final String historyUrl; - protected TaskSchedulerService taskScheduler; private DAGAppMaster dagAppMaster; private Map<ApplicationAccessType, String> appAcls = null; private Thread eventHandlingThread; @@ -105,14 +105,27 @@ public class TaskSchedulerEventHandler extends AbstractService private AtomicBoolean shouldUnregisterFlag = new AtomicBoolean(false); private final WebUIService webUI; + private final String[] taskSchedulerClasses; + protected final TaskSchedulerService []taskSchedulers; BlockingQueue<AMSchedulerEvent> eventQueue = new LinkedBlockingQueue<AMSchedulerEvent>(); + /** + * + * @param appContext + * @param clientService + * @param eventHandler + * @param containerSignatureMatcher + * @param webUI + * @param schedulerClasses the list of scheduler classes / codes. Tez internal classes are represented as codes. + * An empty list defaults to using the YarnTaskScheduler as the only source. + */ @SuppressWarnings("rawtypes") public TaskSchedulerEventHandler(AppContext appContext, DAGClientServer clientService, EventHandler eventHandler, - ContainerSignatureMatcher containerSignatureMatcher, WebUIService webUI) { + ContainerSignatureMatcher containerSignatureMatcher, WebUIService webUI, + String [] schedulerClasses) { super(TaskSchedulerEventHandler.class.getName()); this.appContext = appContext; this.eventHandler = eventHandler; @@ -123,6 +136,12 @@ public class TaskSchedulerEventHandler extends AbstractService if (this.webUI != null) { this.webUI.setHistoryUrl(this.historyUrl); } + if (schedulerClasses == null || schedulerClasses.length == 0) { + this.taskSchedulerClasses = new String[] {TezConstants.TEZ_AM_SERVICE_PLUGINS_NAME_DEFAULT}; + } else { + this.taskSchedulerClasses = schedulerClasses; + } + taskSchedulers = new TaskSchedulerService[this.taskSchedulerClasses.length]; } public Map<ApplicationAccessType, String> getApplicationAcls() { @@ -139,11 +158,11 @@ public class TaskSchedulerEventHandler extends AbstractService } public Resource getAvailableResources() { - return taskScheduler.getAvailableResources(); + return taskSchedulers[0].getAvailableResources(); } public Resource getTotalResources() { - return taskScheduler.getTotalResources(); + return taskSchedulers[0].getTotalResources(); } public synchronized void handleEvent(AMSchedulerEvent sEvent) { @@ -209,9 +228,9 @@ public class TaskSchedulerEventHandler extends AbstractService private void handleNodeBlacklistUpdate(AMSchedulerEventNodeBlacklistUpdate event) { if (event.getType() == AMSchedulerEventType.S_NODE_BLACKLISTED) { - taskScheduler.blacklistNode(event.getNodeId()); + taskSchedulers[0].blacklistNode(event.getNodeId()); } else if (event.getType() == AMSchedulerEventType.S_NODE_UNBLACKLISTED) { - taskScheduler.unblacklistNode(event.getNodeId()); + taskSchedulers[0].unblacklistNode(event.getNodeId()); } else { throw new TezUncheckedException("Invalid event type: " + event.getType()); } @@ -223,14 +242,14 @@ public class TaskSchedulerEventHandler extends AbstractService // TODO what happens to the task that was connected to this container? // current assumption is that it will eventually call handleTaStopRequest //TaskAttempt taskAttempt = (TaskAttempt) - taskScheduler.deallocateContainer(containerId); + taskSchedulers[0].deallocateContainer(containerId); // TODO does this container need to be stopped via C_STOP_REQUEST sendEvent(new AMContainerEventStopRequest(containerId)); } private void handleTAUnsuccessfulEnd(AMSchedulerEventTAEnded event) { TaskAttempt attempt = event.getAttempt(); - boolean wasContainerAllocated = taskScheduler.deallocateTask(attempt, false); + boolean wasContainerAllocated = taskSchedulers[0].deallocateTask(attempt, false); // use stored value of container id in case the scheduler has removed this // assignment because the task has been deallocated earlier. // retroactive case @@ -272,7 +291,7 @@ public class TaskSchedulerEventHandler extends AbstractService event.getAttemptID())); } - boolean wasContainerAllocated = taskScheduler.deallocateTask(attempt, true); + boolean wasContainerAllocated = taskSchedulers[0].deallocateTask(attempt, true); if (!wasContainerAllocated) { LOG.error("De-allocated successful task: " + attempt.getID() + ", but TaskScheduler reported no container assigned to task"); @@ -297,7 +316,7 @@ public class TaskSchedulerEventHandler extends AbstractService TaskAttempt affinityAttempt = vertex.getTask(taskIndex).getSuccessfulAttempt(); if (affinityAttempt != null) { Preconditions.checkNotNull(affinityAttempt.getAssignedContainerID(), affinityAttempt.getID()); - taskScheduler.allocateTask(taskAttempt, + taskSchedulers[0].allocateTask(taskAttempt, event.getCapability(), affinityAttempt.getAssignedContainerID(), Priority.newInstance(event.getPriority()), @@ -316,57 +335,59 @@ public class TaskSchedulerEventHandler extends AbstractService .toArray(new String[locationHint.getRacks().size()]) : null; } } - - taskScheduler.allocateTask(taskAttempt, - event.getCapability(), - hosts, - racks, - Priority.newInstance(event.getPriority()), - event.getContainerContext(), - event); - } - - - protected TaskSchedulerService createTaskScheduler(String host, int port, - String trackingUrl, AppContext appContext) { - boolean isLocal = getConfig().getBoolean(TezConfiguration.TEZ_LOCAL_MODE, - TezConfiguration.TEZ_LOCAL_MODE_DEFAULT); - if (isLocal) { - LOG.info("Using TaskScheduler: LocalTaskSchedulerService"); + + taskSchedulers[0].allocateTask(taskAttempt, + event.getCapability(), + hosts, + racks, + Priority.newInstance(event.getPriority()), + event.getContainerContext(), + event); + } + + private TaskSchedulerService createTaskScheduler(String host, int port, String trackingUrl, + AppContext appContext, + String schedulerClassName) { + if (schedulerClassName.equals(TezConstants.TEZ_AM_SERVICE_PLUGINS_NAME_DEFAULT)) { + LOG.info("Creating TaskScheduler: YarnTaskSchedulerService"); + return new YarnTaskSchedulerService(this, this.containerSignatureMatcher, + host, port, trackingUrl, appContext); + } else if (schedulerClassName.equals(TezConstants.TEZ_AM_SERVICE_PLUGINS_LOCAL_MODE_NAME_DEFAULT)) { + LOG.info("Creating TaskScheduler: Local TaskScheduler"); return new LocalTaskSchedulerService(this, this.containerSignatureMatcher, host, port, trackingUrl, appContext); - } - else { - String schedulerClassName = getConfig().get(TezConfiguration.TEZ_AM_TASK_SCHEDULER_CLASS); - if (schedulerClassName == null) { - LOG.info("Using TaskScheduler: YarnTaskSchedulerService"); - return new YarnTaskSchedulerService(this, this.containerSignatureMatcher, - host, port, trackingUrl, appContext); - } else { - LOG.info("Using custom TaskScheduler: " + schedulerClassName); - // TODO Temporary reflection with specific parameters. Remove once there is a clean interface. - Class<? extends TaskSchedulerService> taskSchedulerClazz = - (Class<? extends TaskSchedulerService>) ReflectionUtils.getClazz(schedulerClassName); - try { - Constructor<? extends TaskSchedulerService> ctor = taskSchedulerClazz - .getConstructor(TaskSchedulerAppCallback.class, AppContext.class, String.class, - int.class, String.class, Configuration.class); - ctor.setAccessible(true); - TaskSchedulerService taskSchedulerService = - ctor.newInstance(this, appContext, host, port, trackingUrl, getConfig()); - return taskSchedulerService; - } catch (NoSuchMethodException e) { - throw new TezUncheckedException(e); - } catch (InvocationTargetException e) { - throw new TezUncheckedException(e); - } catch (InstantiationException e) { - throw new TezUncheckedException(e); - } catch (IllegalAccessException e) { - throw new TezUncheckedException(e); - } + } else { + LOG.info("Creating custom TaskScheduler: " + schedulerClassName); + // TODO TEZ-2003 Temporary reflection with specific parameters. Remove once there is a clean interface. + Class<? extends TaskSchedulerService> taskSchedulerClazz = + (Class<? extends TaskSchedulerService>) ReflectionUtils.getClazz(schedulerClassName); + try { + Constructor<? extends TaskSchedulerService> ctor = taskSchedulerClazz + .getConstructor(TaskSchedulerAppCallback.class, AppContext.class, String.class, + int.class, String.class, Configuration.class); + ctor.setAccessible(true); + return ctor.newInstance(this, appContext, host, port, trackingUrl, getConfig()); + } catch (NoSuchMethodException e) { + throw new TezUncheckedException(e); + } catch (InvocationTargetException e) { + throw new TezUncheckedException(e); + } catch (InstantiationException e) { + throw new TezUncheckedException(e); + } catch (IllegalAccessException e) { + throw new TezUncheckedException(e); } } } + + @VisibleForTesting + protected void instantiateScheduelrs(String host, int port, String trackingUrl, AppContext appContext) { + // Iterate over the list and create all the taskSchedulers + for (int i = 0; i < taskSchedulerClasses.length; i++) { + taskSchedulers[i] = createTaskScheduler(host, port, + trackingUrl, appContext, taskSchedulerClasses[i]); + } + } + @Override public synchronized void serviceStart() { @@ -377,13 +398,17 @@ public class TaskSchedulerEventHandler extends AbstractService // always try to connect to AM and proxy the response. hence it wont work if the webUIService // is not enabled. String trackingUrl = (webUI != null) ? webUI.getTrackingURL() : ""; - taskScheduler = createTaskScheduler(serviceAddr.getHostName(), - serviceAddr.getPort(), trackingUrl, appContext); - taskScheduler.init(getConfig()); - taskScheduler.start(); + instantiateScheduelrs(serviceAddr.getHostName(), serviceAddr.getPort(), trackingUrl, appContext); + + for (int i = 0 ; i < taskSchedulers.length ; i++) { + taskSchedulers[i].init(getConfig()); + taskSchedulers[i].start(); + } + + // TODO TEZ-2118 Start using multiple task schedulers if (shouldUnregisterFlag.get()) { // Flag may have been set earlier when task scheduler was not initialized - taskScheduler.setShouldUnregister(); + taskSchedulers[0].setShouldUnregister(); } this.eventHandlingThread = new Thread("TaskSchedulerEventHandlerThread") { @@ -432,8 +457,8 @@ public class TaskSchedulerEventHandler extends AbstractService if (eventHandlingThread != null) eventHandlingThread.interrupt(); } - if (taskScheduler != null) { - ((AbstractService)taskScheduler).stop(); + if (taskSchedulers[0] != null) { + ((AbstractService)taskSchedulers[0]).stop(); } } @@ -578,7 +603,7 @@ public class TaskSchedulerEventHandler extends AbstractService public float getProgress() { // at this point allocate has been called and so node count must be available // may change after YARN-1722 - int nodeCount = taskScheduler.getClusterNodeCount(); + int nodeCount = taskSchedulers[0].getClusterNodeCount(); if (nodeCount != cachedNodeCount) { cachedNodeCount = nodeCount; sendEvent(new AMNodeEventNodeCountUpdated(cachedNodeCount)); @@ -593,7 +618,7 @@ public class TaskSchedulerEventHandler extends AbstractService } public void dagCompleted() { - taskScheduler.dagComplete(); + taskSchedulers[0].dagComplete(); } public void dagSubmitted() { @@ -603,7 +628,7 @@ public class TaskSchedulerEventHandler extends AbstractService @Override public void preemptContainer(ContainerId containerId) { - taskScheduler.deallocateContainer(containerId); + taskSchedulers[0].deallocateContainer(containerId); // Inform the Containers about completion. sendEvent(new AMContainerEventCompleted(containerId, ContainerExitStatus.INVALID, "Container preempted internally", TaskAttemptTerminationCause.INTERNAL_PREEMPTION)); @@ -612,13 +637,13 @@ public class TaskSchedulerEventHandler extends AbstractService public void setShouldUnregisterFlag() { LOG.info("TaskScheduler notified that it should unregister from RM"); this.shouldUnregisterFlag.set(true); - if (this.taskScheduler != null) { - this.taskScheduler.setShouldUnregister(); + if (this.taskSchedulers[0] != null) { + this.taskSchedulers[0].setShouldUnregister(); } } public boolean hasUnregistered() { - return this.taskScheduler.hasUnregistered(); + return this.taskSchedulers[0].hasUnregistered(); } @VisibleForTesting http://git-wip-us.apache.org/repos/asf/tez/blob/b0568218/tez-dag/src/test/java/org/apache/tez/dag/app/MockDAGAppMaster.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/MockDAGAppMaster.java b/tez-dag/src/test/java/org/apache/tez/dag/app/MockDAGAppMaster.java index 999fe1c..8763a0c 100644 --- a/tez-dag/src/test/java/org/apache/tez/dag/app/MockDAGAppMaster.java +++ b/tez-dag/src/test/java/org/apache/tez/dag/app/MockDAGAppMaster.java @@ -512,10 +512,11 @@ public class MockDAGAppMaster extends DAGAppMaster { this.handlerConcurrency = handlerConcurrency; this.numConcurrentContainers = numConcurrentContainers; } - + // use mock container launcher for tests @Override - protected ContainerLauncherRouter createContainerLauncherRouter(final Configuration conf) + protected ContainerLauncherRouter createContainerLauncherRouter(final Configuration conf, + String[] containerLaunchers) throws UnknownHostException { return new ContainerLauncherRouter(containerLauncher); } http://git-wip-us.apache.org/repos/asf/tez/blob/b0568218/tez-dag/src/test/java/org/apache/tez/dag/app/TestTaskAttemptListenerImplTezDag.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/TestTaskAttemptListenerImplTezDag.java b/tez-dag/src/test/java/org/apache/tez/dag/app/TestTaskAttemptListenerImplTezDag.java index c454c7c..219217f 100644 --- a/tez-dag/src/test/java/org/apache/tez/dag/app/TestTaskAttemptListenerImplTezDag.java +++ b/tez-dag/src/test/java/org/apache/tez/dag/app/TestTaskAttemptListenerImplTezDag.java @@ -35,6 +35,7 @@ import java.util.Map; import java.util.Random; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.security.Credentials; import org.apache.hadoop.yarn.api.records.ApplicationAccessType; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.ApplicationId; @@ -123,12 +124,12 @@ public class TestTaskAttemptListenerImplTezDag { doReturn(amContainer).when(amContainerMap).get(any(ContainerId.class)); doReturn(container).when(amContainer).getContainer(); - taskAttemptListener = - new TaskAttemptListenerImpTezDag(appContext, mock(TaskHeartbeatHandler.class), - mock(ContainerHeartbeatHandler.class), null); + taskAttemptListener = new TaskAttemptListenerImplForTest(appContext, + mock(TaskHeartbeatHandler.class), mock(ContainerHeartbeatHandler.class), null); TezTaskCommunicatorImpl taskCommunicator = (TezTaskCommunicatorImpl)taskAttemptListener.getTaskCommunicator(); TezTaskUmbilicalProtocol tezUmbilical = taskCommunicator.getUmbilical(); + taskSpec = mock(TaskSpec.class); doReturn(taskAttemptID).when(taskSpec).getTaskAttemptID(); amContainerTask = new AMContainerTask(taskSpec, null, null, false, 0); @@ -138,6 +139,9 @@ public class TestTaskAttemptListenerImplTezDag { @Test(timeout = 5000) public void testGetTask() throws IOException { + TezTaskCommunicatorImpl taskCommunicator = (TezTaskCommunicatorImpl)taskAttemptListener.getTaskCommunicator(); + TezTaskUmbilicalProtocol tezUmbilical = taskCommunicator.getUmbilical(); + ContainerId containerId1 = createContainerId(appId, 1); ContainerContext containerContext1 = new ContainerContext(containerId1.toString()); containerTask = tezUmbilical.getTask(containerContext1); http://git-wip-us.apache.org/repos/asf/tez/blob/b0568218/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestContainerReuse.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestContainerReuse.java b/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestContainerReuse.java index 79450a9..6ea1388 100644 --- a/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestContainerReuse.java +++ b/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestContainerReuse.java @@ -1399,7 +1399,7 @@ public class TestContainerReuse { InputDescriptor.create("inputClassName"), 1)), Collections.singletonList(new OutputSpec("vertexName", OutputDescriptor.create("outputClassName"), 1)), null), ta, locationHint, - priority.getPriority(), containerContext); + priority.getPriority(), containerContext, 0, 0, 0); return lr; } http://git-wip-us.apache.org/repos/asf/tez/blob/b0568218/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskSchedulerEventHandler.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskSchedulerEventHandler.java b/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskSchedulerEventHandler.java index fc7aa50..b7a3a87 100644 --- a/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskSchedulerEventHandler.java +++ b/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskSchedulerEventHandler.java @@ -89,13 +89,13 @@ public class TestTaskSchedulerEventHandler { public MockTaskSchedulerEventHandler(AppContext appContext, DAGClientServer clientService, EventHandler eventHandler, ContainerSignatureMatcher containerSignatureMatcher, WebUIService webUI) { - super(appContext, clientService, eventHandler, containerSignatureMatcher, webUI); + super(appContext, clientService, eventHandler, containerSignatureMatcher, webUI, new String[] {}); } - + @Override - protected TaskSchedulerService createTaskScheduler(String host, int port, - String trackingUrl, AppContext appContext) { - return mockTaskScheduler; + protected void instantiateScheduelrs(String host, int port, String trackingUrl, + AppContext appContext) { + taskSchedulers[0] = mockTaskScheduler; } @Override @@ -194,7 +194,7 @@ public class TestTaskSchedulerEventHandler { when(mockAppContext.getCurrentDAG().getVertex(affVertexName)).thenReturn(affVertex); Resource resource = Resource.newInstance(100, 1); AMSchedulerEventTALaunchRequest event = new AMSchedulerEventTALaunchRequest - (taId, resource, null, mockTaskAttempt, locHint, 3, null); + (taId, resource, null, mockTaskAttempt, locHint, 3, null, 0, 0, 0); schedulerHandler.notify.set(false); schedulerHandler.handle(event); synchronized (schedulerHandler.notify) { http://git-wip-us.apache.org/repos/asf/tez/blob/b0568218/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskSchedulerHelpers.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskSchedulerHelpers.java b/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskSchedulerHelpers.java index 77c98b7..d775300 100644 --- a/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskSchedulerHelpers.java +++ b/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskSchedulerHelpers.java @@ -127,31 +127,29 @@ class TestTaskSchedulerHelpers { EventHandler eventHandler, TezAMRMClientAsync<CookieContainerRequest> amrmClientAsync, ContainerSignatureMatcher containerSignatureMatcher) { - super(appContext, null, eventHandler, containerSignatureMatcher, null); + super(appContext, null, eventHandler, containerSignatureMatcher, null, new String[]{}); this.amrmClientAsync = amrmClientAsync; this.containerSignatureMatcher = containerSignatureMatcher; } @Override - public TaskSchedulerService createTaskScheduler(String host, int port, - String trackingUrl, AppContext appContext) { - return new TaskSchedulerWithDrainableAppCallback(this, + public void instantiateScheduelrs(String host, int port, String trackingUrl, AppContext appContext) { + taskSchedulers[0] = new TaskSchedulerWithDrainableAppCallback(this, containerSignatureMatcher, host, port, trackingUrl, amrmClientAsync, appContext); } public TaskSchedulerService getSpyTaskScheduler() { - return this.taskScheduler; + return taskSchedulers[0]; } @Override public void serviceStart() { - TaskSchedulerService taskSchedulerReal = createTaskScheduler("host", 0, "", - appContext); + instantiateScheduelrs("host", 0, "", appContext); // Init the service so that reuse configuration is picked up. - ((AbstractService)taskSchedulerReal).init(getConfig()); - ((AbstractService)taskSchedulerReal).start(); - taskScheduler = spy(taskSchedulerReal); + ((AbstractService)taskSchedulers[0]).init(getConfig()); + ((AbstractService)taskSchedulers[0]).start(); + taskSchedulers[0] = spy(taskSchedulers[0]); } @Override http://git-wip-us.apache.org/repos/asf/tez/blob/b0568218/tez-ext-service-tests/src/test/java/org/apache/tez/tests/TestExternalTezServices.java ---------------------------------------------------------------------- diff --git a/tez-ext-service-tests/src/test/java/org/apache/tez/tests/TestExternalTezServices.java b/tez-ext-service-tests/src/test/java/org/apache/tez/tests/TestExternalTezServices.java index a93c1a4..ae7e7f8 100644 --- a/tez-ext-service-tests/src/test/java/org/apache/tez/tests/TestExternalTezServices.java +++ b/tez-ext-service-tests/src/test/java/org/apache/tez/tests/TestExternalTezServices.java @@ -45,6 +45,8 @@ public class TestExternalTezServices { private static final Log LOG = LogFactory.getLog(TestExternalTezServices.class); + private static final String EXT_PUSH_ENTITY_NAME = "ExtServiceTestPush"; + private static MiniTezCluster tezCluster; private static MiniDFSCluster dfsCluster; private static MiniTezTestServiceCluster tezTestServiceCluster; @@ -106,12 +108,17 @@ public class TestExternalTezServices { remoteFs.mkdirs(stagingDirPath); // This is currently configured to push tasks into the Service, and then use the standard RPC confForJobs.set(TezConfiguration.TEZ_AM_STAGING_DIR, stagingDirPath.toString()); - confForJobs.set(TezConfiguration.TEZ_AM_TASK_SCHEDULER_CLASS, - TezTestServiceTaskSchedulerService.class.getName()); - confForJobs.set(TezConfiguration.TEZ_AM_CONTAINER_LAUNCHER_CLASS, - TezTestServiceNoOpContainerLauncher.class.getName()); - confForJobs.set(TezConfiguration.TEZ_AM_TASK_COMMUNICATOR_CLASS, - TezTestServiceTaskCommunicatorImpl.class.getName()); + confForJobs.set(TezConfiguration.TEZ_AM_TASK_SCHEDULERS, + EXT_PUSH_ENTITY_NAME + ":" + TezTestServiceTaskSchedulerService.class.getName()); + confForJobs.set(TezConfiguration.TEZ_AM_CONTAINER_LAUNCHERS, + EXT_PUSH_ENTITY_NAME + ":" + TezTestServiceNoOpContainerLauncher.class.getName()); + confForJobs.set(TezConfiguration.TEZ_AM_TASK_COMMUNICATORS, + EXT_PUSH_ENTITY_NAME + ":" + TezTestServiceTaskCommunicatorImpl.class.getName()); + + confForJobs.set(TezConfiguration.TEZ_AM_VERTEX_TASK_SCHEDULER_NAME, EXT_PUSH_ENTITY_NAME); + confForJobs.set(TezConfiguration.TEZ_AM_VERTEX_CONTAINER_LAUNCHER_NAME, EXT_PUSH_ENTITY_NAME); + confForJobs.set(TezConfiguration.TEZ_AM_VERTEX_TASK_COMMUNICATOR_NAME, EXT_PUSH_ENTITY_NAME); + TezConfiguration tezConf = new TezConfiguration(confForJobs);
