http://git-wip-us.apache.org/repos/asf/tez/blob/ec5acd8a/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/ContainerLauncherRouter.java
----------------------------------------------------------------------
diff --git 
a/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/ContainerLauncherRouter.java
 
b/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/ContainerLauncherRouter.java
index 7c6a6a4..594e6d3 100644
--- 
a/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/ContainerLauncherRouter.java
+++ 
b/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/ContainerLauncherRouter.java
@@ -17,18 +17,21 @@ package org.apache.tez.dag.app.launcher;
 import java.lang.reflect.Constructor;
 import java.lang.reflect.InvocationTargetException;
 import java.net.UnknownHostException;
+import java.util.List;
 
 import com.google.common.annotations.VisibleForTesting;
+import com.google.common.collect.Lists;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.service.AbstractService;
 import org.apache.hadoop.yarn.event.EventHandler;
 import org.apache.tez.common.ReflectionUtils;
+import org.apache.tez.dag.api.NamedEntityDescriptor;
+import org.apache.tez.dag.api.TezConstants;
 import org.apache.tez.dag.app.ServicePluginLifecycleAbstractService;
 import org.apache.tez.serviceplugins.api.ContainerLaunchRequest;
 import org.apache.tez.serviceplugins.api.ContainerLauncher;
 import org.apache.tez.serviceplugins.api.ContainerLauncherContext;
 import org.apache.tez.serviceplugins.api.ContainerStopRequest;
-import org.apache.tez.dag.api.TezConstants;
 import org.apache.tez.dag.api.TezUncheckedException;
 import org.apache.tez.dag.app.AppContext;
 import org.apache.tez.dag.app.ContainerLauncherContextImpl;
@@ -63,35 +66,35 @@ public class ContainerLauncherRouter extends AbstractService
   public ContainerLauncherRouter(Configuration conf, AppContext context,
                                  TaskAttemptListener taskAttemptListener,
                                  String workingDirectory,
-                                 String[] containerLauncherClassIdentifiers,
+                                 List<NamedEntityDescriptor> 
containerLauncherDescriptors,
                                  boolean isPureLocalMode) throws 
