This is an automated email from the ASF dual-hosted git repository. abstractdog pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/tez.git
The following commit(s) were added to refs/heads/master by this push: new 261362e4f TEZ-4647: Refactor plugin management from DAGAppMaster (#428) (Laszlo Bodor reviewed by Ayush Saxena) 261362e4f is described below commit 261362e4febe73962147e1a2974de70f3482d9fe Author: Bodor Laszlo <bodorlaszlo0...@gmail.com> AuthorDate: Mon Sep 15 13:04:39 2025 +0200 TEZ-4647: Refactor plugin management from DAGAppMaster (#428) (Laszlo Bodor reviewed by Ayush Saxena) --- tez-dag/findbugs-exclude.xml | 25 ++ .../java/org/apache/tez/dag/app/DAGAppMaster.java | 163 +------------ .../java/org/apache/tez/dag/app/PluginManager.java | 264 +++++++++++++++++++++ .../org/apache/tez/dag/app/TestDAGAppMaster.java | 143 +++++------ .../tez/dag/app/dag/impl/TestVertexImpl2.java | 24 +- .../tez/dag/app/rm/TestTaskSchedulerManager.java | 10 +- 6 files changed, 373 insertions(+), 256 deletions(-) diff --git a/tez-dag/findbugs-exclude.xml b/tez-dag/findbugs-exclude.xml index e8755832f..9ee1b3d94 100644 --- a/tez-dag/findbugs-exclude.xml +++ b/tez-dag/findbugs-exclude.xml @@ -248,4 +248,29 @@ <Bug pattern="EI_EXPOSE_REP2"/> </Match> + <!-- TEZ-4647 PluginManager EI_EXPOSE_REP warnings --> + <Match> + <Class name="org.apache.tez.dag.app.PluginManager"/> + <Method name="getTaskSchedulers"/> + <Bug pattern="EI_EXPOSE_REP"/> + </Match> + + <Match> + <Class name="org.apache.tez.dag.app.PluginManager"/> + <Method name="getContainerLaunchers"/> + <Bug pattern="EI_EXPOSE_REP"/> + </Match> + + <Match> + <Class name="org.apache.tez.dag.app.PluginManager"/> + <Method name="getTaskCommunicators"/> + <Bug pattern="EI_EXPOSE_REP"/> + </Match> + + <Match> + <Class name="org.apache.tez.dag.app.PluginManager"/> + <Method name="<init>"/> + <Bug pattern="EI_EXPOSE_REP2"/> + </Match> + </FindBugsFilter> diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java b/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java index 99579c7ff..ec4a89be0 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java @@ -132,7 +132,6 @@ import org.apache.tez.dag.api.records.DAGProtos.AMPluginDescriptorProto; import org.apache.tez.dag.api.records.DAGProtos.ConfigurationProto; import org.apache.tez.dag.api.records.DAGProtos.DAGPlan; import org.apache.tez.dag.api.records.DAGProtos.PlanLocalResourcesProto; -import org.apache.tez.dag.api.records.DAGProtos.TezNamedEntityDescriptorProto; import org.apache.tez.dag.api.records.DAGProtos.VertexPlan; import org.apache.tez.dag.app.RecoveryParser.DAGRecoveryData; import org.apache.tez.dag.app.dag.DAG; @@ -195,9 +194,6 @@ import org.apache.tez.util.TezMxBeanResourceCalculator; import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Function; -import com.google.common.collect.BiMap; -import com.google.common.collect.HashBiMap; -import com.google.common.collect.Lists; import com.google.common.collect.Maps; import com.google.common.util.concurrent.ListeningExecutorService; import com.google.common.util.concurrent.MoreExecutors; @@ -254,7 +250,6 @@ public class DAGAppMaster extends AbstractService { private final String workingDirectory; private final String[] localDirs; private final String[] logDirs; - private final AMPluginDescriptorProto amPluginDescriptorProto; private HadoopShim hadoopShim; private ContainerSignatureMatcher containerSignatureMatcher; private AMContainerMap containers; @@ -312,11 +307,8 @@ public class DAGAppMaster extends AbstractService { private FileSystem recoveryFS; private ListeningExecutorService execService; + private final PluginManager pluginManager; - // TODO May not need to be a bidi map - private final BiMap<String, Integer> taskSchedulers = HashBiMap.create(); - private final BiMap<String, Integer> containerLaunchers = HashBiMap.create(); - private final BiMap<String, Integer> taskCommunicators = HashBiMap.create(); /** * set of already executed dag names. @@ -376,7 +368,6 @@ public class DAGAppMaster extends AbstractService { this.dagVersionInfo = new TezDagVersionInfo(); this.clientVersion = clientVersion; this.amCredentials = credentials; - this.amPluginDescriptorProto = pluginDescriptorProto; this.appMasterUgi = UserGroupInformation .createRemoteUser(jobUserName); this.appMasterUgi.addCredentials(amCredentials); @@ -387,6 +378,8 @@ public class DAGAppMaster extends AbstractService { LOG.info("Created DAGAppMaster for application " + applicationAttemptId + ", versionInfo=" + dagVersionInfo); TezCommonUtils.logCredentials(LOG, this.appMasterUgi.getCredentials(), "am"); + + this.pluginManager = new PluginManager(pluginDescriptorProto); } // Pull this WebAppUtils function into Tez until YARN-4186 @@ -451,18 +444,10 @@ public class DAGAppMaster extends AbstractService { UserPayload defaultPayload = TezUtils.createUserPayloadFromConf(amConf); - List<NamedEntityDescriptor> taskSchedulerDescriptors = Lists.newLinkedList(); - List<NamedEntityDescriptor> containerLauncherDescriptors = Lists.newLinkedList(); - List<NamedEntityDescriptor> taskCommunicatorDescriptors = Lists.newLinkedList(); - - parseAllPlugins(taskSchedulerDescriptors, taskSchedulers, containerLauncherDescriptors, - containerLaunchers, taskCommunicatorDescriptors, taskCommunicators, amPluginDescriptorProto, - isLocal, defaultPayload); - - - LOG.info(buildPluginComponentLog(taskSchedulerDescriptors, taskSchedulers, "TaskSchedulers")); - LOG.info(buildPluginComponentLog(containerLauncherDescriptors, containerLaunchers, "ContainerLaunchers")); - LOG.info(buildPluginComponentLog(taskCommunicatorDescriptors, taskCommunicators, "TaskCommunicators")); + PluginManager.PluginDescriptors pluginDescriptors = pluginManager.parseAllPlugins(isLocal, defaultPayload); + List<NamedEntityDescriptor> taskSchedulerDescriptors = pluginDescriptors.getTaskSchedulerDescriptors(); + List<NamedEntityDescriptor> containerLauncherDescriptors = pluginDescriptors.getContainerLauncherDescriptors(); + List<NamedEntityDescriptor> taskCommunicatorDescriptors = pluginDescriptors.getTaskCommunicatorDescriptors(); boolean disableVersionCheck = conf.getBoolean( TezConfiguration.TEZ_AM_DISABLE_CLIENT_VERSION_CHECK, @@ -1672,32 +1657,32 @@ public class DAGAppMaster extends AbstractService { @Override public Integer getTaskCommunicatorIdentifier(String name) { - return taskCommunicators.get(name); + return pluginManager.getTaskCommunicators().get(name); } @Override public Integer getTaskScheduerIdentifier(String name) { - return taskSchedulers.get(name); + return pluginManager.getTaskSchedulers().get(name); } @Override public Integer getContainerLauncherIdentifier(String name) { - return containerLaunchers.get(name); + return pluginManager.getContainerLaunchers().get(name); } @Override public String getTaskCommunicatorName(int taskCommId) { - return taskCommunicators.inverse().get(taskCommId); + return pluginManager.getTaskCommunicators().inverse().get(taskCommId); } @Override public String getTaskSchedulerName(int schedulerId) { - return taskSchedulers.inverse().get(schedulerId); + return pluginManager.getTaskSchedulers().inverse().get(schedulerId); } @Override public String getContainerLauncherName(int launcherId) { - return containerLaunchers.inverse().get(launcherId); + return pluginManager.getContainerLaunchers().inverse().get(launcherId); } @Override @@ -2732,128 +2717,6 @@ public class DAGAppMaster extends AbstractService { return webUIService == null ? null : webUIService.getBaseUrl(); } - @VisibleForTesting - public static void parseAllPlugins( - List<NamedEntityDescriptor> taskSchedulerDescriptors, BiMap<String, Integer> taskSchedulerPluginMap, - List<NamedEntityDescriptor> containerLauncherDescriptors, BiMap<String, Integer> containerLauncherPluginMap, - List<NamedEntityDescriptor> taskCommDescriptors, BiMap<String, Integer> taskCommPluginMap, - AMPluginDescriptorProto amPluginDescriptorProto, boolean isLocal, UserPayload defaultPayload) { - - boolean tezYarnEnabled; - boolean uberEnabled; - if (!isLocal) { - if (amPluginDescriptorProto == null) { - tezYarnEnabled = true; - uberEnabled = false; - } else { - tezYarnEnabled = amPluginDescriptorProto.getContainersEnabled(); - uberEnabled = amPluginDescriptorProto.getUberEnabled(); - } - } else { - tezYarnEnabled = false; - uberEnabled = true; - } - - parsePlugin(taskSchedulerDescriptors, taskSchedulerPluginMap, - (amPluginDescriptorProto == null || amPluginDescriptorProto.getTaskSchedulersCount() == 0 ? - null : - amPluginDescriptorProto.getTaskSchedulersList()), - tezYarnEnabled, uberEnabled, defaultPayload); - processSchedulerDescriptors(taskSchedulerDescriptors, isLocal, defaultPayload, taskSchedulerPluginMap); - - parsePlugin(containerLauncherDescriptors, containerLauncherPluginMap, - (amPluginDescriptorProto == null || - amPluginDescriptorProto.getContainerLaunchersCount() == 0 ? null : - amPluginDescriptorProto.getContainerLaunchersList()), - tezYarnEnabled, uberEnabled, defaultPayload); - - parsePlugin(taskCommDescriptors, taskCommPluginMap, - (amPluginDescriptorProto == null || - amPluginDescriptorProto.getTaskCommunicatorsCount() == 0 ? null : - amPluginDescriptorProto.getTaskCommunicatorsList()), - tezYarnEnabled, uberEnabled, defaultPayload); - } - - - @VisibleForTesting - public static void parsePlugin(List<NamedEntityDescriptor> resultList, - BiMap<String, Integer> pluginMap, List<TezNamedEntityDescriptorProto> namedEntityDescriptorProtos, - boolean tezYarnEnabled, boolean uberEnabled, UserPayload defaultPayload) { - - if (tezYarnEnabled) { - // Default classnames will be populated by individual components - NamedEntityDescriptor r = new NamedEntityDescriptor( - TezConstants.getTezYarnServicePluginName(), null).setUserPayload(defaultPayload); - addDescriptor(resultList, pluginMap, r); - } - - if (uberEnabled) { - // Default classnames will be populated by individual components - NamedEntityDescriptor r = new NamedEntityDescriptor( - TezConstants.getTezUberServicePluginName(), null).setUserPayload(defaultPayload); - addDescriptor(resultList, pluginMap, r); - } - - if (namedEntityDescriptorProtos != null) { - for (TezNamedEntityDescriptorProto namedEntityDescriptorProto : namedEntityDescriptorProtos) { - NamedEntityDescriptor namedEntityDescriptor = DagTypeConverters - .convertNamedDescriptorFromProto(namedEntityDescriptorProto); - addDescriptor(resultList, pluginMap, namedEntityDescriptor); - } - } - } - - @VisibleForTesting - static void addDescriptor(List<NamedEntityDescriptor> list, BiMap<String, Integer> pluginMap, - NamedEntityDescriptor namedEntityDescriptor) { - list.add(namedEntityDescriptor); - pluginMap.put(list.get(list.size() - 1).getEntityName(), list.size() - 1); - } - - @VisibleForTesting - static void processSchedulerDescriptors(List<NamedEntityDescriptor> descriptors, boolean isLocal, - UserPayload defaultPayload, - BiMap<String, Integer> schedulerPluginMap) { - if (isLocal) { - boolean foundUberServiceName = false; - for (NamedEntityDescriptor descriptor : descriptors) { - if (descriptor.getEntityName().equals(TezConstants.getTezUberServicePluginName())) { - foundUberServiceName = true; - break; - } - } - Preconditions.checkState(foundUberServiceName); - } else { - boolean foundYarn = false; - for (int i = 0; i < descriptors.size(); i++) { - if (descriptors.get(i).getEntityName().equals(TezConstants.getTezYarnServicePluginName())) { - foundYarn = true; - break; - } - } - if (!foundYarn) { - NamedEntityDescriptor yarnDescriptor = - new NamedEntityDescriptor(TezConstants.getTezYarnServicePluginName(), null) - .setUserPayload(defaultPayload); - addDescriptor(descriptors, schedulerPluginMap, yarnDescriptor); - } - } - } - - String buildPluginComponentLog(List<NamedEntityDescriptor> namedEntityDescriptors, BiMap<String, Integer> map, - String component) { - StringBuilder sb = new StringBuilder(); - sb.append("AM Level configured ").append(component).append(": "); - for (int i = 0; i < namedEntityDescriptors.size(); i++) { - sb.append("[").append(i).append(":").append(map.inverse().get(i)) - .append(":").append(namedEntityDescriptors.get(i).getClassName()).append("]"); - if (i != namedEntityDescriptors.size() - 1) { - sb.append(","); - } - } - return sb.toString(); - } - public void vertexComplete(TezVertexID completedVertexID, Set<NodeId> nodesList) { getContainerLauncherManager().vertexComplete(completedVertexID, jobTokenSecretManager, nodesList); } diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/PluginManager.java b/tez-dag/src/main/java/org/apache/tez/dag/app/PluginManager.java new file mode 100644 index 000000000..24f607758 --- /dev/null +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/PluginManager.java @@ -0,0 +1,264 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.tez.dag.app; + +import java.util.Collections; +import java.util.List; + +import org.apache.tez.dag.api.DagTypeConverters; +import org.apache.tez.dag.api.NamedEntityDescriptor; +import org.apache.tez.dag.api.TezConstants; +import org.apache.tez.dag.api.UserPayload; +import org.apache.tez.dag.api.records.DAGProtos.AMPluginDescriptorProto; +import org.apache.tez.dag.api.records.DAGProtos.TezNamedEntityDescriptorProto; + +import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Preconditions; +import com.google.common.collect.BiMap; +import com.google.common.collect.HashBiMap; +import com.google.common.collect.Lists; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Manager for AM service plugins. + * + * This component parses the configured plugins for TaskSchedulers, + * ContainerLaunchers, and TaskCommunicators from the AM configuration, + * maintains their name-to-identifier mappings, and provides the parsed + * descriptor lists used to initialize the corresponding managers. + */ +public class PluginManager { + + private static final Logger LOG = LoggerFactory.getLogger(PluginManager.class); + + private final AMPluginDescriptorProto amPluginDescriptorProto; + + // Plugin maps for task schedulers, container launchers, and task communicators + private final BiMap<String, Integer> taskSchedulers = HashBiMap.create(); + private final BiMap<String, Integer> containerLaunchers = HashBiMap.create(); + private final BiMap<String, Integer> taskCommunicators = HashBiMap.create(); + + /** + * Wrapper for parsed plugin descriptors. + * + * The descriptor lists exposed by this class are unmodifiable snapshots + * created at parse time. Callers must not attempt to modify these + * collections; any modification attempts will throw an exception. + */ + public static final class PluginDescriptors { + private final List<NamedEntityDescriptor> taskSchedulerDescriptors; + private final List<NamedEntityDescriptor> containerLauncherDescriptors; + private final List<NamedEntityDescriptor> taskCommunicatorDescriptors; + + public PluginDescriptors(List<NamedEntityDescriptor> taskSchedulerDescriptors, + List<NamedEntityDescriptor> containerLauncherDescriptors, + List<NamedEntityDescriptor> taskCommunicatorDescriptors) { + this.taskSchedulerDescriptors = Collections.unmodifiableList(taskSchedulerDescriptors); + this.containerLauncherDescriptors = Collections.unmodifiableList(containerLauncherDescriptors); + this.taskCommunicatorDescriptors = Collections.unmodifiableList(taskCommunicatorDescriptors); + } + + public List<NamedEntityDescriptor> getTaskSchedulerDescriptors() { + return taskSchedulerDescriptors; + } + + public List<NamedEntityDescriptor> getContainerLauncherDescriptors() { + return containerLauncherDescriptors; + } + + public List<NamedEntityDescriptor> getTaskCommunicatorDescriptors() { + return taskCommunicatorDescriptors; + } + } + + public PluginManager() { + this(null); + } + + public PluginManager(AMPluginDescriptorProto amPluginDescriptorProto) { + this.amPluginDescriptorProto = amPluginDescriptorProto; + } + + /** + * Parse all plugins for task schedulers, container launchers, and task communicators. + */ + public PluginDescriptors parseAllPlugins(boolean isLocal, UserPayload defaultPayload) { + + List<NamedEntityDescriptor> taskSchedulerDescriptors = Lists.newLinkedList(); + List<NamedEntityDescriptor> containerLauncherDescriptors = Lists.newLinkedList(); + List<NamedEntityDescriptor> taskCommDescriptors = Lists.newLinkedList(); + + boolean tezYarnEnabled; + boolean uberEnabled; + if (!isLocal) { + if (amPluginDescriptorProto == null) { + tezYarnEnabled = true; + uberEnabled = false; + } else { + tezYarnEnabled = amPluginDescriptorProto.getContainersEnabled(); + uberEnabled = amPluginDescriptorProto.getUberEnabled(); + } + } else { + tezYarnEnabled = false; + uberEnabled = true; + } + + // parse task scheduler plugins + parsePlugin(taskSchedulerDescriptors, taskSchedulers, + (amPluginDescriptorProto == null || amPluginDescriptorProto.getTaskSchedulersCount() == 0 ? + null : + amPluginDescriptorProto.getTaskSchedulersList()), + tezYarnEnabled, uberEnabled, defaultPayload); + + // post-process task scheduler plugin descriptors + processSchedulerDescriptors(taskSchedulerDescriptors, isLocal, defaultPayload, taskSchedulers); + + // parse container launcher plugins + parsePlugin(containerLauncherDescriptors, containerLaunchers, + (amPluginDescriptorProto == null || + amPluginDescriptorProto.getContainerLaunchersCount() == 0 ? null : + amPluginDescriptorProto.getContainerLaunchersList()), + tezYarnEnabled, uberEnabled, defaultPayload); + + // parse task communicator plugins + parsePlugin(taskCommDescriptors, taskCommunicators, + (amPluginDescriptorProto == null || + amPluginDescriptorProto.getTaskCommunicatorsCount() == 0 ? null : + amPluginDescriptorProto.getTaskCommunicatorsList()), + tezYarnEnabled, uberEnabled, defaultPayload); + + // Log plugin component information + LOG.info(buildPluginComponentLog(taskSchedulerDescriptors, taskSchedulers, "TaskSchedulers")); + LOG.info(buildPluginComponentLog(containerLauncherDescriptors, containerLaunchers, "ContainerLaunchers")); + LOG.info(buildPluginComponentLog(taskCommDescriptors, taskCommunicators, "TaskCommunicators")); + + return new PluginDescriptors(taskSchedulerDescriptors, containerLauncherDescriptors, taskCommDescriptors); + } + + /** + * Parse a specific plugin type. + */ + @VisibleForTesting + public static void parsePlugin(List<NamedEntityDescriptor> resultList, + BiMap<String, Integer> pluginMap, List<TezNamedEntityDescriptorProto> namedEntityDescriptorProtos, + boolean tezYarnEnabled, boolean uberEnabled, UserPayload defaultPayload) { + + if (tezYarnEnabled) { + // Default classnames will be populated by individual components + NamedEntityDescriptor descriptor = new NamedEntityDescriptor( + TezConstants.getTezYarnServicePluginName(), null).setUserPayload(defaultPayload); + addDescriptor(resultList, pluginMap, descriptor); + } + + if (uberEnabled) { + // Default classnames will be populated by individual components + NamedEntityDescriptor descriptor = new NamedEntityDescriptor( + TezConstants.getTezUberServicePluginName(), null).setUserPayload(defaultPayload); + addDescriptor(resultList, pluginMap, descriptor); + } + + if (namedEntityDescriptorProtos != null) { + for (TezNamedEntityDescriptorProto namedEntityDescriptorProto : namedEntityDescriptorProtos) { + NamedEntityDescriptor descriptor = DagTypeConverters + .convertNamedDescriptorFromProto(namedEntityDescriptorProto); + addDescriptor(resultList, pluginMap, descriptor); + } + } + } + + /** + * Add a descriptor to the list and map. + */ + public static void addDescriptor(List<NamedEntityDescriptor> list, BiMap<String, Integer> pluginMap, + NamedEntityDescriptor namedEntityDescriptor) { + list.add(namedEntityDescriptor); + pluginMap.put(list.getLast().getEntityName(), list.size() - 1); + } + + /** + * Process scheduler descriptors with framework-specific logic. + */ + public void processSchedulerDescriptors(List<NamedEntityDescriptor> descriptors, boolean isLocal, + UserPayload defaultPayload, + BiMap<String, Integer> schedulerPluginMap) { + if (isLocal) { + boolean foundUberServiceName = false; + for (NamedEntityDescriptor<?> descriptor : descriptors) { + if (descriptor.getEntityName().equals(TezConstants.getTezUberServicePluginName())) { + foundUberServiceName = true; + break; + } + } + Preconditions.checkState(foundUberServiceName); + } else { + boolean foundYarn = false; + for (NamedEntityDescriptor descriptor : descriptors) { + if (descriptor.getEntityName().equals(TezConstants.getTezYarnServicePluginName())) { + foundYarn = true; + break; + } + } + if (!foundYarn) { + NamedEntityDescriptor<?> yarnDescriptor = + new NamedEntityDescriptor(TezConstants.getTezYarnServicePluginName(), null) + .setUserPayload(defaultPayload); + addDescriptor(descriptors, schedulerPluginMap, yarnDescriptor); + } + } + } + + /** + * Get the task schedulers map. + */ + public BiMap<String, Integer> getTaskSchedulers() { + return taskSchedulers; + } + + /** + * Get the container launchers map. + */ + public BiMap<String, Integer> getContainerLaunchers() { + return containerLaunchers; + } + + /** + * Get the task communicators map. + */ + public BiMap<String, Integer> getTaskCommunicators() { + return taskCommunicators; + } + + /** + * Build a log message for plugin component information. + */ + private String buildPluginComponentLog(List<NamedEntityDescriptor> namedEntityDescriptors, BiMap<String, Integer> map, + String component) { + StringBuilder sb = new StringBuilder(); + sb.append("AM Level configured ").append(component).append(": "); + for (int i = 0; i < namedEntityDescriptors.size(); i++) { + sb.append("[").append(i).append(":").append(map.inverse().get(i)) + .append(":").append(namedEntityDescriptors.get(i).getClassName()).append("]"); + if (i != namedEntityDescriptors.size() - 1) { + sb.append(","); + } + } + return sb.toString(); + } +} diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/TestDAGAppMaster.java b/tez-dag/src/test/java/org/apache/tez/dag/app/TestDAGAppMaster.java index 85a8248b9..01a61b64e 100644 --- a/tez-dag/src/test/java/org/apache/tez/dag/app/TestDAGAppMaster.java +++ b/tez-dag/src/test/java/org/apache/tez/dag/app/TestDAGAppMaster.java @@ -87,7 +87,6 @@ import org.apache.tez.dag.records.TezVertexID; import com.google.common.collect.BiMap; import com.google.common.collect.HashBiMap; -import com.google.common.collect.Lists; import com.google.protobuf.ByteString; import org.junit.After; @@ -158,7 +157,7 @@ public class TestDAGAppMaster { // Test empty descriptor list, yarn enabled pluginMap.clear(); entities = new LinkedList<>(); - DAGAppMaster.parsePlugin(entities, pluginMap, null, true, false, defaultPayload); + PluginManager.parsePlugin(entities, pluginMap, null, true, false, defaultPayload); assertEquals(1, pluginMap.size()); assertEquals(1, entities.size()); assertTrue(pluginMap.containsKey(TezConstants.getTezYarnServicePluginName())); @@ -169,7 +168,7 @@ public class TestDAGAppMaster { // Test empty descriptor list, uber enabled pluginMap.clear(); entities = new LinkedList<>(); - DAGAppMaster.parsePlugin(entities, pluginMap, null, false, true, defaultPayload); + PluginManager.parsePlugin(entities, pluginMap, null, false, true, defaultPayload); assertEquals(1, pluginMap.size()); assertEquals(1, entities.size()); assertTrue(pluginMap.containsKey(TezConstants.getTezUberServicePluginName())); @@ -180,7 +179,7 @@ public class TestDAGAppMaster { // Test empty descriptor list, yarn enabled, uber enabled pluginMap.clear(); entities = new LinkedList<>(); - DAGAppMaster.parsePlugin(entities, pluginMap, null, true, true, defaultPayload); + PluginManager.parsePlugin(entities, pluginMap, null, true, true, defaultPayload); assertEquals(2, pluginMap.size()); assertEquals(2, entities.size()); assertTrue(pluginMap.containsKey(TezConstants.getTezYarnServicePluginName())); @@ -203,7 +202,7 @@ public class TestDAGAppMaster { // Test descriptor, no yarn, no uber pluginMap.clear(); entities = new LinkedList<>(); - DAGAppMaster.parsePlugin(entities, pluginMap, entityDescriptors, false, false, defaultPayload); + PluginManager.parsePlugin(entities, pluginMap, entityDescriptors, false, false, defaultPayload); assertEquals(1, pluginMap.size()); assertEquals(1, entities.size()); assertTrue(pluginMap.containsKey(pluginName)); @@ -212,7 +211,7 @@ public class TestDAGAppMaster { // Test descriptor, yarn and uber pluginMap.clear(); entities = new LinkedList<>(); - DAGAppMaster.parsePlugin(entities, pluginMap, entityDescriptors, true, true, defaultPayload); + PluginManager.parsePlugin(entities, pluginMap, entityDescriptors, true, true, defaultPayload); assertEquals(3, pluginMap.size()); assertEquals(3, entities.size()); assertTrue(pluginMap.containsKey(TezConstants.getTezYarnServicePluginName())); @@ -227,43 +226,37 @@ public class TestDAGAppMaster { @Test(timeout = 5000) public void testParseAllPluginsNoneSpecified() throws IOException { + PluginManager pluginManager = new PluginManager(); Configuration conf = new Configuration(false); conf.set(TEST_KEY, TEST_VAL); UserPayload defaultPayload = TezUtils.createUserPayloadFromConf(conf); - List<NamedEntityDescriptor> tsDescriptors; - BiMap<String, Integer> tsMap; - List<NamedEntityDescriptor> clDescriptors; - BiMap<String, Integer> clMap; - List<NamedEntityDescriptor> tcDescriptors; - BiMap<String, Integer> tcMap; - - // No plugins. Non local - tsDescriptors = Lists.newLinkedList(); - tsMap = HashBiMap.create(); - clDescriptors = Lists.newLinkedList(); - clMap = HashBiMap.create(); - tcDescriptors = Lists.newLinkedList(); - tcMap = HashBiMap.create(); - DAGAppMaster.parseAllPlugins(tsDescriptors, tsMap, clDescriptors, clMap, tcDescriptors, tcMap, - null, false, defaultPayload); - verifyDescAndMap(tsDescriptors, tsMap, 1, true, TezConstants.getTezYarnServicePluginName()); - verifyDescAndMap(clDescriptors, clMap, 1, true, TezConstants.getTezYarnServicePluginName()); - verifyDescAndMap(tcDescriptors, tcMap, 1, true, TezConstants.getTezYarnServicePluginName()); + PluginManager.PluginDescriptors pluginDescriptorsNonLocal = pluginManager.parseAllPlugins(false, defaultPayload); + + verifyDescAndMap(pluginDescriptorsNonLocal.getTaskSchedulerDescriptors(), + pluginManager.getTaskSchedulers(), 1, true, + TezConstants.getTezYarnServicePluginName()); + verifyDescAndMap(pluginDescriptorsNonLocal.getContainerLauncherDescriptors(), + pluginManager.getContainerLaunchers(), 1, true, + TezConstants.getTezYarnServicePluginName()); + verifyDescAndMap(pluginDescriptorsNonLocal.getTaskCommunicatorDescriptors(), + pluginManager.getTaskCommunicators(), 1, true, + TezConstants.getTezYarnServicePluginName()); // No plugins. Local - tsDescriptors = Lists.newLinkedList(); - tsMap = HashBiMap.create(); - clDescriptors = Lists.newLinkedList(); - clMap = HashBiMap.create(); - tcDescriptors = Lists.newLinkedList(); - tcMap = HashBiMap.create(); - DAGAppMaster.parseAllPlugins(tsDescriptors, tsMap, clDescriptors, clMap, tcDescriptors, tcMap, - null, true, defaultPayload); - verifyDescAndMap(tsDescriptors, tsMap, 1, true, TezConstants.getTezUberServicePluginName()); - verifyDescAndMap(clDescriptors, clMap, 1, true, TezConstants.getTezUberServicePluginName()); - verifyDescAndMap(tcDescriptors, tcMap, 1, true, TezConstants.getTezUberServicePluginName()); + pluginManager = new PluginManager(); + + PluginManager.PluginDescriptors pluginDescriptorsLocal = pluginManager.parseAllPlugins(true, defaultPayload); + verifyDescAndMap(pluginDescriptorsLocal.getTaskSchedulerDescriptors(), + pluginManager.getTaskSchedulers(), 1, true, + TezConstants.getTezUberServicePluginName()); + verifyDescAndMap(pluginDescriptorsLocal.getContainerLauncherDescriptors(), + pluginManager.getContainerLaunchers(), 1, true, + TezConstants.getTezUberServicePluginName()); + verifyDescAndMap(pluginDescriptorsLocal.getTaskCommunicatorDescriptors(), + pluginManager.getTaskCommunicators(), 1, true, + TezConstants.getTezUberServicePluginName()); } @Test(timeout = 5000) @@ -276,30 +269,19 @@ public class TestDAGAppMaster { AMPluginDescriptorProto proto = createAmPluginDescriptor(false, false, true, payloadProto); - List<NamedEntityDescriptor> tsDescriptors; - BiMap<String, Integer> tsMap; - List<NamedEntityDescriptor> clDescriptors; - BiMap<String, Integer> clMap; - List<NamedEntityDescriptor> tcDescriptors; - BiMap<String, Integer> tcMap; - - // Only plugin, Yarn. - tsDescriptors = Lists.newLinkedList(); - tsMap = HashBiMap.create(); - clDescriptors = Lists.newLinkedList(); - clMap = HashBiMap.create(); - tcDescriptors = Lists.newLinkedList(); - tcMap = HashBiMap.create(); - DAGAppMaster.parseAllPlugins(tsDescriptors, tsMap, clDescriptors, clMap, tcDescriptors, tcMap, - proto, false, defaultPayload); - verifyDescAndMap(tsDescriptors, tsMap, 2, true, TS_NAME, + PluginManager pluginManager = new PluginManager(proto); + PluginManager.PluginDescriptors pluginDescriptors = pluginManager.parseAllPlugins(false, defaultPayload); + verifyDescAndMap(pluginDescriptors.getTaskSchedulerDescriptors(), + pluginManager.getTaskSchedulers(), 2, true, TS_NAME, TezConstants.getTezYarnServicePluginName()); - verifyDescAndMap(clDescriptors, clMap, 1, true, CL_NAME); - verifyDescAndMap(tcDescriptors, tcMap, 1, true, TC_NAME); - assertEquals(TS_NAME + CLASS_SUFFIX, tsDescriptors.get(0).getClassName()); - assertEquals(CL_NAME + CLASS_SUFFIX, clDescriptors.get(0).getClassName()); - assertEquals(TC_NAME + CLASS_SUFFIX, tcDescriptors.get(0).getClassName()); + verifyDescAndMap(pluginDescriptors.getContainerLauncherDescriptors(), + pluginManager.getContainerLaunchers(), 1, true, CL_NAME); + verifyDescAndMap(pluginDescriptors.getTaskCommunicatorDescriptors(), + pluginManager.getTaskCommunicators(), 1, true, TC_NAME); + assertEquals(TS_NAME + CLASS_SUFFIX, pluginDescriptors.getTaskSchedulerDescriptors().get(0).getClassName()); + assertEquals(CL_NAME + CLASS_SUFFIX, pluginDescriptors.getContainerLauncherDescriptors().get(0).getClassName()); + assertEquals(TC_NAME + CLASS_SUFFIX, pluginDescriptors.getTaskCommunicatorDescriptors().get(0).getClassName()); } @Test(timeout = 5000) @@ -312,35 +294,24 @@ public class TestDAGAppMaster { AMPluginDescriptorProto proto = createAmPluginDescriptor(true, false, true, payloadProto); - List<NamedEntityDescriptor> tsDescriptors; - BiMap<String, Integer> tsMap; - List<NamedEntityDescriptor> clDescriptors; - BiMap<String, Integer> clMap; - List<NamedEntityDescriptor> tcDescriptors; - BiMap<String, Integer> tcMap; - - // Only plugin, Yarn. - tsDescriptors = Lists.newLinkedList(); - tsMap = HashBiMap.create(); - clDescriptors = Lists.newLinkedList(); - clMap = HashBiMap.create(); - tcDescriptors = Lists.newLinkedList(); - tcMap = HashBiMap.create(); - DAGAppMaster.parseAllPlugins(tsDescriptors, tsMap, clDescriptors, clMap, tcDescriptors, tcMap, - proto, false, defaultPayload); - verifyDescAndMap(tsDescriptors, tsMap, 2, true, TezConstants.getTezYarnServicePluginName(), - TS_NAME); - verifyDescAndMap(clDescriptors, clMap, 2, true, TezConstants.getTezYarnServicePluginName(), - CL_NAME); - verifyDescAndMap(tcDescriptors, tcMap, 2, true, TezConstants.getTezYarnServicePluginName(), - TC_NAME); - assertNull(tsDescriptors.get(0).getClassName()); - assertNull(clDescriptors.get(0).getClassName()); - assertNull(tcDescriptors.get(0).getClassName()); - assertEquals(TS_NAME + CLASS_SUFFIX, tsDescriptors.get(1).getClassName()); - assertEquals(CL_NAME + CLASS_SUFFIX, clDescriptors.get(1).getClassName()); - assertEquals(TC_NAME + CLASS_SUFFIX, tcDescriptors.get(1).getClassName()); + PluginManager pluginManager = new PluginManager(proto); + PluginManager.PluginDescriptors pluginDescriptors = pluginManager.parseAllPlugins(false, defaultPayload); + + verifyDescAndMap(pluginDescriptors.getTaskSchedulerDescriptors(), pluginManager.getTaskSchedulers(), + 2, true, TezConstants.getTezYarnServicePluginName(), TS_NAME); + verifyDescAndMap(pluginDescriptors.getContainerLauncherDescriptors(), pluginManager.getContainerLaunchers(), + 2, true, TezConstants.getTezYarnServicePluginName(), CL_NAME); + verifyDescAndMap(pluginDescriptors.getTaskCommunicatorDescriptors(), pluginManager.getTaskCommunicators(), + 2, true, TezConstants.getTezYarnServicePluginName(), TC_NAME); + + assertNull(pluginDescriptors.getTaskSchedulerDescriptors().get(0).getClassName()); + assertNull(pluginDescriptors.getContainerLauncherDescriptors().get(0).getClassName()); + assertNull(pluginDescriptors.getTaskCommunicatorDescriptors().get(0).getClassName()); + + assertEquals(TS_NAME + CLASS_SUFFIX, pluginDescriptors.getTaskSchedulerDescriptors().get(1).getClassName()); + assertEquals(CL_NAME + CLASS_SUFFIX, pluginDescriptors.getContainerLauncherDescriptors().get(1).getClassName()); + assertEquals(TC_NAME + CLASS_SUFFIX, pluginDescriptors.getTaskCommunicatorDescriptors().get(1).getClassName()); } @Test(timeout = 60000) diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl2.java b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl2.java index 1f6d1e5e8..c29a47160 100644 --- a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl2.java +++ b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl2.java @@ -37,7 +37,6 @@ import org.apache.hadoop.yarn.event.EventHandler; import org.apache.hadoop.yarn.util.Clock; import org.apache.tez.common.TezUtils; import org.apache.tez.dag.api.DagTypeConverters; -import org.apache.tez.dag.api.NamedEntityDescriptor; import org.apache.tez.dag.api.TaskLocationHint; import org.apache.tez.dag.api.TezConfiguration; import org.apache.tez.dag.api.TezConstants; @@ -52,7 +51,7 @@ import org.apache.tez.dag.api.records.DAGProtos.TezNamedEntityDescriptorProto; import org.apache.tez.dag.api.records.DAGProtos.VertexPlan; import org.apache.tez.dag.app.AppContext; import org.apache.tez.dag.app.ContainerContext; -import org.apache.tez.dag.app.DAGAppMaster; +import org.apache.tez.dag.app.PluginManager; import org.apache.tez.dag.app.TaskCommunicatorManagerInterface; import org.apache.tez.dag.app.TaskHeartbeatHandler; import org.apache.tez.dag.app.dag.DAG; @@ -381,13 +380,9 @@ public class TestVertexImpl2 { } catch (IOException e) { throw new TezUncheckedException(e); } - DAGAppMaster.parsePlugin(Lists.<NamedEntityDescriptor>newLinkedList(), taskSchedulers, null, - true, false, defaultPayload); - DAGAppMaster - .parsePlugin(Lists.<NamedEntityDescriptor>newLinkedList(), containerLaunchers, null, - true, false, defaultPayload); - DAGAppMaster.parsePlugin(Lists.<NamedEntityDescriptor>newLinkedList(), taskComms, null, - true, false, defaultPayload); + PluginManager.parsePlugin(Lists.newLinkedList(), taskSchedulers, null, true, false, defaultPayload); + PluginManager.parsePlugin(Lists.newLinkedList(), containerLaunchers, null, true, false, defaultPayload); + PluginManager.parsePlugin(Lists.newLinkedList(), taskComms, null, true, false, defaultPayload); } else { // Add N plugins, no YARN defaults List<TezNamedEntityDescriptorProto> schedulerList = new LinkedList<>(); List<TezNamedEntityDescriptorProto> launcherList = new LinkedList<>(); @@ -407,13 +402,9 @@ public class TestVertexImpl2 { DAGProtos.TezEntityDescriptorProto.newBuilder() .setClassName(append(TASK_COMM_NAME_BASE, i))).build()); } - DAGAppMaster.parsePlugin(Lists.<NamedEntityDescriptor>newLinkedList(), taskSchedulers, - schedulerList, false, false, null); - DAGAppMaster.parsePlugin(Lists.<NamedEntityDescriptor>newLinkedList(), containerLaunchers, - launcherList, false, false, null); - DAGAppMaster - .parsePlugin(Lists.<NamedEntityDescriptor>newLinkedList(), taskComms, taskCommList, - false, false, null); + PluginManager.parsePlugin(Lists.newLinkedList(), taskSchedulers, schedulerList, false, false, null); + PluginManager.parsePlugin(Lists.newLinkedList(), containerLaunchers, launcherList, false, false, null); + PluginManager.parsePlugin(Lists.newLinkedList(), taskComms, taskCommList, false, false, null); } this.appContext = createDefaultMockAppContext(); @@ -557,4 +548,5 @@ public class TestVertexImpl2 { doReturn(mockDag).when(appContext).getCurrentDAG(); return appContext; } + } diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskSchedulerManager.java b/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskSchedulerManager.java index 3795e3e6b..3b9936d28 100644 --- a/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskSchedulerManager.java +++ b/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskSchedulerManager.java @@ -80,8 +80,8 @@ import org.apache.tez.dag.app.AppContext; import org.apache.tez.dag.app.ClusterInfo; import org.apache.tez.dag.app.ContainerContext; import org.apache.tez.dag.app.ContainerHeartbeatHandler; -import org.apache.tez.dag.app.DAGAppMaster; import org.apache.tez.dag.app.DAGAppMasterState; +import org.apache.tez.dag.app.PluginManager; import org.apache.tez.dag.app.ServicePluginLifecycleAbstractService; import org.apache.tez.dag.app.TaskCommunicatorManagerInterface; import org.apache.tez.dag.app.dag.DAG; @@ -865,14 +865,16 @@ public class TestTaskSchedulerManager { UserPayload defaultPayload = TezUtils.createUserPayloadFromConf(tezConf); // Parse plugins - List<NamedEntityDescriptor> tsDescriptors = Lists.newLinkedList(); + PluginManager pluginManager = new PluginManager(); + PluginManager.PluginDescriptors pluginDescriptors = pluginManager.parseAllPlugins(false, defaultPayload); + List<NamedEntityDescriptor> tsDescriptors = pluginDescriptors.getTaskSchedulerDescriptors(); BiMap<String, Integer> tsMap = HashBiMap.create(); - DAGAppMaster.parseAllPlugins(tsDescriptors, tsMap, Lists.newLinkedList(), HashBiMap.create(), Lists.newLinkedList(), - HashBiMap.create(), null, false, defaultPayload); // Only TezYarn found. Assert.assertEquals(1, tsDescriptors.size()); Assert.assertEquals(TezConstants.getTezYarnServicePluginName(), tsDescriptors.get(0).getEntityName()); + Assert.assertEquals(1, pluginManager.getTaskSchedulers().size()); + Assert.assertTrue(pluginManager.getTaskSchedulers().containsKey(TezConstants.getTezYarnServicePluginName())); // Construct eventHandler TestTaskSchedulerHelpers.CapturingEventHandler eventHandler = new TestTaskSchedulerHelpers.CapturingEventHandler();