This is an automated email from the ASF dual-hosted git repository.

abstractdog pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/tez.git


The following commit(s) were added to refs/heads/master by this push:
     new 261362e4f TEZ-4647: Refactor plugin management from DAGAppMaster 
(#428) (Laszlo Bodor reviewed by Ayush Saxena)
261362e4f is described below

commit 261362e4febe73962147e1a2974de70f3482d9fe
Author: Bodor Laszlo <bodorlaszlo0...@gmail.com>
AuthorDate: Mon Sep 15 13:04:39 2025 +0200

    TEZ-4647: Refactor plugin management from DAGAppMaster (#428) (Laszlo Bodor 
reviewed by Ayush Saxena)
---
 tez-dag/findbugs-exclude.xml                       |  25 ++
 .../java/org/apache/tez/dag/app/DAGAppMaster.java  | 163 +------------
 .../java/org/apache/tez/dag/app/PluginManager.java | 264 +++++++++++++++++++++
 .../org/apache/tez/dag/app/TestDAGAppMaster.java   | 143 +++++------
 .../tez/dag/app/dag/impl/TestVertexImpl2.java      |  24 +-
 .../tez/dag/app/rm/TestTaskSchedulerManager.java   |  10 +-
 6 files changed, 373 insertions(+), 256 deletions(-)

diff --git a/tez-dag/findbugs-exclude.xml b/tez-dag/findbugs-exclude.xml
index e8755832f..9ee1b3d94 100644
--- a/tez-dag/findbugs-exclude.xml
+++ b/tez-dag/findbugs-exclude.xml
@@ -248,4 +248,29 @@
     <Bug pattern="EI_EXPOSE_REP2"/>
   </Match>
 
+  <!-- TEZ-4647 PluginManager EI_EXPOSE_REP warnings -->
+  <Match>
+    <Class name="org.apache.tez.dag.app.PluginManager"/>
+    <Method name="getTaskSchedulers"/>
+    <Bug pattern="EI_EXPOSE_REP"/>
+  </Match>
+
+  <Match>
+    <Class name="org.apache.tez.dag.app.PluginManager"/>
+    <Method name="getContainerLaunchers"/>
+    <Bug pattern="EI_EXPOSE_REP"/>
+  </Match>
+
+  <Match>
+    <Class name="org.apache.tez.dag.app.PluginManager"/>
+    <Method name="getTaskCommunicators"/>
+    <Bug pattern="EI_EXPOSE_REP"/>
+  </Match>
+
+  <Match>
+    <Class name="org.apache.tez.dag.app.PluginManager"/>
+    <Method name="&lt;init&gt;"/>
+    <Bug pattern="EI_EXPOSE_REP2"/>
+  </Match>
+
 </FindBugsFilter>
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java 
b/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java
index 99579c7ff..ec4a89be0 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java
@@ -132,7 +132,6 @@ import 
org.apache.tez.dag.api.records.DAGProtos.AMPluginDescriptorProto;
 import org.apache.tez.dag.api.records.DAGProtos.ConfigurationProto;
 import org.apache.tez.dag.api.records.DAGProtos.DAGPlan;
 import org.apache.tez.dag.api.records.DAGProtos.PlanLocalResourcesProto;
-import org.apache.tez.dag.api.records.DAGProtos.TezNamedEntityDescriptorProto;
 import org.apache.tez.dag.api.records.DAGProtos.VertexPlan;
 import org.apache.tez.dag.app.RecoveryParser.DAGRecoveryData;
 import org.apache.tez.dag.app.dag.DAG;
@@ -195,9 +194,6 @@ import org.apache.tez.util.TezMxBeanResourceCalculator;
 
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Function;
-import com.google.common.collect.BiMap;
-import com.google.common.collect.HashBiMap;
-import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
 import com.google.common.util.concurrent.ListeningExecutorService;
 import com.google.common.util.concurrent.MoreExecutors;
@@ -254,7 +250,6 @@ public class DAGAppMaster extends AbstractService {
   private final String workingDirectory;
   private final String[] localDirs;
   private final String[] logDirs;
-  private final AMPluginDescriptorProto amPluginDescriptorProto;
   private HadoopShim hadoopShim;
   private ContainerSignatureMatcher containerSignatureMatcher;
   private AMContainerMap containers;
@@ -312,11 +307,8 @@ public class DAGAppMaster extends AbstractService {
   private FileSystem recoveryFS;
 
   private ListeningExecutorService execService;
+  private final PluginManager pluginManager;
 
-  // TODO May not need to be a bidi map
-  private final BiMap<String, Integer> taskSchedulers = HashBiMap.create();
-  private final BiMap<String, Integer> containerLaunchers = HashBiMap.create();
-  private final BiMap<String, Integer> taskCommunicators = HashBiMap.create();
 
   /**
    * set of already executed dag names.
@@ -376,7 +368,6 @@ public class DAGAppMaster extends AbstractService {
     this.dagVersionInfo = new TezDagVersionInfo();
     this.clientVersion = clientVersion;
     this.amCredentials = credentials;
-    this.amPluginDescriptorProto = pluginDescriptorProto;
     this.appMasterUgi = UserGroupInformation
         .createRemoteUser(jobUserName);
     this.appMasterUgi.addCredentials(amCredentials);
@@ -387,6 +378,8 @@ public class DAGAppMaster extends AbstractService {
     LOG.info("Created DAGAppMaster for application " + applicationAttemptId
         + ", versionInfo=" + dagVersionInfo);
     TezCommonUtils.logCredentials(LOG, this.appMasterUgi.getCredentials(), 
"am");
+
+    this.pluginManager = new PluginManager(pluginDescriptorProto);
   }
 
   // Pull this WebAppUtils function into Tez until YARN-4186
@@ -451,18 +444,10 @@ public class DAGAppMaster extends AbstractService {
 
     UserPayload defaultPayload = TezUtils.createUserPayloadFromConf(amConf);
 
-    List<NamedEntityDescriptor> taskSchedulerDescriptors = 
Lists.newLinkedList();
-    List<NamedEntityDescriptor> containerLauncherDescriptors = 
Lists.newLinkedList();
-    List<NamedEntityDescriptor> taskCommunicatorDescriptors = 
Lists.newLinkedList();
-
-    parseAllPlugins(taskSchedulerDescriptors, taskSchedulers, 
containerLauncherDescriptors,
-        containerLaunchers, taskCommunicatorDescriptors, taskCommunicators, 
amPluginDescriptorProto,
-        isLocal, defaultPayload);
-
-
-    LOG.info(buildPluginComponentLog(taskSchedulerDescriptors, taskSchedulers, 
"TaskSchedulers"));
-    LOG.info(buildPluginComponentLog(containerLauncherDescriptors, 
containerLaunchers, "ContainerLaunchers"));
-    LOG.info(buildPluginComponentLog(taskCommunicatorDescriptors, 
taskCommunicators, "TaskCommunicators"));
+    PluginManager.PluginDescriptors pluginDescriptors = 
pluginManager.parseAllPlugins(isLocal, defaultPayload);
+    List<NamedEntityDescriptor> taskSchedulerDescriptors = 
pluginDescriptors.getTaskSchedulerDescriptors();
+    List<NamedEntityDescriptor> containerLauncherDescriptors = 
pluginDescriptors.getContainerLauncherDescriptors();
+    List<NamedEntityDescriptor> taskCommunicatorDescriptors = 
pluginDescriptors.getTaskCommunicatorDescriptors();
 
     boolean disableVersionCheck = conf.getBoolean(
         TezConfiguration.TEZ_AM_DISABLE_CLIENT_VERSION_CHECK,
@@ -1672,32 +1657,32 @@ public class DAGAppMaster extends AbstractService {
 
     @Override
     public Integer getTaskCommunicatorIdentifier(String name) {
-      return taskCommunicators.get(name);
+      return pluginManager.getTaskCommunicators().get(name);
     }
 
     @Override
     public Integer getTaskScheduerIdentifier(String name) {
-      return taskSchedulers.get(name);
+      return pluginManager.getTaskSchedulers().get(name);
     }
 
     @Override
     public Integer getContainerLauncherIdentifier(String name) {
-      return containerLaunchers.get(name);
+      return pluginManager.getContainerLaunchers().get(name);
     }
 
     @Override
     public String getTaskCommunicatorName(int taskCommId) {
-      return taskCommunicators.inverse().get(taskCommId);
+      return pluginManager.getTaskCommunicators().inverse().get(taskCommId);
     }
 
     @Override
     public String getTaskSchedulerName(int schedulerId) {
-      return taskSchedulers.inverse().get(schedulerId);
+      return pluginManager.getTaskSchedulers().inverse().get(schedulerId);
     }
 
     @Override
     public String getContainerLauncherName(int launcherId) {
-      return containerLaunchers.inverse().get(launcherId);
+      return pluginManager.getContainerLaunchers().inverse().get(launcherId);
     }
 
     @Override
@@ -2732,128 +2717,6 @@ public class DAGAppMaster extends AbstractService {
     return webUIService == null ? null : webUIService.getBaseUrl();
   }
 
-  @VisibleForTesting
-  public static void parseAllPlugins(
-      List<NamedEntityDescriptor> taskSchedulerDescriptors, BiMap<String, 
Integer> taskSchedulerPluginMap,
-      List<NamedEntityDescriptor> containerLauncherDescriptors, BiMap<String, 
Integer> containerLauncherPluginMap,
-      List<NamedEntityDescriptor> taskCommDescriptors, BiMap<String, Integer> 
taskCommPluginMap,
-      AMPluginDescriptorProto amPluginDescriptorProto, boolean isLocal, 
UserPayload defaultPayload) {
-
-    boolean tezYarnEnabled;
-    boolean uberEnabled;
-    if (!isLocal) {
-      if (amPluginDescriptorProto == null) {
-        tezYarnEnabled = true;
-        uberEnabled = false;
-      } else {
-        tezYarnEnabled = amPluginDescriptorProto.getContainersEnabled();
-        uberEnabled = amPluginDescriptorProto.getUberEnabled();
-      }
-    } else {
-      tezYarnEnabled = false;
-      uberEnabled = true;
-    }
-
-    parsePlugin(taskSchedulerDescriptors, taskSchedulerPluginMap,
-        (amPluginDescriptorProto == null || 
amPluginDescriptorProto.getTaskSchedulersCount() == 0 ?
-            null :
-            amPluginDescriptorProto.getTaskSchedulersList()),
-        tezYarnEnabled, uberEnabled, defaultPayload);
-    processSchedulerDescriptors(taskSchedulerDescriptors, isLocal, 
defaultPayload, taskSchedulerPluginMap);
-
-    parsePlugin(containerLauncherDescriptors, containerLauncherPluginMap,
-        (amPluginDescriptorProto == null ||
-            amPluginDescriptorProto.getContainerLaunchersCount() == 0 ? null :
-            amPluginDescriptorProto.getContainerLaunchersList()),
-        tezYarnEnabled, uberEnabled, defaultPayload);
-
-    parsePlugin(taskCommDescriptors, taskCommPluginMap,
-        (amPluginDescriptorProto == null ||
-            amPluginDescriptorProto.getTaskCommunicatorsCount() == 0 ? null :
-            amPluginDescriptorProto.getTaskCommunicatorsList()),
-        tezYarnEnabled, uberEnabled, defaultPayload);
-  }
-
-
-  @VisibleForTesting
-  public static void parsePlugin(List<NamedEntityDescriptor> resultList,
-      BiMap<String, Integer> pluginMap, List<TezNamedEntityDescriptorProto> 
namedEntityDescriptorProtos,
-      boolean tezYarnEnabled, boolean uberEnabled, UserPayload defaultPayload) 
{
-
-    if (tezYarnEnabled) {
-      // Default classnames will be populated by individual components
-      NamedEntityDescriptor r = new NamedEntityDescriptor(
-          TezConstants.getTezYarnServicePluginName(), 
null).setUserPayload(defaultPayload);
-      addDescriptor(resultList, pluginMap, r);
-    }
-
-    if (uberEnabled) {
-      // Default classnames will be populated by individual components
-      NamedEntityDescriptor r = new NamedEntityDescriptor(
-          TezConstants.getTezUberServicePluginName(), 
null).setUserPayload(defaultPayload);
-      addDescriptor(resultList, pluginMap, r);
-    }
-
-    if (namedEntityDescriptorProtos != null) {
-      for (TezNamedEntityDescriptorProto namedEntityDescriptorProto : 
namedEntityDescriptorProtos) {
-        NamedEntityDescriptor namedEntityDescriptor = DagTypeConverters
-            .convertNamedDescriptorFromProto(namedEntityDescriptorProto);
-        addDescriptor(resultList, pluginMap, namedEntityDescriptor);
-      }
-    }
-  }
-
-  @VisibleForTesting
-  static void addDescriptor(List<NamedEntityDescriptor> list, BiMap<String, 
Integer> pluginMap,
-                            NamedEntityDescriptor namedEntityDescriptor) {
-    list.add(namedEntityDescriptor);
-    pluginMap.put(list.get(list.size() - 1).getEntityName(), list.size() - 1);
-  }
-
-  @VisibleForTesting
-  static void processSchedulerDescriptors(List<NamedEntityDescriptor> 
descriptors, boolean isLocal,
-                                          UserPayload defaultPayload,
-                                          BiMap<String, Integer> 
schedulerPluginMap) {
-    if (isLocal) {
-      boolean foundUberServiceName = false;
-      for (NamedEntityDescriptor descriptor : descriptors) {
-        if 
(descriptor.getEntityName().equals(TezConstants.getTezUberServicePluginName())) 
{
-          foundUberServiceName = true;
-          break;
-        }
-      }
-      Preconditions.checkState(foundUberServiceName);
-    } else {
-      boolean foundYarn = false;
-      for (int i = 0; i < descriptors.size(); i++) {
-        if 
(descriptors.get(i).getEntityName().equals(TezConstants.getTezYarnServicePluginName()))
 {
-          foundYarn = true;
-          break;
-        }
-      }
-      if (!foundYarn) {
-        NamedEntityDescriptor yarnDescriptor =
-            new 
NamedEntityDescriptor(TezConstants.getTezYarnServicePluginName(), null)
-                .setUserPayload(defaultPayload);
-        addDescriptor(descriptors, schedulerPluginMap, yarnDescriptor);
-      }
-    }
-  }
-
-  String buildPluginComponentLog(List<NamedEntityDescriptor> 
namedEntityDescriptors, BiMap<String, Integer> map,
-                                 String component) {
-    StringBuilder sb = new StringBuilder();
-    sb.append("AM Level configured ").append(component).append(": ");
-    for (int i = 0; i < namedEntityDescriptors.size(); i++) {
-      sb.append("[").append(i).append(":").append(map.inverse().get(i))
-          
.append(":").append(namedEntityDescriptors.get(i).getClassName()).append("]");
-      if (i != namedEntityDescriptors.size() - 1) {
-        sb.append(",");
-      }
-    }
-    return sb.toString();
-  }
-
   public void vertexComplete(TezVertexID completedVertexID, Set<NodeId> 
nodesList) {
     getContainerLauncherManager().vertexComplete(completedVertexID, 
jobTokenSecretManager, nodesList);
   }
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/PluginManager.java 
b/tez-dag/src/main/java/org/apache/tez/dag/app/PluginManager.java
new file mode 100644
index 000000000..24f607758
--- /dev/null
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/PluginManager.java
@@ -0,0 +1,264 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.tez.dag.app;
+
+import java.util.Collections;
+import java.util.List;
+
+import org.apache.tez.dag.api.DagTypeConverters;
+import org.apache.tez.dag.api.NamedEntityDescriptor;
+import org.apache.tez.dag.api.TezConstants;
+import org.apache.tez.dag.api.UserPayload;
+import org.apache.tez.dag.api.records.DAGProtos.AMPluginDescriptorProto;
+import org.apache.tez.dag.api.records.DAGProtos.TezNamedEntityDescriptorProto;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.BiMap;
+import com.google.common.collect.HashBiMap;
+import com.google.common.collect.Lists;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Manager for AM service plugins.
+ *
+ * This component parses the configured plugins for TaskSchedulers,
+ * ContainerLaunchers, and TaskCommunicators from the AM configuration,
+ * maintains their name-to-identifier mappings, and provides the parsed
+ * descriptor lists used to initialize the corresponding managers.
+ */
+public class PluginManager {
+
+  private static final Logger LOG = 
LoggerFactory.getLogger(PluginManager.class);
+
+  private final AMPluginDescriptorProto amPluginDescriptorProto;
+
+  // Plugin maps for task schedulers, container launchers, and task 
communicators
+  private final BiMap<String, Integer> taskSchedulers = HashBiMap.create();
+  private final BiMap<String, Integer> containerLaunchers = HashBiMap.create();
+  private final BiMap<String, Integer> taskCommunicators = HashBiMap.create();
+
+  /**
+   * Wrapper for parsed plugin descriptors.
+   *
+   * The descriptor lists exposed by this class are unmodifiable snapshots
+   * created at parse time. Callers must not attempt to modify these
+   * collections; any modification attempts will throw an exception.
+   */
+  public static final class PluginDescriptors {
+    private final List<NamedEntityDescriptor> taskSchedulerDescriptors;
+    private final List<NamedEntityDescriptor> containerLauncherDescriptors;
+    private final List<NamedEntityDescriptor> taskCommunicatorDescriptors;
+
+    public PluginDescriptors(List<NamedEntityDescriptor> 
taskSchedulerDescriptors,
+                             List<NamedEntityDescriptor> 
containerLauncherDescriptors,
+                             List<NamedEntityDescriptor> 
taskCommunicatorDescriptors) {
+      this.taskSchedulerDescriptors = 
Collections.unmodifiableList(taskSchedulerDescriptors);
+      this.containerLauncherDescriptors = 
Collections.unmodifiableList(containerLauncherDescriptors);
+      this.taskCommunicatorDescriptors = 
Collections.unmodifiableList(taskCommunicatorDescriptors);
+    }
+
+    public List<NamedEntityDescriptor> getTaskSchedulerDescriptors() {
+      return taskSchedulerDescriptors;
+    }
+
+    public List<NamedEntityDescriptor> getContainerLauncherDescriptors() {
+      return containerLauncherDescriptors;
+    }
+
+    public List<NamedEntityDescriptor> getTaskCommunicatorDescriptors() {
+      return taskCommunicatorDescriptors;
+    }
+  }
+
+  public PluginManager() {
+    this(null);
+  }
+
+  public PluginManager(AMPluginDescriptorProto amPluginDescriptorProto) {
+    this.amPluginDescriptorProto = amPluginDescriptorProto;
+  }
+
+  /**
+   * Parse all plugins for task schedulers, container launchers, and task 
communicators.
+   */
+  public PluginDescriptors parseAllPlugins(boolean isLocal, UserPayload 
defaultPayload) {
+
+    List<NamedEntityDescriptor> taskSchedulerDescriptors = 
Lists.newLinkedList();
+    List<NamedEntityDescriptor> containerLauncherDescriptors = 
Lists.newLinkedList();
+    List<NamedEntityDescriptor> taskCommDescriptors = Lists.newLinkedList();
+
+    boolean tezYarnEnabled;
+    boolean uberEnabled;
+    if (!isLocal) {
+      if (amPluginDescriptorProto == null) {
+        tezYarnEnabled = true;
+        uberEnabled = false;
+      } else {
+        tezYarnEnabled = amPluginDescriptorProto.getContainersEnabled();
+        uberEnabled = amPluginDescriptorProto.getUberEnabled();
+      }
+    } else {
+      tezYarnEnabled = false;
+      uberEnabled = true;
+    }
+
+    // parse task scheduler plugins
+    parsePlugin(taskSchedulerDescriptors, taskSchedulers,
+        (amPluginDescriptorProto == null || 
amPluginDescriptorProto.getTaskSchedulersCount() == 0 ?
+            null :
+            amPluginDescriptorProto.getTaskSchedulersList()),
+        tezYarnEnabled, uberEnabled, defaultPayload);
+
+    // post-process task scheduler plugin descriptors
+    processSchedulerDescriptors(taskSchedulerDescriptors, isLocal, 
defaultPayload, taskSchedulers);
+
+    // parse container launcher plugins
+    parsePlugin(containerLauncherDescriptors, containerLaunchers,
+        (amPluginDescriptorProto == null ||
+            amPluginDescriptorProto.getContainerLaunchersCount() == 0 ? null :
+            amPluginDescriptorProto.getContainerLaunchersList()),
+        tezYarnEnabled, uberEnabled, defaultPayload);
+
+    // parse task communicator plugins
+    parsePlugin(taskCommDescriptors, taskCommunicators,
+        (amPluginDescriptorProto == null ||
+            amPluginDescriptorProto.getTaskCommunicatorsCount() == 0 ? null :
+            amPluginDescriptorProto.getTaskCommunicatorsList()),
+        tezYarnEnabled, uberEnabled, defaultPayload);
+
+    // Log plugin component information
+    LOG.info(buildPluginComponentLog(taskSchedulerDescriptors, taskSchedulers, 
"TaskSchedulers"));
+    LOG.info(buildPluginComponentLog(containerLauncherDescriptors, 
containerLaunchers, "ContainerLaunchers"));
+    LOG.info(buildPluginComponentLog(taskCommDescriptors, taskCommunicators, 
"TaskCommunicators"));
+
+    return new PluginDescriptors(taskSchedulerDescriptors, 
containerLauncherDescriptors, taskCommDescriptors);
+  }
+
+  /**
+   * Parse a specific plugin type.
+   */
+  @VisibleForTesting
+  public static void parsePlugin(List<NamedEntityDescriptor> resultList,
+      BiMap<String, Integer> pluginMap, List<TezNamedEntityDescriptorProto> 
namedEntityDescriptorProtos,
+      boolean tezYarnEnabled, boolean uberEnabled, UserPayload defaultPayload) 
{
+
+    if (tezYarnEnabled) {
+      // Default classnames will be populated by individual components
+      NamedEntityDescriptor descriptor = new NamedEntityDescriptor(
+          TezConstants.getTezYarnServicePluginName(), 
null).setUserPayload(defaultPayload);
+      addDescriptor(resultList, pluginMap, descriptor);
+    }
+
+    if (uberEnabled) {
+      // Default classnames will be populated by individual components
+      NamedEntityDescriptor descriptor = new NamedEntityDescriptor(
+          TezConstants.getTezUberServicePluginName(), 
null).setUserPayload(defaultPayload);
+      addDescriptor(resultList, pluginMap, descriptor);
+    }
+
+    if (namedEntityDescriptorProtos != null) {
+      for (TezNamedEntityDescriptorProto namedEntityDescriptorProto : 
namedEntityDescriptorProtos) {
+        NamedEntityDescriptor descriptor = DagTypeConverters
+            .convertNamedDescriptorFromProto(namedEntityDescriptorProto);
+        addDescriptor(resultList, pluginMap, descriptor);
+      }
+    }
+  }
+
+  /**
+   * Add a descriptor to the list and map.
+   */
+  public static void addDescriptor(List<NamedEntityDescriptor> list, 
BiMap<String, Integer> pluginMap,
+                            NamedEntityDescriptor namedEntityDescriptor) {
+    list.add(namedEntityDescriptor);
+    pluginMap.put(list.getLast().getEntityName(), list.size() - 1);
+  }
+
+  /**
+   * Process scheduler descriptors with framework-specific logic.
+   */
+  public void processSchedulerDescriptors(List<NamedEntityDescriptor> 
descriptors, boolean isLocal,
+                                          UserPayload defaultPayload,
+                                          BiMap<String, Integer> 
schedulerPluginMap) {
+    if (isLocal) {
+      boolean foundUberServiceName = false;
+      for (NamedEntityDescriptor<?> descriptor : descriptors) {
+        if 
(descriptor.getEntityName().equals(TezConstants.getTezUberServicePluginName())) 
{
+          foundUberServiceName = true;
+          break;
+        }
+      }
+      Preconditions.checkState(foundUberServiceName);
+    } else {
+      boolean foundYarn = false;
+      for (NamedEntityDescriptor descriptor : descriptors) {
+        if 
(descriptor.getEntityName().equals(TezConstants.getTezYarnServicePluginName())) 
{
+          foundYarn = true;
+          break;
+        }
+      }
+      if (!foundYarn) {
+        NamedEntityDescriptor<?> yarnDescriptor =
+            new 
NamedEntityDescriptor(TezConstants.getTezYarnServicePluginName(), null)
+                .setUserPayload(defaultPayload);
+        addDescriptor(descriptors, schedulerPluginMap, yarnDescriptor);
+      }
+    }
+  }
+
+  /**
+   * Get the task schedulers map.
+   */
+  public BiMap<String, Integer> getTaskSchedulers() {
+    return taskSchedulers;
+  }
+
+  /**
+   * Get the container launchers map.
+   */
+  public BiMap<String, Integer> getContainerLaunchers() {
+    return containerLaunchers;
+  }
+
+  /**
+   * Get the task communicators map.
+   */
+  public BiMap<String, Integer> getTaskCommunicators() {
+    return taskCommunicators;
+  }
+
+  /**
+   * Build a log message for plugin component information.
+   */
+  private String buildPluginComponentLog(List<NamedEntityDescriptor> 
namedEntityDescriptors, BiMap<String, Integer> map,
+                                       String component) {
+    StringBuilder sb = new StringBuilder();
+    sb.append("AM Level configured ").append(component).append(": ");
+    for (int i = 0; i < namedEntityDescriptors.size(); i++) {
+      sb.append("[").append(i).append(":").append(map.inverse().get(i))
+          
.append(":").append(namedEntityDescriptors.get(i).getClassName()).append("]");
+      if (i != namedEntityDescriptors.size() - 1) {
+        sb.append(",");
+      }
+    }
+    return sb.toString();
+  }
+}
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/TestDAGAppMaster.java 
b/tez-dag/src/test/java/org/apache/tez/dag/app/TestDAGAppMaster.java
index 85a8248b9..01a61b64e 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/TestDAGAppMaster.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/TestDAGAppMaster.java
@@ -87,7 +87,6 @@ import org.apache.tez.dag.records.TezVertexID;
 
 import com.google.common.collect.BiMap;
 import com.google.common.collect.HashBiMap;
-import com.google.common.collect.Lists;
 import com.google.protobuf.ByteString;
 
 import org.junit.After;
@@ -158,7 +157,7 @@ public class TestDAGAppMaster {
     // Test empty descriptor list, yarn enabled
     pluginMap.clear();
     entities = new LinkedList<>();
-    DAGAppMaster.parsePlugin(entities, pluginMap, null, true, false, 
defaultPayload);
+    PluginManager.parsePlugin(entities, pluginMap, null, true, false, 
defaultPayload);
     assertEquals(1, pluginMap.size());
     assertEquals(1, entities.size());
     
assertTrue(pluginMap.containsKey(TezConstants.getTezYarnServicePluginName()));
@@ -169,7 +168,7 @@ public class TestDAGAppMaster {
     // Test empty descriptor list, uber enabled
     pluginMap.clear();
     entities = new LinkedList<>();
-    DAGAppMaster.parsePlugin(entities, pluginMap, null, false, true, 
defaultPayload);
+    PluginManager.parsePlugin(entities, pluginMap, null, false, true, 
defaultPayload);
     assertEquals(1, pluginMap.size());
     assertEquals(1, entities.size());
     
assertTrue(pluginMap.containsKey(TezConstants.getTezUberServicePluginName()));
@@ -180,7 +179,7 @@ public class TestDAGAppMaster {
     // Test empty descriptor list, yarn enabled, uber enabled
     pluginMap.clear();
     entities = new LinkedList<>();
-    DAGAppMaster.parsePlugin(entities, pluginMap, null, true, true, 
defaultPayload);
+    PluginManager.parsePlugin(entities, pluginMap, null, true, true, 
defaultPayload);
     assertEquals(2, pluginMap.size());
     assertEquals(2, entities.size());
     
assertTrue(pluginMap.containsKey(TezConstants.getTezYarnServicePluginName()));
@@ -203,7 +202,7 @@ public class TestDAGAppMaster {
     // Test descriptor, no yarn, no uber
     pluginMap.clear();
     entities = new LinkedList<>();
-    DAGAppMaster.parsePlugin(entities, pluginMap, entityDescriptors, false, 
false, defaultPayload);
+    PluginManager.parsePlugin(entities, pluginMap, entityDescriptors, false, 
false, defaultPayload);
     assertEquals(1, pluginMap.size());
     assertEquals(1, entities.size());
     assertTrue(pluginMap.containsKey(pluginName));
@@ -212,7 +211,7 @@ public class TestDAGAppMaster {
     // Test descriptor, yarn and uber
     pluginMap.clear();
     entities = new LinkedList<>();
-    DAGAppMaster.parsePlugin(entities, pluginMap, entityDescriptors, true, 
true, defaultPayload);
+    PluginManager.parsePlugin(entities, pluginMap, entityDescriptors, true, 
true, defaultPayload);
     assertEquals(3, pluginMap.size());
     assertEquals(3, entities.size());
     
assertTrue(pluginMap.containsKey(TezConstants.getTezYarnServicePluginName()));
@@ -227,43 +226,37 @@ public class TestDAGAppMaster {
 
   @Test(timeout = 5000)
   public void testParseAllPluginsNoneSpecified() throws IOException {
+    PluginManager pluginManager = new PluginManager();
     Configuration conf = new Configuration(false);
     conf.set(TEST_KEY, TEST_VAL);
     UserPayload defaultPayload = TezUtils.createUserPayloadFromConf(conf);
 
-    List<NamedEntityDescriptor> tsDescriptors;
-    BiMap<String, Integer> tsMap;
-    List<NamedEntityDescriptor> clDescriptors;
-    BiMap<String, Integer> clMap;
-    List<NamedEntityDescriptor> tcDescriptors;
-    BiMap<String, Integer> tcMap;
-
-
     // No plugins. Non local
-    tsDescriptors = Lists.newLinkedList();
-    tsMap = HashBiMap.create();
-    clDescriptors = Lists.newLinkedList();
-    clMap = HashBiMap.create();
-    tcDescriptors = Lists.newLinkedList();
-    tcMap = HashBiMap.create();
-    DAGAppMaster.parseAllPlugins(tsDescriptors, tsMap, clDescriptors, clMap, 
tcDescriptors, tcMap,
-        null, false, defaultPayload);
-    verifyDescAndMap(tsDescriptors, tsMap, 1, true, 
TezConstants.getTezYarnServicePluginName());
-    verifyDescAndMap(clDescriptors, clMap, 1, true, 
TezConstants.getTezYarnServicePluginName());
-    verifyDescAndMap(tcDescriptors, tcMap, 1, true, 
TezConstants.getTezYarnServicePluginName());
+    PluginManager.PluginDescriptors pluginDescriptorsNonLocal = 
pluginManager.parseAllPlugins(false, defaultPayload);
+
+    verifyDescAndMap(pluginDescriptorsNonLocal.getTaskSchedulerDescriptors(),
+        pluginManager.getTaskSchedulers(), 1, true,
+        TezConstants.getTezYarnServicePluginName());
+    
verifyDescAndMap(pluginDescriptorsNonLocal.getContainerLauncherDescriptors(),
+        pluginManager.getContainerLaunchers(), 1, true,
+        TezConstants.getTezYarnServicePluginName());
+    
verifyDescAndMap(pluginDescriptorsNonLocal.getTaskCommunicatorDescriptors(),
+        pluginManager.getTaskCommunicators(), 1, true,
+        TezConstants.getTezYarnServicePluginName());
 
     // No plugins. Local
-    tsDescriptors = Lists.newLinkedList();
-    tsMap = HashBiMap.create();
-    clDescriptors = Lists.newLinkedList();
-    clMap = HashBiMap.create();
-    tcDescriptors = Lists.newLinkedList();
-    tcMap = HashBiMap.create();
-    DAGAppMaster.parseAllPlugins(tsDescriptors, tsMap, clDescriptors, clMap, 
tcDescriptors, tcMap,
-        null, true, defaultPayload);
-    verifyDescAndMap(tsDescriptors, tsMap, 1, true, 
TezConstants.getTezUberServicePluginName());
-    verifyDescAndMap(clDescriptors, clMap, 1, true, 
TezConstants.getTezUberServicePluginName());
-    verifyDescAndMap(tcDescriptors, tcMap, 1, true, 
TezConstants.getTezUberServicePluginName());
+    pluginManager = new PluginManager();
+
+    PluginManager.PluginDescriptors pluginDescriptorsLocal = 
pluginManager.parseAllPlugins(true, defaultPayload);
+    verifyDescAndMap(pluginDescriptorsLocal.getTaskSchedulerDescriptors(),
+        pluginManager.getTaskSchedulers(), 1, true,
+        TezConstants.getTezUberServicePluginName());
+    verifyDescAndMap(pluginDescriptorsLocal.getContainerLauncherDescriptors(),
+        pluginManager.getContainerLaunchers(), 1, true,
+        TezConstants.getTezUberServicePluginName());
+    verifyDescAndMap(pluginDescriptorsLocal.getTaskCommunicatorDescriptors(),
+        pluginManager.getTaskCommunicators(), 1, true,
+        TezConstants.getTezUberServicePluginName());
   }
 
   @Test(timeout = 5000)
@@ -276,30 +269,19 @@ public class TestDAGAppMaster {
 
     AMPluginDescriptorProto proto = createAmPluginDescriptor(false, false, 
true, payloadProto);
 
-    List<NamedEntityDescriptor> tsDescriptors;
-    BiMap<String, Integer> tsMap;
-    List<NamedEntityDescriptor> clDescriptors;
-    BiMap<String, Integer> clMap;
-    List<NamedEntityDescriptor> tcDescriptors;
-    BiMap<String, Integer> tcMap;
-
-
     // Only plugin, Yarn.
-    tsDescriptors = Lists.newLinkedList();
-    tsMap = HashBiMap.create();
-    clDescriptors = Lists.newLinkedList();
-    clMap = HashBiMap.create();
-    tcDescriptors = Lists.newLinkedList();
-    tcMap = HashBiMap.create();
-    DAGAppMaster.parseAllPlugins(tsDescriptors, tsMap, clDescriptors, clMap, 
tcDescriptors, tcMap,
-        proto, false, defaultPayload);
-    verifyDescAndMap(tsDescriptors, tsMap, 2, true, TS_NAME,
+    PluginManager pluginManager = new PluginManager(proto);
+    PluginManager.PluginDescriptors pluginDescriptors = 
pluginManager.parseAllPlugins(false, defaultPayload);
+    verifyDescAndMap(pluginDescriptors.getTaskSchedulerDescriptors(),
+        pluginManager.getTaskSchedulers(), 2, true, TS_NAME,
         TezConstants.getTezYarnServicePluginName());
-    verifyDescAndMap(clDescriptors, clMap, 1, true, CL_NAME);
-    verifyDescAndMap(tcDescriptors, tcMap, 1, true, TC_NAME);
-    assertEquals(TS_NAME + CLASS_SUFFIX, tsDescriptors.get(0).getClassName());
-    assertEquals(CL_NAME + CLASS_SUFFIX, clDescriptors.get(0).getClassName());
-    assertEquals(TC_NAME + CLASS_SUFFIX, tcDescriptors.get(0).getClassName());
+    verifyDescAndMap(pluginDescriptors.getContainerLauncherDescriptors(),
+        pluginManager.getContainerLaunchers(), 1, true, CL_NAME);
+    verifyDescAndMap(pluginDescriptors.getTaskCommunicatorDescriptors(),
+        pluginManager.getTaskCommunicators(), 1, true, TC_NAME);
+    assertEquals(TS_NAME + CLASS_SUFFIX, 
pluginDescriptors.getTaskSchedulerDescriptors().get(0).getClassName());
+    assertEquals(CL_NAME + CLASS_SUFFIX, 
pluginDescriptors.getContainerLauncherDescriptors().get(0).getClassName());
+    assertEquals(TC_NAME + CLASS_SUFFIX, 
pluginDescriptors.getTaskCommunicatorDescriptors().get(0).getClassName());
   }
 
   @Test(timeout = 5000)
@@ -312,35 +294,24 @@ public class TestDAGAppMaster {
 
     AMPluginDescriptorProto proto = createAmPluginDescriptor(true, false, 
true, payloadProto);
 
-    List<NamedEntityDescriptor> tsDescriptors;
-    BiMap<String, Integer> tsMap;
-    List<NamedEntityDescriptor> clDescriptors;
-    BiMap<String, Integer> clMap;
-    List<NamedEntityDescriptor> tcDescriptors;
-    BiMap<String, Integer> tcMap;
-
-
     // Only plugin, Yarn.
-    tsDescriptors = Lists.newLinkedList();
-    tsMap = HashBiMap.create();
-    clDescriptors = Lists.newLinkedList();
-    clMap = HashBiMap.create();
-    tcDescriptors = Lists.newLinkedList();
-    tcMap = HashBiMap.create();
-    DAGAppMaster.parseAllPlugins(tsDescriptors, tsMap, clDescriptors, clMap, 
tcDescriptors, tcMap,
-        proto, false, defaultPayload);
-    verifyDescAndMap(tsDescriptors, tsMap, 2, true, 
TezConstants.getTezYarnServicePluginName(),
-        TS_NAME);
-    verifyDescAndMap(clDescriptors, clMap, 2, true, 
TezConstants.getTezYarnServicePluginName(),
-        CL_NAME);
-    verifyDescAndMap(tcDescriptors, tcMap, 2, true, 
TezConstants.getTezYarnServicePluginName(),
-        TC_NAME);
-    assertNull(tsDescriptors.get(0).getClassName());
-    assertNull(clDescriptors.get(0).getClassName());
-    assertNull(tcDescriptors.get(0).getClassName());
-    assertEquals(TS_NAME + CLASS_SUFFIX, tsDescriptors.get(1).getClassName());
-    assertEquals(CL_NAME + CLASS_SUFFIX, clDescriptors.get(1).getClassName());
-    assertEquals(TC_NAME + CLASS_SUFFIX, tcDescriptors.get(1).getClassName());
+    PluginManager pluginManager = new PluginManager(proto);
+    PluginManager.PluginDescriptors pluginDescriptors = 
pluginManager.parseAllPlugins(false, defaultPayload);
+
+    verifyDescAndMap(pluginDescriptors.getTaskSchedulerDescriptors(), 
pluginManager.getTaskSchedulers(),
+        2, true, TezConstants.getTezYarnServicePluginName(), TS_NAME);
+    verifyDescAndMap(pluginDescriptors.getContainerLauncherDescriptors(), 
pluginManager.getContainerLaunchers(),
+        2, true, TezConstants.getTezYarnServicePluginName(), CL_NAME);
+    verifyDescAndMap(pluginDescriptors.getTaskCommunicatorDescriptors(), 
pluginManager.getTaskCommunicators(),
+        2, true, TezConstants.getTezYarnServicePluginName(), TC_NAME);
+
+    
assertNull(pluginDescriptors.getTaskSchedulerDescriptors().get(0).getClassName());
+    
assertNull(pluginDescriptors.getContainerLauncherDescriptors().get(0).getClassName());
+    
assertNull(pluginDescriptors.getTaskCommunicatorDescriptors().get(0).getClassName());
+
+    assertEquals(TS_NAME + CLASS_SUFFIX, 
pluginDescriptors.getTaskSchedulerDescriptors().get(1).getClassName());
+    assertEquals(CL_NAME + CLASS_SUFFIX, 
pluginDescriptors.getContainerLauncherDescriptors().get(1).getClassName());
+    assertEquals(TC_NAME + CLASS_SUFFIX, 
pluginDescriptors.getTaskCommunicatorDescriptors().get(1).getClassName());
   }
 
   @Test(timeout = 60000)
diff --git 
a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl2.java 
b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl2.java
index 1f6d1e5e8..c29a47160 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl2.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl2.java
@@ -37,7 +37,6 @@ import org.apache.hadoop.yarn.event.EventHandler;
 import org.apache.hadoop.yarn.util.Clock;
 import org.apache.tez.common.TezUtils;
 import org.apache.tez.dag.api.DagTypeConverters;
-import org.apache.tez.dag.api.NamedEntityDescriptor;
 import org.apache.tez.dag.api.TaskLocationHint;
 import org.apache.tez.dag.api.TezConfiguration;
 import org.apache.tez.dag.api.TezConstants;
@@ -52,7 +51,7 @@ import 
org.apache.tez.dag.api.records.DAGProtos.TezNamedEntityDescriptorProto;
 import org.apache.tez.dag.api.records.DAGProtos.VertexPlan;
 import org.apache.tez.dag.app.AppContext;
 import org.apache.tez.dag.app.ContainerContext;
-import org.apache.tez.dag.app.DAGAppMaster;
+import org.apache.tez.dag.app.PluginManager;
 import org.apache.tez.dag.app.TaskCommunicatorManagerInterface;
 import org.apache.tez.dag.app.TaskHeartbeatHandler;
 import org.apache.tez.dag.app.dag.DAG;
@@ -381,13 +380,9 @@ public class TestVertexImpl2 {
         } catch (IOException e) {
           throw new TezUncheckedException(e);
         }
-        DAGAppMaster.parsePlugin(Lists.<NamedEntityDescriptor>newLinkedList(), 
taskSchedulers, null,
-            true, false, defaultPayload);
-        DAGAppMaster
-            .parsePlugin(Lists.<NamedEntityDescriptor>newLinkedList(), 
containerLaunchers, null,
-                true, false, defaultPayload);
-        DAGAppMaster.parsePlugin(Lists.<NamedEntityDescriptor>newLinkedList(), 
taskComms, null,
-            true, false, defaultPayload);
+        PluginManager.parsePlugin(Lists.newLinkedList(), taskSchedulers, null, 
true, false, defaultPayload);
+        PluginManager.parsePlugin(Lists.newLinkedList(), containerLaunchers, 
null, true, false, defaultPayload);
+        PluginManager.parsePlugin(Lists.newLinkedList(), taskComms, null, 
true, false, defaultPayload);
       } else { // Add N plugins, no YARN defaults
         List<TezNamedEntityDescriptorProto> schedulerList = new LinkedList<>();
         List<TezNamedEntityDescriptorProto> launcherList = new LinkedList<>();
@@ -407,13 +402,9 @@ public class TestVertexImpl2 {
                       DAGProtos.TezEntityDescriptorProto.newBuilder()
                           .setClassName(append(TASK_COMM_NAME_BASE, 
i))).build());
         }
-        DAGAppMaster.parsePlugin(Lists.<NamedEntityDescriptor>newLinkedList(), 
taskSchedulers,
-            schedulerList, false, false, null);
-        DAGAppMaster.parsePlugin(Lists.<NamedEntityDescriptor>newLinkedList(), 
containerLaunchers,
-            launcherList, false, false, null);
-        DAGAppMaster
-            .parsePlugin(Lists.<NamedEntityDescriptor>newLinkedList(), 
taskComms, taskCommList,
-                false, false, null);
+        PluginManager.parsePlugin(Lists.newLinkedList(), taskSchedulers, 
schedulerList, false, false, null);
+        PluginManager.parsePlugin(Lists.newLinkedList(), containerLaunchers, 
launcherList, false, false, null);
+        PluginManager.parsePlugin(Lists.newLinkedList(), taskComms, 
taskCommList, false, false, null);
       }
 
       this.appContext = createDefaultMockAppContext();
@@ -557,4 +548,5 @@ public class TestVertexImpl2 {
     doReturn(mockDag).when(appContext).getCurrentDAG();
     return appContext;
   }
+
 }
diff --git 
a/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskSchedulerManager.java 
b/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskSchedulerManager.java
index 3795e3e6b..3b9936d28 100644
--- 
a/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskSchedulerManager.java
+++ 
b/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskSchedulerManager.java
@@ -80,8 +80,8 @@ import org.apache.tez.dag.app.AppContext;
 import org.apache.tez.dag.app.ClusterInfo;
 import org.apache.tez.dag.app.ContainerContext;
 import org.apache.tez.dag.app.ContainerHeartbeatHandler;
-import org.apache.tez.dag.app.DAGAppMaster;
 import org.apache.tez.dag.app.DAGAppMasterState;
+import org.apache.tez.dag.app.PluginManager;
 import org.apache.tez.dag.app.ServicePluginLifecycleAbstractService;
 import org.apache.tez.dag.app.TaskCommunicatorManagerInterface;
 import org.apache.tez.dag.app.dag.DAG;
@@ -865,14 +865,16 @@ public class TestTaskSchedulerManager {
     UserPayload defaultPayload = TezUtils.createUserPayloadFromConf(tezConf);
 
     // Parse plugins
-    List<NamedEntityDescriptor> tsDescriptors = Lists.newLinkedList();
+    PluginManager pluginManager = new PluginManager();
+    PluginManager.PluginDescriptors pluginDescriptors = 
pluginManager.parseAllPlugins(false, defaultPayload);
+    List<NamedEntityDescriptor> tsDescriptors = 
pluginDescriptors.getTaskSchedulerDescriptors();
     BiMap<String, Integer> tsMap = HashBiMap.create();
-    DAGAppMaster.parseAllPlugins(tsDescriptors, tsMap, Lists.newLinkedList(), 
HashBiMap.create(), Lists.newLinkedList(),
-        HashBiMap.create(), null, false, defaultPayload);
 
     // Only TezYarn found.
     Assert.assertEquals(1, tsDescriptors.size());
     Assert.assertEquals(TezConstants.getTezYarnServicePluginName(), 
tsDescriptors.get(0).getEntityName());
+    Assert.assertEquals(1, pluginManager.getTaskSchedulers().size());
+    
Assert.assertTrue(pluginManager.getTaskSchedulers().containsKey(TezConstants.getTezYarnServicePluginName()));
 
     // Construct eventHandler
     TestTaskSchedulerHelpers.CapturingEventHandler eventHandler = new 
TestTaskSchedulerHelpers.CapturingEventHandler();


Reply via email to