UnknownHostException {
     super(ContainerLauncherRouter.class.getName());
 
     this.appContext = context;
-    if (containerLauncherClassIdentifiers == null || 
containerLauncherClassIdentifiers.length == 0) {
+    if (containerLauncherDescriptors == null || 
containerLauncherDescriptors.isEmpty()) {
       if (isPureLocalMode) {
-        containerLauncherClassIdentifiers =
-            new 
String[]{TezConstants.TEZ_AM_SERVICE_PLUGINS_LOCAL_MODE_NAME_DEFAULT};
+        containerLauncherDescriptors = Lists.newArrayList(new 
NamedEntityDescriptor(
+            TezConstants.getTezUberServicePluginName(), null));
       } else {
-        containerLauncherClassIdentifiers =
-            new String[]{TezConstants.TEZ_AM_SERVICE_PLUGINS_NAME_DEFAULT};
+        containerLauncherDescriptors = Lists.newArrayList(new 
NamedEntityDescriptor(
+            TezConstants.getTezYarnServicePluginName(), null));
       }
     }
-    containerLauncherContexts = new 
ContainerLauncherContext[containerLauncherClassIdentifiers.length];
-    containerLaunchers = new 
ContainerLauncher[containerLauncherClassIdentifiers.length];
-    containerLauncherServiceWrappers = new 
ServicePluginLifecycleAbstractService[containerLauncherClassIdentifiers.length];
+    containerLauncherContexts = new 
ContainerLauncherContext[containerLauncherDescriptors.size()];
+    containerLaunchers = new 
ContainerLauncher[containerLauncherDescriptors.size()];
+    containerLauncherServiceWrappers = new 
ServicePluginLifecycleAbstractService[containerLauncherDescriptors.size()];
 
 
-    for (int i = 0; i < containerLauncherClassIdentifiers.length; i++) {
+    for (int i = 0; i < containerLauncherDescriptors.size(); i++) {
       ContainerLauncherContext containerLauncherContext = new 
ContainerLauncherContextImpl(context, taskAttemptListener);
       containerLauncherContexts[i] = containerLauncherContext;
-      containerLaunchers[i] = 
createContainerLauncher(containerLauncherClassIdentifiers[i], context,
+      containerLaunchers[i] = 
createContainerLauncher(containerLauncherDescriptors.get(i), context,
           containerLauncherContext, taskAttemptListener, workingDirectory, 
isPureLocalMode, conf);
       containerLauncherServiceWrappers[i] = new 
ServicePluginLifecycleAbstractService(containerLaunchers[i]);
     }
   }
 
-  private ContainerLauncher createContainerLauncher(String 
containerLauncherClassIdentifier,
+  private ContainerLauncher createContainerLauncher(NamedEntityDescriptor 
containerLauncherDescriptor,
                                                     AppContext context,
                                                     ContainerLauncherContext 
containerLauncherContext,
                                                     TaskAttemptListener 
taskAttemptListener,
@@ -99,11 +102,12 @@ public class ContainerLauncherRouter extends 
AbstractService
                                                     boolean isPureLocalMode,
                                                     Configuration conf) throws
       UnknownHostException {
-    if 
(containerLauncherClassIdentifier.equals(TezConstants.TEZ_AM_SERVICE_PLUGINS_NAME_DEFAULT))
 {
+    if (containerLauncherDescriptor.getEntityName().equals(
+        TezConstants.getTezYarnServicePluginName())) {
       LOG.info("Creating DefaultContainerLauncher");
       return new ContainerLauncherImpl(containerLauncherContext);
-    } else if (containerLauncherClassIdentifier
-        .equals(TezConstants.TEZ_AM_SERVICE_PLUGINS_LOCAL_MODE_NAME_DEFAULT)) {
+    } else if (containerLauncherDescriptor.getEntityName()
+        .equals(TezConstants.getTezUberServicePluginName())) {
       LOG.info("Creating LocalContainerLauncher");
       // TODO Post TEZ-2003. LocalContainerLauncher is special cased, since it 
makes use of
       // extensive internals which are only available at runtime. Will likely 
require
@@ -111,10 +115,10 @@ public class ContainerLauncherRouter extends 
AbstractService
       return
           new LocalContainerLauncher(containerLauncherContext, context, 
taskAttemptListener, workingDirectory, isPureLocalMode);
     } else {
-      LOG.info("Creating container launcher : " + 
containerLauncherClassIdentifier);
+      LOG.info("Creating container launcher {}:{} ", 
containerLauncherDescriptor.getEntityName(), 
containerLauncherDescriptor.getClassName());
       Class<? extends ContainerLauncher> containerLauncherClazz =
           (Class<? extends ContainerLauncher>) ReflectionUtils.getClazz(
-              containerLauncherClassIdentifier);
+              containerLauncherDescriptor.getClassName());
       try {
         Constructor<? extends ContainerLauncher> ctor = containerLauncherClazz
             .getConstructor(ContainerLauncherContext.class);

http://git-wip-us.apache.org/repos/asf/tez/blob/ec5acd8a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerEventHandler.java
----------------------------------------------------------------------
diff --git 
a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerEventHandler.java
 
b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerEventHandler.java
index b66e5fa..8038e2c 100644
--- 
a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerEventHandler.java
+++ 
b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerEventHandler.java
@@ -22,6 +22,7 @@ import java.lang.reflect.Constructor;
 import java.lang.reflect.InvocationTargetException;
 import java.net.InetSocketAddress;
 import java.nio.ByteBuffer;
+import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.BlockingQueue;
@@ -34,6 +35,8 @@ import java.util.concurrent.atomic.AtomicBoolean;
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.util.concurrent.ThreadFactoryBuilder;
 import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.tez.dag.api.NamedEntityDescriptor;
+import org.apache.tez.dag.api.TezConstants;
 import org.apache.tez.dag.app.ServicePluginLifecycleAbstractService;
 import org.apache.tez.serviceplugins.api.TaskScheduler;
 import org.apache.tez.serviceplugins.api.TaskSchedulerContext;
@@ -55,7 +58,6 @@ import org.apache.hadoop.yarn.event.Event;
 import org.apache.hadoop.yarn.event.EventHandler;
 import org.apache.tez.common.ReflectionUtils;
 import org.apache.tez.dag.api.TezConfiguration;
-import org.apache.tez.dag.api.TezConstants;
 import org.apache.tez.dag.api.TezUncheckedException;
 import org.apache.tez.dag.api.TaskLocationHint;
 import org.apache.tez.dag.api.TaskLocationHint.TaskBasedLocationAffinity;
@@ -143,14 +145,14 @@ public class TaskSchedulerEventHandler extends 
AbstractService implements
    * @param eventHandler
    * @param containerSignatureMatcher
    * @param webUI
-   * @param schedulerClasses the list of scheduler classes / codes. Tez 
internal classes are represented as codes.
+   * @param schedulerDescriptors the list of scheduler descriptors. Tez 
internal classes will not have the class names populated.
    *                         An empty list defaults to using the 
YarnTaskScheduler as the only source.
    */
   @SuppressWarnings("rawtypes")
   public TaskSchedulerEventHandler(AppContext appContext,
       DAGClientServer clientService, EventHandler eventHandler, 
       ContainerSignatureMatcher containerSignatureMatcher, WebUIService webUI,
-      String [] schedulerClasses, boolean isPureLocalMode) {
+      List<NamedEntityDescriptor> schedulerDescriptors, boolean 
isPureLocalMode) {
     super(TaskSchedulerEventHandler.class.getName());
     this.appContext = appContext;
     this.eventHandler = eventHandler;
@@ -166,31 +168,34 @@ public class TaskSchedulerEventHandler extends 
AbstractService implements
 
     // Override everything for pure local mode
     if (isPureLocalMode) {
-      this.taskSchedulerClasses = new String[] 
{TezConstants.TEZ_AM_SERVICE_PLUGINS_LOCAL_MODE_NAME_DEFAULT};
+      this.taskSchedulerClasses = new String[] 
{TezConstants.getTezUberServicePluginName()};
       this.yarnTaskSchedulerIndex = -1;
     } else {
-      if (schedulerClasses == null || schedulerClasses.length ==0) {
-        this.taskSchedulerClasses = new String[] 
{TezConstants.TEZ_AM_SERVICE_PLUGINS_NAME_DEFAULT};
+      if (schedulerDescriptors == null || schedulerDescriptors.isEmpty()) {
+        this.taskSchedulerClasses = new String[] 
{TezConstants.getTezYarnServicePluginName()};
         this.yarnTaskSchedulerIndex = 0;
       } else {
         // Ensure the YarnScheduler will be setup and note it's index. This 
will be responsible for heartbeats and YARN registration.
         int foundYarnTaskSchedulerIndex = -1;
-        for (int i = 0 ; i < schedulerClasses.length ; i++) {
-          if 
(schedulerClasses[i].equals(TezConstants.TEZ_AM_SERVICE_PLUGINS_NAME_DEFAULT)) {
+
+        List<String> taskSchedulerClassList = new LinkedList<>();
+        for (int i = 0 ; i < schedulerDescriptors.size() ; i++) {
+          if (schedulerDescriptors.get(i).getEntityName().equals(
+              TezConstants.getTezYarnServicePluginName())) {
+            
taskSchedulerClassList.add(schedulerDescriptors.get(i).getEntityName());
             foundYarnTaskSchedulerIndex = i;
-            break;
+          } else if (schedulerDescriptors.get(i).getEntityName().equals(
+              TezConstants.getTezUberServicePluginName())) {
+            
taskSchedulerClassList.add(schedulerDescriptors.get(i).getEntityName());
+          } else {
+            
taskSchedulerClassList.add(schedulerDescriptors.get(i).getClassName());
           }
         }
-        if (foundYarnTaskSchedulerIndex == -1) { // Not found. Add at the end.
-          this.taskSchedulerClasses = new String[schedulerClasses.length+1];
-          foundYarnTaskSchedulerIndex = this.taskSchedulerClasses.length -1;
-          for (int i = 0 ; i < schedulerClasses.length ; i++) { // Copy over 
the rest.
-            this.taskSchedulerClasses[i] = schedulerClasses[i];
-          }
-          this.taskSchedulerClasses[foundYarnTaskSchedulerIndex] = 
TezConstants.TEZ_AM_SERVICE_PLUGINS_NAME_DEFAULT;
-        } else {
-          this.taskSchedulerClasses = schedulerClasses;
+        if (foundYarnTaskSchedulerIndex == -1) {
+          taskSchedulerClassList.add(YarnTaskSchedulerService.class.getName());
+          foundYarnTaskSchedulerIndex = taskSchedulerClassList.size() -1;
         }
+        this.taskSchedulerClasses = taskSchedulerClassList.toArray(new 
String[taskSchedulerClassList.size()]);
         this.yarnTaskSchedulerIndex = foundYarnTaskSchedulerIndex;
       }
     }
@@ -419,10 +424,10 @@ public class TaskSchedulerEventHandler extends 
AbstractService implements
         new TaskSchedulerContextImpl(this, appContext, schedulerId, 
trackingUrl,
             customAppIdIdentifier, host, port, getConfig());
     TaskSchedulerContext wrappedContext = new 
TaskSchedulerContextImplWrapper(rawContext, appCallbackExecutor);
-    if 
(schedulerClassName.equals(TezConstants.TEZ_AM_SERVICE_PLUGINS_NAME_DEFAULT)) {
+    if (schedulerClassName.equals(TezConstants.getTezYarnServicePluginName())) 
{
       LOG.info("Creating TaskScheduler: YarnTaskSchedulerService");
       return new YarnTaskSchedulerService(wrappedContext);
-    } else if 
(schedulerClassName.equals(TezConstants.TEZ_AM_SERVICE_PLUGINS_LOCAL_MODE_NAME_DEFAULT))
 {
+    } else if 
(schedulerClassName.equals(TezConstants.getTezUberServicePluginName())) {
       LOG.info("Creating TaskScheduler: Local TaskScheduler");
       return new LocalTaskSchedulerService(wrappedContext);
     } else {
@@ -454,7 +459,7 @@ public class TaskSchedulerEventHandler extends 
AbstractService implements
     for (int i = 0; i < taskSchedulerClasses.length; i++) {
       long customAppIdIdentifier;
       if (isPureLocalMode || taskSchedulerClasses[i].equals(
-          TezConstants.TEZ_AM_SERVICE_PLUGINS_NAME_DEFAULT)) { // Use the app 
identifier from the appId.
+          TezConstants.getTezYarnServicePluginName())) { // Use the app 
identifier from the appId.
         customAppIdIdentifier = 
appContext.getApplicationID().getClusterTimestamp();
       } else {
         customAppIdIdentifier = SCHEDULER_APP_ID_BASE + (j++ * 
SCHEDULER_APP_ID_INCREMENT);

http://git-wip-us.apache.org/repos/asf/tez/blob/ec5acd8a/tez-dag/src/test/java/org/apache/tez/dag/app/MockDAGAppMaster.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/MockDAGAppMaster.java 
b/tez-dag/src/test/java/org/apache/tez/dag/app/MockDAGAppMaster.java
index 21ae5f7..17feeaa 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/MockDAGAppMaster.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/MockDAGAppMaster.java
@@ -34,6 +34,7 @@ import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLong;
 
+import org.apache.tez.dag.api.NamedEntityDescriptor;
 import org.apache.tez.serviceplugins.api.ContainerLaunchRequest;
 import org.apache.tez.serviceplugins.api.ContainerLauncher;
 import org.apache.tez.serviceplugins.api.ContainerLauncherContext;
@@ -486,7 +487,7 @@ public class MockDAGAppMaster extends DAGAppMaster {
       Credentials credentials, String jobUserName, int handlerConcurrency, int 
numConcurrentContainers) {
     super(applicationAttemptId, containerId, nmHost, nmPort, nmHttpPort, 
clock, appSubmitTime,
         isSession, workingDirectory, localDirs, logDirs,  new 
TezApiVersionInfo().getVersion(), 1,
-        credentials, jobUserName);
+        credentials, jobUserName, null);
     containerLauncherContext = new ContainerLauncherContextImpl(getContext(), 
getTaskAttemptListener());
     containerLauncher = new MockContainerLauncher(launcherGoFlag, 
containerLauncherContext);
     shutdownHandler = new MockDAGAppMasterShutdownHandler();
@@ -500,7 +501,7 @@ public class MockDAGAppMaster extends DAGAppMaster {
   // use mock container launcher for tests
   @Override
   protected ContainerLauncherRouter createContainerLauncherRouter(final 
Configuration conf,
-                                                                  String[] 
containerLaunchers,
+                                                                  
List<NamedEntityDescriptor> containerLauncherDescirptors,
                                                                   boolean 
isLocal)
       throws UnknownHostException {
     return new ContainerLauncherRouter(containerLauncher, getContext());

http://git-wip-us.apache.org/repos/asf/tez/blob/ec5acd8a/tez-dag/src/test/java/org/apache/tez/dag/app/TestTaskAttemptListenerImplTezDag.java
----------------------------------------------------------------------
diff --git 
a/tez-dag/src/test/java/org/apache/tez/dag/app/TestTaskAttemptListenerImplTezDag.java
 
b/tez-dag/src/test/java/org/apache/tez/dag/app/TestTaskAttemptListenerImplTezDag.java
index 41a7373..e45b0a2 100644
--- 
a/tez-dag/src/test/java/org/apache/tez/dag/app/TestTaskAttemptListenerImplTezDag.java
+++ 
b/tez-dag/src/test/java/org/apache/tez/dag/app/TestTaskAttemptListenerImplTezDag.java
@@ -50,6 +50,7 @@ import org.apache.tez.common.ContainerTask;
 import org.apache.tez.common.security.JobTokenIdentifier;
 import org.apache.tez.common.security.JobTokenSecretManager;
 import org.apache.tez.common.security.TokenCache;
+import org.apache.tez.dag.api.NamedEntityDescriptor;
 import org.apache.tez.dag.api.TezConfiguration;
 import org.apache.tez.serviceplugins.api.ContainerEndReason;
 import org.apache.tez.serviceplugins.api.TaskAttemptEndReason;
@@ -375,10 +376,10 @@ public class TestTaskAttemptListenerImplTezDag {
     public TaskAttemptListenerImplForTest(AppContext context,
                                           TaskHeartbeatHandler thh,
                                           ContainerHeartbeatHandler chh,
-                                          String[] 
taskCommunicatorClassIdentifiers,
+                                          List<NamedEntityDescriptor> 
taskCommDescriptors,
                                           Configuration conf,
                                           boolean isPureLocalMode) {
-      super(context, thh, chh, taskCommunicatorClassIdentifiers, conf,
+      super(context, thh, chh, taskCommDescriptors, conf,
           isPureLocalMode);
     }
 

http://git-wip-us.apache.org/repos/asf/tez/blob/ec5acd8a/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskSchedulerEventHandler.java
----------------------------------------------------------------------
diff --git 
a/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskSchedulerEventHandler.java
 
b/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskSchedulerEventHandler.java
index 3ea0446..f191175 100644
--- 
a/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskSchedulerEventHandler.java
+++ 
b/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskSchedulerEventHandler.java
@@ -32,6 +32,7 @@ import java.io.IOException;
 import java.net.InetSocketAddress;
 import java.util.HashMap;
 import java.util.HashSet;
+import java.util.LinkedList;
 import java.util.List;
 import java.util.concurrent.atomic.AtomicBoolean;
 
@@ -48,6 +49,7 @@ import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.event.Event;
 import org.apache.hadoop.yarn.event.EventHandler;
 import org.apache.tez.common.ContainerSignatureMatcher;
+import org.apache.tez.dag.api.NamedEntityDescriptor;
 import org.apache.tez.dag.api.TaskLocationHint;
 import org.apache.tez.dag.api.TezConfiguration;
 import org.apache.tez.dag.api.client.DAGClientServer;
@@ -91,7 +93,7 @@ public class TestTaskSchedulerEventHandler {
     public MockTaskSchedulerEventHandler(AppContext appContext,
         DAGClientServer clientService, EventHandler eventHandler,
         ContainerSignatureMatcher containerSignatureMatcher, WebUIService 
webUI) {
-      super(appContext, clientService, eventHandler, 
containerSignatureMatcher, webUI, new String[] {}, false);
+      super(appContext, clientService, eventHandler, 
containerSignatureMatcher, webUI, new LinkedList<NamedEntityDescriptor>(), 
false);
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/tez/blob/ec5acd8a/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskSchedulerHelpers.java
----------------------------------------------------------------------
diff --git 
a/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskSchedulerHelpers.java 
b/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskSchedulerHelpers.java
index 966c95a..60d37e9 100644
--- 
a/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskSchedulerHelpers.java
+++ 
b/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskSchedulerHelpers.java
@@ -59,6 +59,7 @@ import org.apache.hadoop.yarn.client.api.impl.AMRMClientImpl;
 import org.apache.hadoop.yarn.event.Event;
 import org.apache.hadoop.yarn.event.EventHandler;
 import org.apache.tez.common.ContainerSignatureMatcher;
+import org.apache.tez.dag.api.NamedEntityDescriptor;
 import org.apache.tez.dag.app.AppContext;
 import org.apache.tez.dag.app.ServicePluginLifecycleAbstractService;
 import 
org.apache.tez.dag.app.rm.YarnTaskSchedulerService.CookieContainerRequest;
@@ -130,7 +131,7 @@ class TestTaskSchedulerHelpers {
         EventHandler eventHandler,
         TezAMRMClientAsync<CookieContainerRequest> amrmClientAsync,
         ContainerSignatureMatcher containerSignatureMatcher) {
-      super(appContext, null, eventHandler, containerSignatureMatcher, null, 
new String[]{}, false);
+      super(appContext, null, eventHandler, containerSignatureMatcher, null, 
new LinkedList<NamedEntityDescriptor>(), false);
       this.amrmClientAsync = amrmClientAsync;
       this.containerSignatureMatcher = containerSignatureMatcher;
     }

http://git-wip-us.apache.org/repos/asf/tez/blob/ec5acd8a/tez-examples/src/main/java/org/apache/tez/examples/JoinValidate.java
----------------------------------------------------------------------
diff --git 
a/tez-examples/src/main/java/org/apache/tez/examples/JoinValidate.java 
b/tez-examples/src/main/java/org/apache/tez/examples/JoinValidate.java
index ba17ba0..611e8cc 100644
--- a/tez-examples/src/main/java/org/apache/tez/examples/JoinValidate.java
+++ b/tez-examples/src/main/java/org/apache/tez/examples/JoinValidate.java
@@ -20,8 +20,8 @@ package org.apache.tez.examples;
 
 import java.io.IOException;
 import java.util.Set;
-import java.util.Map;
 
+import org.apache.tez.dag.api.Vertex.VertexExecutionContext;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.apache.hadoop.conf.Configuration;
@@ -137,6 +137,9 @@ public class JoinValidate extends TezExampleBase {
   private DAG createDag(TezConfiguration tezConf, Path lhs, Path rhs, int 
numPartitions)
       throws IOException {
     DAG dag = DAG.create(getDagName());
+    if (getDefaultExecutionContext() != null) {
+      dag.setExecutionContext(getDefaultExecutionContext());
+    }
 
     // Configuration for intermediate output - shared by Vertex1 and Vertex2
     // This should only be setting selective keys from the underlying conf. 
Fix after there's a
@@ -153,18 +156,18 @@ public class JoinValidate extends TezExampleBase {
         MRInput
             .createConfigBuilder(new Configuration(tezConf), 
TextInputFormat.class,
                 
lhs.toUri().toString()).groupSplits(!isDisableSplitGrouping()).build());
-    setVertexProperties(lhsVertex, getLhsVertexProperties());
+    setVertexExecutionContext(lhsVertex, getLhsExecutionContext());
 
     Vertex rhsVertex = Vertex.create(RHS_INPUT_NAME, 
ProcessorDescriptor.create(
         ForwardingProcessor.class.getName())).addDataSource("rhs",
         MRInput
             .createConfigBuilder(new Configuration(tezConf), 
TextInputFormat.class,
                 
rhs.toUri().toString()).groupSplits(!isDisableSplitGrouping()).build());
-    setVertexProperties(rhsVertex, getRhsVertexProperties());
+    setVertexExecutionContext(rhsVertex, getRhsExecutionContext());
 
     Vertex joinValidateVertex = Vertex.create("joinvalidate", 
ProcessorDescriptor.create(
         JoinValidateProcessor.class.getName()), numPartitions);
-    setVertexProperties(joinValidateVertex, getValidateVertexProperties());
+    setVertexExecutionContext(joinValidateVertex, 
getValidateExecutionContext());
 
     Edge e1 = Edge.create(lhsVertex, joinValidateVertex, 
edgeConf.createDefaultEdgeProperty());
     Edge e2 = Edge.create(rhsVertex, joinValidateVertex, 
edgeConf.createDefaultEdgeProperty());
@@ -174,23 +177,25 @@ public class JoinValidate extends TezExampleBase {
     return dag;
   }
 
-  private void setVertexProperties(Vertex vertex, Map<String, String> 
properties) {
-    if (properties != null) {
-      for (Map.Entry<String, String> entry : properties.entrySet()) {
-        vertex.setConf(entry.getKey(), entry.getValue());
-      }
+  private void setVertexExecutionContext(Vertex vertex, VertexExecutionContext 
executionContext) {
+    if (executionContext != null) {
+      vertex.setExecutionContext(executionContext);
     }
   }
 
-  protected Map<String, String> getLhsVertexProperties() {
+  protected VertexExecutionContext getDefaultExecutionContext() {
     return null;
   }
 
-  protected Map<String, String> getRhsVertexProperties() {
+  protected VertexExecutionContext getLhsExecutionContext() {
     return null;
   }
 
-  protected Map<String, String> getValidateVertexProperties() {
+  protected VertexExecutionContext getRhsExecutionContext() {
+    return null;
+  }
+
+  protected VertexExecutionContext getValidateExecutionContext() {
     return null;
   }
 
@@ -240,4 +245,6 @@ public class JoinValidate extends TezExampleBase {
       }
     }
   }
+
+
 }

http://git-wip-us.apache.org/repos/asf/tez/blob/ec5acd8a/tez-ext-service-tests/src/test/java/org/apache/tez/dag/app/launcher/TezTestServiceContainerLauncher.java
----------------------------------------------------------------------
diff --git 
a/tez-ext-service-tests/src/test/java/org/apache/tez/dag/app/launcher/TezTestServiceContainerLauncher.java
 
b/tez-ext-service-tests/src/test/java/org/apache/tez/dag/app/launcher/TezTestServiceContainerLauncher.java
index 85f9415..0002b42 100644
--- 
a/tez-ext-service-tests/src/test/java/org/apache/tez/dag/app/launcher/TezTestServiceContainerLauncher.java
+++ 
b/tez-ext-service-tests/src/test/java/org/apache/tez/dag/app/launcher/TezTestServiceContainerLauncher.java
@@ -121,7 +121,8 @@ public class TezTestServiceContainerLauncher extends 
ContainerLauncher {
   private RunContainerRequestProto 
constructRunContainerRequest(ContainerLaunchRequest launchRequest) throws
       IOException {
     RunContainerRequestProto.Builder builder = 
RunContainerRequestProto.newBuilder();
-    
Preconditions.checkArgument(launchRequest.getTaskCommunicatorName().equals(TezConstants.TEZ_AM_SERVICE_PLUGINS_NAME_DEFAULT));
+    Preconditions.checkArgument(launchRequest.getTaskCommunicatorName().equals(
+        TezConstants.getTezYarnServicePluginName()));
     InetSocketAddress address = (InetSocketAddress) 
getContext().getTaskCommunicatorMetaInfo(launchRequest.getTaskCommunicatorName());
     builder.setAmHost(address.getHostName()).setAmPort(address.getPort());
     builder.setAppAttemptNumber(appAttemptId.getAttemptId());

http://git-wip-us.apache.org/repos/asf/tez/blob/ec5acd8a/tez-ext-service-tests/src/test/java/org/apache/tez/examples/JoinValidateConfigured.java
----------------------------------------------------------------------
diff --git 
a/tez-ext-service-tests/src/test/java/org/apache/tez/examples/JoinValidateConfigured.java
 
b/tez-ext-service-tests/src/test/java/org/apache/tez/examples/JoinValidateConfigured.java
index e5d2e3b..f31476f 100644
--- 
a/tez-ext-service-tests/src/test/java/org/apache/tez/examples/JoinValidateConfigured.java
+++ 
b/tez-ext-service-tests/src/test/java/org/apache/tez/examples/JoinValidateConfigured.java
@@ -14,36 +14,46 @@
 
 package org.apache.tez.examples;
 
-import java.util.Map;
+
+import org.apache.tez.dag.api.Vertex.VertexExecutionContext;
 
 public class JoinValidateConfigured extends JoinValidate {
 
-  private final Map<String, String> lhsProps;
-  private final Map<String, String> rhsProps;
-  private final Map<String, String> validateProps;
+  private final VertexExecutionContext defaultExecutionContext;
+  private final VertexExecutionContext lhsContext;
+  private final VertexExecutionContext rhsContext;
+  private final VertexExecutionContext validateContext;
   private final String dagNameSuffix;
 
-  public JoinValidateConfigured(Map<String, String> lhsProps, Map<String, 
String> rhsProps,
-                                Map<String, String> validateProps, String 
dagNameSuffix) {
-    this.lhsProps = lhsProps;
-    this.rhsProps = rhsProps;
-    this.validateProps = validateProps;
+  public JoinValidateConfigured(VertexExecutionContext defaultExecutionContext,
+                                VertexExecutionContext lhsContext,
+                                VertexExecutionContext rhsContext,
+                                VertexExecutionContext validateContext, String 
dagNameSuffix) {
+    this.defaultExecutionContext = defaultExecutionContext;
+    this.lhsContext = lhsContext;
+    this.rhsContext = rhsContext;
+    this.validateContext = validateContext;
     this.dagNameSuffix = dagNameSuffix;
   }
 
   @Override
-  protected Map<String, String> getLhsVertexProperties() {
-    return this.lhsProps;
+  protected VertexExecutionContext getDefaultExecutionContext() {
+    return this.defaultExecutionContext;
+  }
+
+  @Override
+  protected VertexExecutionContext getLhsExecutionContext() {
+    return this.lhsContext;
   }
 
   @Override
-  protected Map<String, String> getRhsVertexProperties() {
-    return this.rhsProps;
+  protected VertexExecutionContext getRhsExecutionContext() {
+    return this.rhsContext;
   }
 
   @Override
-  protected Map<String, String> getValidateVertexProperties() {
-    return this.validateProps;
+  protected VertexExecutionContext getValidateExecutionContext() {
+    return this.validateContext;
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/tez/blob/ec5acd8a/tez-ext-service-tests/src/test/java/org/apache/tez/tests/TestExternalTezServices.java
----------------------------------------------------------------------
diff --git 
a/tez-ext-service-tests/src/test/java/org/apache/tez/tests/TestExternalTezServices.java
 
b/tez-ext-service-tests/src/test/java/org/apache/tez/tests/TestExternalTezServices.java
index 45c70f1..07dd363 100644
--- 
a/tez-ext-service-tests/src/test/java/org/apache/tez/tests/TestExternalTezServices.java
+++ 
b/tez-ext-service-tests/src/test/java/org/apache/tez/tests/TestExternalTezServices.java
@@ -19,7 +19,6 @@ import static org.junit.Assert.assertEquals;
 import java.io.IOException;
 import java.util.Map;
 
-import com.google.common.collect.Maps;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
@@ -28,9 +27,9 @@ import org.apache.tez.client.TezClient;
 import org.apache.tez.dag.api.DAG;
 import org.apache.tez.dag.api.ProcessorDescriptor;
 import org.apache.tez.dag.api.TezConfiguration;
-import org.apache.tez.dag.api.TezConstants;
 import org.apache.tez.dag.api.TezException;
 import org.apache.tez.dag.api.Vertex;
+import org.apache.tez.dag.api.Vertex.VertexExecutionContext;
 import org.apache.tez.dag.api.client.DAGClient;
 import org.apache.tez.dag.api.client.DAGStatus;
 import org.apache.tez.dag.app.launcher.TezTestServiceNoOpContainerLauncher;
@@ -43,6 +42,10 @@ import 
org.apache.tez.runtime.library.api.TezRuntimeConfiguration;
 import org.apache.tez.runtime.library.processor.SleepProcessor;
 import org.apache.tez.service.MiniTezTestServiceCluster;
 import org.apache.tez.service.impl.ContainerRunnerImpl;
+import org.apache.tez.serviceplugins.api.ContainerLauncherDescriptor;
+import org.apache.tez.serviceplugins.api.ServicePluginsDescriptor;
+import org.apache.tez.serviceplugins.api.TaskCommunicatorDescriptor;
+import org.apache.tez.serviceplugins.api.TaskSchedulerDescriptor;
 import org.apache.tez.test.MiniTezCluster;
 import org.junit.AfterClass;
 import org.junit.BeforeClass;
@@ -72,9 +75,15 @@ public class TestExternalTezServices {
   private static final Path HASH_JOIN_EXPECTED_RESULT_PATH = new 
Path(SRC_DATA_DIR, "expectedOutputPath");
   private static final Path HASH_JOIN_OUTPUT_PATH = new Path(SRC_DATA_DIR, 
"outPath");
 
-  private static final Map<String, String> PROPS_EXT_SERVICE_PUSH = 
Maps.newHashMap();
-  private static final Map<String, String> PROPS_REGULAR_CONTAINERS = 
Maps.newHashMap();
-  private static final Map<String, String> PROPS_IN_AM = Maps.newHashMap();
+  private static final VertexExecutionContext 
EXECUTION_CONTEXT_EXT_SERVICE_PUSH =
+      VertexExecutionContext.create(
+          EXT_PUSH_ENTITY_NAME, EXT_PUSH_ENTITY_NAME, EXT_PUSH_ENTITY_NAME);
+  private static final VertexExecutionContext 
EXECUTION_CONTEXT_REGULAR_CONTAINERS =
+      VertexExecutionContext.createExecuteInContainers(true);
+  private static final VertexExecutionContext EXECUTION_CONTEXT_IN_AM =
+      VertexExecutionContext.createExecuteInAm(true);
+
+  private static final VertexExecutionContext EXECUTION_CONTEXT_DEFAULT = 
EXECUTION_CONTEXT_EXT_SERVICE_PUSH;
 
   private static String TEST_ROOT_DIR = "target" + Path.SEPARATOR + 
TestExternalTezServices.class.getName()
       + "-tmpDir";
@@ -127,51 +136,28 @@ public class TestExternalTezServices {
     confForJobs.set(TezConfiguration.TEZ_AM_STAGING_DIR, 
stagingDirPath.toString());
     
confForJobs.setBoolean(TezRuntimeConfiguration.TEZ_RUNTIME_OPTIMIZE_LOCAL_FETCH,
 false);
 
-    confForJobs.setStrings(TezConfiguration.TEZ_AM_TASK_SCHEDULERS,
-        TezConstants.TEZ_AM_SERVICE_PLUGINS_NAME_DEFAULT,
-        TezConstants.TEZ_AM_SERVICE_PLUGINS_LOCAL_MODE_NAME_DEFAULT,
-        EXT_PUSH_ENTITY_NAME + ":" + 
TezTestServiceTaskSchedulerService.class.getName());
-
-    confForJobs.setStrings(TezConfiguration.TEZ_AM_CONTAINER_LAUNCHERS,
-        TezConstants.TEZ_AM_SERVICE_PLUGINS_NAME_DEFAULT,
-        TezConstants.TEZ_AM_SERVICE_PLUGINS_LOCAL_MODE_NAME_DEFAULT,
-        EXT_PUSH_ENTITY_NAME + ":" + 
TezTestServiceNoOpContainerLauncher.class.getName());
-
-    confForJobs.setStrings(TezConfiguration.TEZ_AM_TASK_COMMUNICATORS,
-        TezConstants.TEZ_AM_SERVICE_PLUGINS_NAME_DEFAULT,
-        TezConstants.TEZ_AM_SERVICE_PLUGINS_LOCAL_MODE_NAME_DEFAULT,
-        EXT_PUSH_ENTITY_NAME + ":" + 
TezTestServiceTaskCommunicatorImpl.class.getName());
-
-    // Default all jobs to run via the service. Individual tests override this 
on a per vertex/dag level.
-    confForJobs.set(TezConfiguration.TEZ_AM_VERTEX_TASK_SCHEDULER_NAME, 
EXT_PUSH_ENTITY_NAME);
-    confForJobs.set(TezConfiguration.TEZ_AM_VERTEX_CONTAINER_LAUNCHER_NAME, 
EXT_PUSH_ENTITY_NAME);
-    confForJobs.set(TezConfiguration.TEZ_AM_VERTEX_TASK_COMMUNICATOR_NAME, 
EXT_PUSH_ENTITY_NAME);
-
-    // Setup various executor sets
-    
PROPS_REGULAR_CONTAINERS.put(TezConfiguration.TEZ_AM_VERTEX_TASK_SCHEDULER_NAME,
-        TezConstants.TEZ_AM_SERVICE_PLUGINS_NAME_DEFAULT);
-    
PROPS_REGULAR_CONTAINERS.put(TezConfiguration.TEZ_AM_VERTEX_CONTAINER_LAUNCHER_NAME,
-        TezConstants.TEZ_AM_SERVICE_PLUGINS_NAME_DEFAULT);
-    
PROPS_REGULAR_CONTAINERS.put(TezConfiguration.TEZ_AM_VERTEX_TASK_COMMUNICATOR_NAME,
-        TezConstants.TEZ_AM_SERVICE_PLUGINS_NAME_DEFAULT);
-
-    
PROPS_EXT_SERVICE_PUSH.put(TezConfiguration.TEZ_AM_VERTEX_TASK_SCHEDULER_NAME, 
EXT_PUSH_ENTITY_NAME);
-    
PROPS_EXT_SERVICE_PUSH.put(TezConfiguration.TEZ_AM_VERTEX_CONTAINER_LAUNCHER_NAME,
 EXT_PUSH_ENTITY_NAME);
-    
PROPS_EXT_SERVICE_PUSH.put(TezConfiguration.TEZ_AM_VERTEX_TASK_COMMUNICATOR_NAME,
 EXT_PUSH_ENTITY_NAME);
-
-    PROPS_IN_AM.put(TezConfiguration.TEZ_AM_VERTEX_TASK_SCHEDULER_NAME,
-        TezConstants.TEZ_AM_SERVICE_PLUGINS_LOCAL_MODE_NAME_DEFAULT);
-    PROPS_IN_AM.put(TezConfiguration.TEZ_AM_VERTEX_CONTAINER_LAUNCHER_NAME,
-        TezConstants.TEZ_AM_SERVICE_PLUGINS_LOCAL_MODE_NAME_DEFAULT);
-    PROPS_IN_AM.put(TezConfiguration.TEZ_AM_VERTEX_TASK_COMMUNICATOR_NAME,
-        TezConstants.TEZ_AM_SERVICE_PLUGINS_LOCAL_MODE_NAME_DEFAULT);
+    TaskSchedulerDescriptor[] taskSchedulerDescriptors = new 
TaskSchedulerDescriptor[]{
+        TaskSchedulerDescriptor
+            .create(EXT_PUSH_ENTITY_NAME, 
TezTestServiceTaskSchedulerService.class.getName())};
+
+    ContainerLauncherDescriptor[] containerLauncherDescriptors = new 
ContainerLauncherDescriptor[]{
+        ContainerLauncherDescriptor
+            .create(EXT_PUSH_ENTITY_NAME, 
TezTestServiceNoOpContainerLauncher.class.getName())};
+
+    TaskCommunicatorDescriptor[] taskCommunicatorDescriptors = new 
TaskCommunicatorDescriptor[]{
+        TaskCommunicatorDescriptor
+            .create(EXT_PUSH_ENTITY_NAME, 
TezTestServiceTaskCommunicatorImpl.class.getName())};
 
+    ServicePluginsDescriptor servicePluginsDescriptor = 
ServicePluginsDescriptor.create(true, true,
+        taskSchedulerDescriptors, containerLauncherDescriptors, 
taskCommunicatorDescriptors);
 
     // Create a session to use for all tests.
     TezConfiguration tezClientConf = new TezConfiguration(confForJobs);
 
-    sharedTezClient = 
TezClient.create(TestExternalTezServices.class.getSimpleName() + "_session",
-        tezClientConf, true);
+    sharedTezClient = TezClient
+        .newBuilder(TestExternalTezServices.class.getSimpleName() + 
"_session", tezClientConf)
+        
.setIsSession(true).setServicePluginDescriptor(servicePluginsDescriptor).build();
+
     sharedTezClient.start();
     LOG.info("Shared TezSession started");
     sharedTezClient.waitTillReady();
@@ -225,71 +211,71 @@ public class TestExternalTezServices {
   @Test(timeout = 60000)
   public void testAllInService() throws Exception {
     int expectedExternalSubmissions = 4 + 3; //4 for 4 src files, 3 for num 
reducers.
-    runJoinValidate("AllInService", expectedExternalSubmissions, 
PROPS_EXT_SERVICE_PUSH,
-        PROPS_EXT_SERVICE_PUSH, PROPS_EXT_SERVICE_PUSH);
+    runJoinValidate("AllInService", expectedExternalSubmissions, 
EXECUTION_CONTEXT_EXT_SERVICE_PUSH,
+        EXECUTION_CONTEXT_EXT_SERVICE_PUSH, 
EXECUTION_CONTEXT_EXT_SERVICE_PUSH);
   }
 
   @Test(timeout = 60000)
   public void testAllInContainers() throws Exception {
     int expectedExternalSubmissions = 0; // All in containers
-    runJoinValidate("AllInContainers", expectedExternalSubmissions, 
PROPS_REGULAR_CONTAINERS,
-        PROPS_REGULAR_CONTAINERS, PROPS_REGULAR_CONTAINERS);
+    runJoinValidate("AllInContainers", expectedExternalSubmissions, 
EXECUTION_CONTEXT_REGULAR_CONTAINERS,
+        EXECUTION_CONTEXT_REGULAR_CONTAINERS, 
EXECUTION_CONTEXT_REGULAR_CONTAINERS);
   }
 
   @Test(timeout = 60000)
   public void testAllInAM() throws Exception {
     int expectedExternalSubmissions = 0; // All in AM
-    runJoinValidate("AllInAM", expectedExternalSubmissions, PROPS_IN_AM,
-        PROPS_IN_AM, PROPS_IN_AM);
+    runJoinValidate("AllInAM", expectedExternalSubmissions, 
EXECUTION_CONTEXT_IN_AM,
+        EXECUTION_CONTEXT_IN_AM, EXECUTION_CONTEXT_IN_AM);
   }
 
   @Test(timeout = 60000)
   public void testMixed1() throws Exception { // M-ExtService, R-containers
     int expectedExternalSubmissions = 4 + 0; //4 for 4 src files, 0 for num 
reducers.
-    runJoinValidate("Mixed1", expectedExternalSubmissions, 
PROPS_EXT_SERVICE_PUSH,
-        PROPS_EXT_SERVICE_PUSH, PROPS_REGULAR_CONTAINERS);
+    runJoinValidate("Mixed1", expectedExternalSubmissions, 
EXECUTION_CONTEXT_EXT_SERVICE_PUSH,
+        EXECUTION_CONTEXT_EXT_SERVICE_PUSH, 
EXECUTION_CONTEXT_REGULAR_CONTAINERS);
   }
 
   @Test(timeout = 60000)
   public void testMixed2() throws Exception { // M-Containers, R-ExtService
     int expectedExternalSubmissions = 0 + 3; // 3 for num reducers.
-    runJoinValidate("Mixed2", expectedExternalSubmissions, 
PROPS_REGULAR_CONTAINERS,
-        PROPS_REGULAR_CONTAINERS, PROPS_EXT_SERVICE_PUSH);
+    runJoinValidate("Mixed2", expectedExternalSubmissions, 
EXECUTION_CONTEXT_REGULAR_CONTAINERS,
+        EXECUTION_CONTEXT_REGULAR_CONTAINERS, 
EXECUTION_CONTEXT_EXT_SERVICE_PUSH);
   }
 
   @Test(timeout = 60000)
   public void testMixed3() throws Exception { // M - service, R-AM
     int expectedExternalSubmissions = 4 + 0; //4 for 4 src files, 0 for num 
reducers (in-AM).
-    runJoinValidate("Mixed3", expectedExternalSubmissions, 
PROPS_EXT_SERVICE_PUSH,
-        PROPS_EXT_SERVICE_PUSH, PROPS_IN_AM);
+    runJoinValidate("Mixed3", expectedExternalSubmissions, 
EXECUTION_CONTEXT_EXT_SERVICE_PUSH,
+        EXECUTION_CONTEXT_EXT_SERVICE_PUSH, EXECUTION_CONTEXT_IN_AM);
   }
 
   @Test(timeout = 60000)
   public void testMixed4() throws Exception { // M - containers, R-AM
     int expectedExternalSubmissions = 0 + 0; // Nothing in external service.
-    runJoinValidate("Mixed4", expectedExternalSubmissions, 
PROPS_REGULAR_CONTAINERS,
-        PROPS_REGULAR_CONTAINERS, PROPS_IN_AM);
+    runJoinValidate("Mixed4", expectedExternalSubmissions, 
EXECUTION_CONTEXT_REGULAR_CONTAINERS,
+        EXECUTION_CONTEXT_REGULAR_CONTAINERS, EXECUTION_CONTEXT_IN_AM);
   }
 
   @Test(timeout = 60000)
   public void testMixed5() throws Exception { // M1 - containers, 
M2-extservice, R-AM
     int expectedExternalSubmissions = 2 + 0; // 2 for M2
-    runJoinValidate("Mixed5", expectedExternalSubmissions, 
PROPS_REGULAR_CONTAINERS,
-        PROPS_EXT_SERVICE_PUSH, PROPS_IN_AM);
+    runJoinValidate("Mixed5", expectedExternalSubmissions, 
EXECUTION_CONTEXT_REGULAR_CONTAINERS,
+        EXECUTION_CONTEXT_EXT_SERVICE_PUSH, EXECUTION_CONTEXT_IN_AM);
   }
 
   @Test(timeout = 60000)
   public void testMixed6() throws Exception { // M - AM, R - Service
     int expectedExternalSubmissions = 0 + 3; // 3 for R in service
-    runJoinValidate("Mixed6", expectedExternalSubmissions, PROPS_IN_AM,
-        PROPS_IN_AM, PROPS_EXT_SERVICE_PUSH);
+    runJoinValidate("Mixed6", expectedExternalSubmissions, 
EXECUTION_CONTEXT_IN_AM,
+        EXECUTION_CONTEXT_IN_AM, EXECUTION_CONTEXT_EXT_SERVICE_PUSH);
   }
 
   @Test(timeout = 60000)
   public void testMixed7() throws Exception { // M - AM, R - Containers
     int expectedExternalSubmissions = 0; // Nothing in ext service
-    runJoinValidate("Mixed7", expectedExternalSubmissions, PROPS_IN_AM,
-        PROPS_IN_AM, PROPS_REGULAR_CONTAINERS);
+    runJoinValidate("Mixed7", expectedExternalSubmissions, 
EXECUTION_CONTEXT_IN_AM,
+        EXECUTION_CONTEXT_IN_AM, EXECUTION_CONTEXT_REGULAR_CONTAINERS);
   }
 
   @Test(timeout = 60000)
@@ -303,10 +289,9 @@ public class TestExternalTezServices {
     DAG dag = DAG.create(ContainerRunnerImpl.DAG_NAME_INSTRUMENTED_FAILURES);
     Vertex v =Vertex.create("Vertex1", 
ProcessorDescriptor.create(SleepProcessor.class.getName()),
         3);
-    for (Map.Entry<String, String> prop : PROPS_EXT_SERVICE_PUSH.entrySet()) {
-      v.setConf(prop.getKey(), prop.getValue());
-    }
+    v.setExecutionContext(EXECUTION_CONTEXT_EXT_SERVICE_PUSH);
     dag.addVertex(v);
+
     DAGClient dagClient = sharedTezClient.submitDAG(dag);
     DAGStatus dagStatus = dagClient.waitForCompletion();
     assertEquals(DAGStatus.State.SUCCEEDED, dagStatus.getState());
@@ -315,16 +300,16 @@ public class TestExternalTezServices {
 
   }
 
-  private void runJoinValidate(String name, int extExpectedCount, Map<String, 
String> lhsProps,
-                               Map<String, String> rhsProps,
-                               Map<String, String> validateProps) throws
+  private void runJoinValidate(String name, int extExpectedCount, 
VertexExecutionContext lhsContext,
+                               VertexExecutionContext rhsContext,
+                               VertexExecutionContext validateContext) throws
       Exception {
     int externalSubmissionCount = tezTestServiceCluster.getNumSubmissions();
 
     TezConfiguration tezConf = new TezConfiguration(confForJobs);
     JoinValidateConfigured joinValidate =
-        new JoinValidateConfigured(lhsProps, rhsProps,
-            validateProps, name);
+        new JoinValidateConfigured(EXECUTION_CONTEXT_DEFAULT, lhsContext, 
rhsContext,
+            validateContext, name);
     String[] validateArgs = new String[]{"-disableSplitGrouping",
         HASH_JOIN_EXPECTED_RESULT_PATH.toString(), 
HASH_JOIN_OUTPUT_PATH.toString(), "3"};
     assertEquals(0, joinValidate.run(tezConf, validateArgs, sharedTezClient));

http://git-wip-us.apache.org/repos/asf/tez/blob/ec5acd8a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TezChild.java
----------------------------------------------------------------------
diff --git 
a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TezChild.java 
b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TezChild.java
index fff39a0..353fe23 100644
--- 
a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TezChild.java
+++ 
b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TezChild.java
@@ -62,6 +62,7 @@ import org.apache.tez.common.security.TokenCache;
 import org.apache.tez.dag.api.TezConfiguration;
 import org.apache.tez.dag.api.TezConstants;
 import org.apache.tez.dag.api.TezException;
+import org.apache.tez.dag.api.records.DAGProtos;
 import org.apache.tez.dag.records.TezVertexID;
 import org.apache.tez.dag.utils.RelocalizationUtils;
 import org.apache.tez.runtime.api.ExecutionContext;
@@ -477,7 +478,9 @@ public class TezChild {
     }
 
     // Security framework already loaded the tokens into current ugi
-    
TezUtilsInternal.addUserSpecifiedTezConfiguration(System.getenv(Environment.PWD.name()),
 defaultConf);
+    DAGProtos.ConfigurationProto confProto =
+        
TezUtilsInternal.readUserSpecifiedTezConfiguration(System.getenv(Environment.PWD.name()));
+    TezUtilsInternal.addUserSpecifiedTezConfiguration(defaultConf, 
confProto.getConfKeyValuesList());
     UserGroupInformation.setConfiguration(defaultConf);
     Credentials credentials = 
UserGroupInformation.getCurrentUser().getCredentials();
     TezChild tezChild = newTezChild(defaultConf, host, port, 
containerIdentifier,

Reply via email to