http://git-wip-us.apache.org/repos/asf/tez/blob/ec5acd8a/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/ContainerLauncherRouter.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/ContainerLauncherRouter.java b/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/ContainerLauncherRouter.java index 7c6a6a4..594e6d3 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/ContainerLauncherRouter.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/ContainerLauncherRouter.java @@ -17,18 +17,21 @@ package org.apache.tez.dag.app.launcher; import java.lang.reflect.Constructor; import java.lang.reflect.InvocationTargetException; import java.net.UnknownHostException; +import java.util.List; import com.google.common.annotations.VisibleForTesting; +import com.google.common.collect.Lists; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.service.AbstractService; import org.apache.hadoop.yarn.event.EventHandler; import org.apache.tez.common.ReflectionUtils; +import org.apache.tez.dag.api.NamedEntityDescriptor; +import org.apache.tez.dag.api.TezConstants; import org.apache.tez.dag.app.ServicePluginLifecycleAbstractService; import org.apache.tez.serviceplugins.api.ContainerLaunchRequest; import org.apache.tez.serviceplugins.api.ContainerLauncher; import org.apache.tez.serviceplugins.api.ContainerLauncherContext; import org.apache.tez.serviceplugins.api.ContainerStopRequest; -import org.apache.tez.dag.api.TezConstants; import org.apache.tez.dag.api.TezUncheckedException; import org.apache.tez.dag.app.AppContext; import org.apache.tez.dag.app.ContainerLauncherContextImpl; @@ -63,35 +66,35 @@ public class ContainerLauncherRouter extends AbstractService public ContainerLauncherRouter(Configuration conf, AppContext context, TaskAttemptListener taskAttemptListener, String workingDirectory, - String[] containerLauncherClassIdentifiers, + List<NamedEntityDescriptor> containerLauncherDescriptors, boolean isPureLocalMode) throws UnknownHostException { super(ContainerLauncherRouter.class.getName()); this.appContext = context; - if (containerLauncherClassIdentifiers == null || containerLauncherClassIdentifiers.length == 0) { + if (containerLauncherDescriptors == null || containerLauncherDescriptors.isEmpty()) { if (isPureLocalMode) { - containerLauncherClassIdentifiers = - new String[]{TezConstants.TEZ_AM_SERVICE_PLUGINS_LOCAL_MODE_NAME_DEFAULT}; + containerLauncherDescriptors = Lists.newArrayList(new NamedEntityDescriptor( + TezConstants.getTezUberServicePluginName(), null)); } else { - containerLauncherClassIdentifiers = - new String[]{TezConstants.TEZ_AM_SERVICE_PLUGINS_NAME_DEFAULT}; + containerLauncherDescriptors = Lists.newArrayList(new NamedEntityDescriptor( + TezConstants.getTezYarnServicePluginName(), null)); } } - containerLauncherContexts = new ContainerLauncherContext[containerLauncherClassIdentifiers.length]; - containerLaunchers = new ContainerLauncher[containerLauncherClassIdentifiers.length]; - containerLauncherServiceWrappers = new ServicePluginLifecycleAbstractService[containerLauncherClassIdentifiers.length]; + containerLauncherContexts = new ContainerLauncherContext[containerLauncherDescriptors.size()]; + containerLaunchers = new ContainerLauncher[containerLauncherDescriptors.size()]; + containerLauncherServiceWrappers = new ServicePluginLifecycleAbstractService[containerLauncherDescriptors.size()]; - for (int i = 0; i < containerLauncherClassIdentifiers.length; i++) { + for (int i = 0; i < containerLauncherDescriptors.size(); i++) { ContainerLauncherContext containerLauncherContext = new ContainerLauncherContextImpl(context, taskAttemptListener); containerLauncherContexts[i] = containerLauncherContext; - containerLaunchers[i] = createContainerLauncher(containerLauncherClassIdentifiers[i], context, + containerLaunchers[i] = createContainerLauncher(containerLauncherDescriptors.get(i), context, containerLauncherContext, taskAttemptListener, workingDirectory, isPureLocalMode, conf); containerLauncherServiceWrappers[i] = new ServicePluginLifecycleAbstractService(containerLaunchers[i]); } } - private ContainerLauncher createContainerLauncher(String containerLauncherClassIdentifier, + private ContainerLauncher createContainerLauncher(NamedEntityDescriptor containerLauncherDescriptor, AppContext context, ContainerLauncherContext containerLauncherContext, TaskAttemptListener taskAttemptListener, @@ -99,11 +102,12 @@ public class ContainerLauncherRouter extends AbstractService boolean isPureLocalMode, Configuration conf) throws UnknownHostException { - if (containerLauncherClassIdentifier.equals(TezConstants.TEZ_AM_SERVICE_PLUGINS_NAME_DEFAULT)) { + if (containerLauncherDescriptor.getEntityName().equals( + TezConstants.getTezYarnServicePluginName())) { LOG.info("Creating DefaultContainerLauncher"); return new ContainerLauncherImpl(containerLauncherContext); - } else if (containerLauncherClassIdentifier - .equals(TezConstants.TEZ_AM_SERVICE_PLUGINS_LOCAL_MODE_NAME_DEFAULT)) { + } else if (containerLauncherDescriptor.getEntityName() + .equals(TezConstants.getTezUberServicePluginName())) { LOG.info("Creating LocalContainerLauncher"); // TODO Post TEZ-2003. LocalContainerLauncher is special cased, since it makes use of // extensive internals which are only available at runtime. Will likely require @@ -111,10 +115,10 @@ public class ContainerLauncherRouter extends AbstractService return new LocalContainerLauncher(containerLauncherContext, context, taskAttemptListener, workingDirectory, isPureLocalMode); } else { - LOG.info("Creating container launcher : " + containerLauncherClassIdentifier); + LOG.info("Creating container launcher {}:{} ", containerLauncherDescriptor.getEntityName(), containerLauncherDescriptor.getClassName()); Class<? extends ContainerLauncher> containerLauncherClazz = (Class<? extends ContainerLauncher>) ReflectionUtils.getClazz( - containerLauncherClassIdentifier); + containerLauncherDescriptor.getClassName()); try { Constructor<? extends ContainerLauncher> ctor = containerLauncherClazz .getConstructor(ContainerLauncherContext.class);
http://git-wip-us.apache.org/repos/asf/tez/blob/ec5acd8a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerEventHandler.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerEventHandler.java b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerEventHandler.java index b66e5fa..8038e2c 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerEventHandler.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerEventHandler.java @@ -22,6 +22,7 @@ import java.lang.reflect.Constructor; import java.lang.reflect.InvocationTargetException; import java.net.InetSocketAddress; import java.nio.ByteBuffer; +import java.util.LinkedList; import java.util.List; import java.util.Map; import java.util.concurrent.BlockingQueue; @@ -34,6 +35,8 @@ import java.util.concurrent.atomic.AtomicBoolean; import com.google.common.annotations.VisibleForTesting; import com.google.common.util.concurrent.ThreadFactoryBuilder; import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.tez.dag.api.NamedEntityDescriptor; +import org.apache.tez.dag.api.TezConstants; import org.apache.tez.dag.app.ServicePluginLifecycleAbstractService; import org.apache.tez.serviceplugins.api.TaskScheduler; import org.apache.tez.serviceplugins.api.TaskSchedulerContext; @@ -55,7 +58,6 @@ import org.apache.hadoop.yarn.event.Event; import org.apache.hadoop.yarn.event.EventHandler; import org.apache.tez.common.ReflectionUtils; import org.apache.tez.dag.api.TezConfiguration; -import org.apache.tez.dag.api.TezConstants; import org.apache.tez.dag.api.TezUncheckedException; import org.apache.tez.dag.api.TaskLocationHint; import org.apache.tez.dag.api.TaskLocationHint.TaskBasedLocationAffinity; @@ -143,14 +145,14 @@ public class TaskSchedulerEventHandler extends AbstractService implements * @param eventHandler * @param containerSignatureMatcher * @param webUI - * @param schedulerClasses the list of scheduler classes / codes. Tez internal classes are represented as codes. + * @param schedulerDescriptors the list of scheduler descriptors. Tez internal classes will not have the class names populated. * An empty list defaults to using the YarnTaskScheduler as the only source. */ @SuppressWarnings("rawtypes") public TaskSchedulerEventHandler(AppContext appContext, DAGClientServer clientService, EventHandler eventHandler, ContainerSignatureMatcher containerSignatureMatcher, WebUIService webUI, - String [] schedulerClasses, boolean isPureLocalMode) { + List<NamedEntityDescriptor> schedulerDescriptors, boolean isPureLocalMode) { super(TaskSchedulerEventHandler.class.getName()); this.appContext = appContext; this.eventHandler = eventHandler; @@ -166,31 +168,34 @@ public class TaskSchedulerEventHandler extends AbstractService implements // Override everything for pure local mode if (isPureLocalMode) { - this.taskSchedulerClasses = new String[] {TezConstants.TEZ_AM_SERVICE_PLUGINS_LOCAL_MODE_NAME_DEFAULT}; + this.taskSchedulerClasses = new String[] {TezConstants.getTezUberServicePluginName()}; this.yarnTaskSchedulerIndex = -1; } else { - if (schedulerClasses == null || schedulerClasses.length ==0) { - this.taskSchedulerClasses = new String[] {TezConstants.TEZ_AM_SERVICE_PLUGINS_NAME_DEFAULT}; + if (schedulerDescriptors == null || schedulerDescriptors.isEmpty()) { + this.taskSchedulerClasses = new String[] {TezConstants.getTezYarnServicePluginName()}; this.yarnTaskSchedulerIndex = 0; } else { // Ensure the YarnScheduler will be setup and note it's index. This will be responsible for heartbeats and YARN registration. int foundYarnTaskSchedulerIndex = -1; - for (int i = 0 ; i < schedulerClasses.length ; i++) { - if (schedulerClasses[i].equals(TezConstants.TEZ_AM_SERVICE_PLUGINS_NAME_DEFAULT)) { + + List<String> taskSchedulerClassList = new LinkedList<>(); + for (int i = 0 ; i < schedulerDescriptors.size() ; i++) { + if (schedulerDescriptors.get(i).getEntityName().equals( + TezConstants.getTezYarnServicePluginName())) { + taskSchedulerClassList.add(schedulerDescriptors.get(i).getEntityName()); foundYarnTaskSchedulerIndex = i; - break; + } else if (schedulerDescriptors.get(i).getEntityName().equals( + TezConstants.getTezUberServicePluginName())) { + taskSchedulerClassList.add(schedulerDescriptors.get(i).getEntityName()); + } else { + taskSchedulerClassList.add(schedulerDescriptors.get(i).getClassName()); } } - if (foundYarnTaskSchedulerIndex == -1) { // Not found. Add at the end. - this.taskSchedulerClasses = new String[schedulerClasses.length+1]; - foundYarnTaskSchedulerIndex = this.taskSchedulerClasses.length -1; - for (int i = 0 ; i < schedulerClasses.length ; i++) { // Copy over the rest. - this.taskSchedulerClasses[i] = schedulerClasses[i]; - } - this.taskSchedulerClasses[foundYarnTaskSchedulerIndex] = TezConstants.TEZ_AM_SERVICE_PLUGINS_NAME_DEFAULT; - } else { - this.taskSchedulerClasses = schedulerClasses; + if (foundYarnTaskSchedulerIndex == -1) { + taskSchedulerClassList.add(YarnTaskSchedulerService.class.getName()); + foundYarnTaskSchedulerIndex = taskSchedulerClassList.size() -1; } + this.taskSchedulerClasses = taskSchedulerClassList.toArray(new String[taskSchedulerClassList.size()]); this.yarnTaskSchedulerIndex = foundYarnTaskSchedulerIndex; } } @@ -419,10 +424,10 @@ public class TaskSchedulerEventHandler extends AbstractService implements new TaskSchedulerContextImpl(this, appContext, schedulerId, trackingUrl, customAppIdIdentifier, host, port, getConfig()); TaskSchedulerContext wrappedContext = new TaskSchedulerContextImplWrapper(rawContext, appCallbackExecutor); - if (schedulerClassName.equals(TezConstants.TEZ_AM_SERVICE_PLUGINS_NAME_DEFAULT)) { + if (schedulerClassName.equals(TezConstants.getTezYarnServicePluginName())) { LOG.info("Creating TaskScheduler: YarnTaskSchedulerService"); return new YarnTaskSchedulerService(wrappedContext); - } else if (schedulerClassName.equals(TezConstants.TEZ_AM_SERVICE_PLUGINS_LOCAL_MODE_NAME_DEFAULT)) { + } else if (schedulerClassName.equals(TezConstants.getTezUberServicePluginName())) { LOG.info("Creating TaskScheduler: Local TaskScheduler"); return new LocalTaskSchedulerService(wrappedContext); } else { @@ -454,7 +459,7 @@ public class TaskSchedulerEventHandler extends AbstractService implements for (int i = 0; i < taskSchedulerClasses.length; i++) { long customAppIdIdentifier; if (isPureLocalMode || taskSchedulerClasses[i].equals( - TezConstants.TEZ_AM_SERVICE_PLUGINS_NAME_DEFAULT)) { // Use the app identifier from the appId. + TezConstants.getTezYarnServicePluginName())) { // Use the app identifier from the appId. customAppIdIdentifier = appContext.getApplicationID().getClusterTimestamp(); } else { customAppIdIdentifier = SCHEDULER_APP_ID_BASE + (j++ * SCHEDULER_APP_ID_INCREMENT); http://git-wip-us.apache.org/repos/asf/tez/blob/ec5acd8a/tez-dag/src/test/java/org/apache/tez/dag/app/MockDAGAppMaster.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/MockDAGAppMaster.java b/tez-dag/src/test/java/org/apache/tez/dag/app/MockDAGAppMaster.java index 21ae5f7..17feeaa 100644 --- a/tez-dag/src/test/java/org/apache/tez/dag/app/MockDAGAppMaster.java +++ b/tez-dag/src/test/java/org/apache/tez/dag/app/MockDAGAppMaster.java @@ -34,6 +34,7 @@ import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; +import org.apache.tez.dag.api.NamedEntityDescriptor; import org.apache.tez.serviceplugins.api.ContainerLaunchRequest; import org.apache.tez.serviceplugins.api.ContainerLauncher; import org.apache.tez.serviceplugins.api.ContainerLauncherContext; @@ -486,7 +487,7 @@ public class MockDAGAppMaster extends DAGAppMaster { Credentials credentials, String jobUserName, int handlerConcurrency, int numConcurrentContainers) { super(applicationAttemptId, containerId, nmHost, nmPort, nmHttpPort, clock, appSubmitTime, isSession, workingDirectory, localDirs, logDirs, new TezApiVersionInfo().getVersion(), 1, - credentials, jobUserName); + credentials, jobUserName, null); containerLauncherContext = new ContainerLauncherContextImpl(getContext(), getTaskAttemptListener()); containerLauncher = new MockContainerLauncher(launcherGoFlag, containerLauncherContext); shutdownHandler = new MockDAGAppMasterShutdownHandler(); @@ -500,7 +501,7 @@ public class MockDAGAppMaster extends DAGAppMaster { // use mock container launcher for tests @Override protected ContainerLauncherRouter createContainerLauncherRouter(final Configuration conf, - String[] containerLaunchers, + List<NamedEntityDescriptor> containerLauncherDescirptors, boolean isLocal) throws UnknownHostException { return new ContainerLauncherRouter(containerLauncher, getContext()); http://git-wip-us.apache.org/repos/asf/tez/blob/ec5acd8a/tez-dag/src/test/java/org/apache/tez/dag/app/TestTaskAttemptListenerImplTezDag.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/TestTaskAttemptListenerImplTezDag.java b/tez-dag/src/test/java/org/apache/tez/dag/app/TestTaskAttemptListenerImplTezDag.java index 41a7373..e45b0a2 100644 --- a/tez-dag/src/test/java/org/apache/tez/dag/app/TestTaskAttemptListenerImplTezDag.java +++ b/tez-dag/src/test/java/org/apache/tez/dag/app/TestTaskAttemptListenerImplTezDag.java @@ -50,6 +50,7 @@ import org.apache.tez.common.ContainerTask; import org.apache.tez.common.security.JobTokenIdentifier; import org.apache.tez.common.security.JobTokenSecretManager; import org.apache.tez.common.security.TokenCache; +import org.apache.tez.dag.api.NamedEntityDescriptor; import org.apache.tez.dag.api.TezConfiguration; import org.apache.tez.serviceplugins.api.ContainerEndReason; import org.apache.tez.serviceplugins.api.TaskAttemptEndReason; @@ -375,10 +376,10 @@ public class TestTaskAttemptListenerImplTezDag { public TaskAttemptListenerImplForTest(AppContext context, TaskHeartbeatHandler thh, ContainerHeartbeatHandler chh, - String[] taskCommunicatorClassIdentifiers, + List<NamedEntityDescriptor> taskCommDescriptors, Configuration conf, boolean isPureLocalMode) { - super(context, thh, chh, taskCommunicatorClassIdentifiers, conf, + super(context, thh, chh, taskCommDescriptors, conf, isPureLocalMode); } http://git-wip-us.apache.org/repos/asf/tez/blob/ec5acd8a/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskSchedulerEventHandler.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskSchedulerEventHandler.java b/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskSchedulerEventHandler.java index 3ea0446..f191175 100644 --- a/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskSchedulerEventHandler.java +++ b/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskSchedulerEventHandler.java @@ -32,6 +32,7 @@ import java.io.IOException; import java.net.InetSocketAddress; import java.util.HashMap; import java.util.HashSet; +import java.util.LinkedList; import java.util.List; import java.util.concurrent.atomic.AtomicBoolean; @@ -48,6 +49,7 @@ import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.event.Event; import org.apache.hadoop.yarn.event.EventHandler; import org.apache.tez.common.ContainerSignatureMatcher; +import org.apache.tez.dag.api.NamedEntityDescriptor; import org.apache.tez.dag.api.TaskLocationHint; import org.apache.tez.dag.api.TezConfiguration; import org.apache.tez.dag.api.client.DAGClientServer; @@ -91,7 +93,7 @@ public class TestTaskSchedulerEventHandler { public MockTaskSchedulerEventHandler(AppContext appContext, DAGClientServer clientService, EventHandler eventHandler, ContainerSignatureMatcher containerSignatureMatcher, WebUIService webUI) { - super(appContext, clientService, eventHandler, containerSignatureMatcher, webUI, new String[] {}, false); + super(appContext, clientService, eventHandler, containerSignatureMatcher, webUI, new LinkedList<NamedEntityDescriptor>(), false); } @Override http://git-wip-us.apache.org/repos/asf/tez/blob/ec5acd8a/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskSchedulerHelpers.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskSchedulerHelpers.java b/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskSchedulerHelpers.java index 966c95a..60d37e9 100644 --- a/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskSchedulerHelpers.java +++ b/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskSchedulerHelpers.java @@ -59,6 +59,7 @@ import org.apache.hadoop.yarn.client.api.impl.AMRMClientImpl; import org.apache.hadoop.yarn.event.Event; import org.apache.hadoop.yarn.event.EventHandler; import org.apache.tez.common.ContainerSignatureMatcher; +import org.apache.tez.dag.api.NamedEntityDescriptor; import org.apache.tez.dag.app.AppContext; import org.apache.tez.dag.app.ServicePluginLifecycleAbstractService; import org.apache.tez.dag.app.rm.YarnTaskSchedulerService.CookieContainerRequest; @@ -130,7 +131,7 @@ class TestTaskSchedulerHelpers { EventHandler eventHandler, TezAMRMClientAsync<CookieContainerRequest> amrmClientAsync, ContainerSignatureMatcher containerSignatureMatcher) { - super(appContext, null, eventHandler, containerSignatureMatcher, null, new String[]{}, false); + super(appContext, null, eventHandler, containerSignatureMatcher, null, new LinkedList<NamedEntityDescriptor>(), false); this.amrmClientAsync = amrmClientAsync; this.containerSignatureMatcher = containerSignatureMatcher; } http://git-wip-us.apache.org/repos/asf/tez/blob/ec5acd8a/tez-examples/src/main/java/org/apache/tez/examples/JoinValidate.java ---------------------------------------------------------------------- diff --git a/tez-examples/src/main/java/org/apache/tez/examples/JoinValidate.java b/tez-examples/src/main/java/org/apache/tez/examples/JoinValidate.java index ba17ba0..611e8cc 100644 --- a/tez-examples/src/main/java/org/apache/tez/examples/JoinValidate.java +++ b/tez-examples/src/main/java/org/apache/tez/examples/JoinValidate.java @@ -20,8 +20,8 @@ package org.apache.tez.examples; import java.io.IOException; import java.util.Set; -import java.util.Map; +import org.apache.tez.dag.api.Vertex.VertexExecutionContext; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.hadoop.conf.Configuration; @@ -137,6 +137,9 @@ public class JoinValidate extends TezExampleBase { private DAG createDag(TezConfiguration tezConf, Path lhs, Path rhs, int numPartitions) throws IOException { DAG dag = DAG.create(getDagName()); + if (getDefaultExecutionContext() != null) { + dag.setExecutionContext(getDefaultExecutionContext()); + } // Configuration for intermediate output - shared by Vertex1 and Vertex2 // This should only be setting selective keys from the underlying conf. Fix after there's a @@ -153,18 +156,18 @@ public class JoinValidate extends TezExampleBase { MRInput .createConfigBuilder(new Configuration(tezConf), TextInputFormat.class, lhs.toUri().toString()).groupSplits(!isDisableSplitGrouping()).build()); - setVertexProperties(lhsVertex, getLhsVertexProperties()); + setVertexExecutionContext(lhsVertex, getLhsExecutionContext()); Vertex rhsVertex = Vertex.create(RHS_INPUT_NAME, ProcessorDescriptor.create( ForwardingProcessor.class.getName())).addDataSource("rhs", MRInput .createConfigBuilder(new Configuration(tezConf), TextInputFormat.class, rhs.toUri().toString()).groupSplits(!isDisableSplitGrouping()).build()); - setVertexProperties(rhsVertex, getRhsVertexProperties()); + setVertexExecutionContext(rhsVertex, getRhsExecutionContext()); Vertex joinValidateVertex = Vertex.create("joinvalidate", ProcessorDescriptor.create( JoinValidateProcessor.class.getName()), numPartitions); - setVertexProperties(joinValidateVertex, getValidateVertexProperties()); + setVertexExecutionContext(joinValidateVertex, getValidateExecutionContext()); Edge e1 = Edge.create(lhsVertex, joinValidateVertex, edgeConf.createDefaultEdgeProperty()); Edge e2 = Edge.create(rhsVertex, joinValidateVertex, edgeConf.createDefaultEdgeProperty()); @@ -174,23 +177,25 @@ public class JoinValidate extends TezExampleBase { return dag; } - private void setVertexProperties(Vertex vertex, Map<String, String> properties) { - if (properties != null) { - for (Map.Entry<String, String> entry : properties.entrySet()) { - vertex.setConf(entry.getKey(), entry.getValue()); - } + private void setVertexExecutionContext(Vertex vertex, VertexExecutionContext executionContext) { + if (executionContext != null) { + vertex.setExecutionContext(executionContext); } } - protected Map<String, String> getLhsVertexProperties() { + protected VertexExecutionContext getDefaultExecutionContext() { return null; } - protected Map<String, String> getRhsVertexProperties() { + protected VertexExecutionContext getLhsExecutionContext() { return null; } - protected Map<String, String> getValidateVertexProperties() { + protected VertexExecutionContext getRhsExecutionContext() { + return null; + } + + protected VertexExecutionContext getValidateExecutionContext() { return null; } @@ -240,4 +245,6 @@ public class JoinValidate extends TezExampleBase { } } } + + } http://git-wip-us.apache.org/repos/asf/tez/blob/ec5acd8a/tez-ext-service-tests/src/test/java/org/apache/tez/dag/app/launcher/TezTestServiceContainerLauncher.java ---------------------------------------------------------------------- diff --git a/tez-ext-service-tests/src/test/java/org/apache/tez/dag/app/launcher/TezTestServiceContainerLauncher.java b/tez-ext-service-tests/src/test/java/org/apache/tez/dag/app/launcher/TezTestServiceContainerLauncher.java index 85f9415..0002b42 100644 --- a/tez-ext-service-tests/src/test/java/org/apache/tez/dag/app/launcher/TezTestServiceContainerLauncher.java +++ b/tez-ext-service-tests/src/test/java/org/apache/tez/dag/app/launcher/TezTestServiceContainerLauncher.java @@ -121,7 +121,8 @@ public class TezTestServiceContainerLauncher extends ContainerLauncher { private RunContainerRequestProto constructRunContainerRequest(ContainerLaunchRequest launchRequest) throws IOException { RunContainerRequestProto.Builder builder = RunContainerRequestProto.newBuilder(); - Preconditions.checkArgument(launchRequest.getTaskCommunicatorName().equals(TezConstants.TEZ_AM_SERVICE_PLUGINS_NAME_DEFAULT)); + Preconditions.checkArgument(launchRequest.getTaskCommunicatorName().equals( + TezConstants.getTezYarnServicePluginName())); InetSocketAddress address = (InetSocketAddress) getContext().getTaskCommunicatorMetaInfo(launchRequest.getTaskCommunicatorName()); builder.setAmHost(address.getHostName()).setAmPort(address.getPort()); builder.setAppAttemptNumber(appAttemptId.getAttemptId()); http://git-wip-us.apache.org/repos/asf/tez/blob/ec5acd8a/tez-ext-service-tests/src/test/java/org/apache/tez/examples/JoinValidateConfigured.java ---------------------------------------------------------------------- diff --git a/tez-ext-service-tests/src/test/java/org/apache/tez/examples/JoinValidateConfigured.java b/tez-ext-service-tests/src/test/java/org/apache/tez/examples/JoinValidateConfigured.java index e5d2e3b..f31476f 100644 --- a/tez-ext-service-tests/src/test/java/org/apache/tez/examples/JoinValidateConfigured.java +++ b/tez-ext-service-tests/src/test/java/org/apache/tez/examples/JoinValidateConfigured.java @@ -14,36 +14,46 @@ package org.apache.tez.examples; -import java.util.Map; + +import org.apache.tez.dag.api.Vertex.VertexExecutionContext; public class JoinValidateConfigured extends JoinValidate { - private final Map<String, String> lhsProps; - private final Map<String, String> rhsProps; - private final Map<String, String> validateProps; + private final VertexExecutionContext defaultExecutionContext; + private final VertexExecutionContext lhsContext; + private final VertexExecutionContext rhsContext; + private final VertexExecutionContext validateContext; private final String dagNameSuffix; - public JoinValidateConfigured(Map<String, String> lhsProps, Map<String, String> rhsProps, - Map<String, String> validateProps, String dagNameSuffix) { - this.lhsProps = lhsProps; - this.rhsProps = rhsProps; - this.validateProps = validateProps; + public JoinValidateConfigured(VertexExecutionContext defaultExecutionContext, + VertexExecutionContext lhsContext, + VertexExecutionContext rhsContext, + VertexExecutionContext validateContext, String dagNameSuffix) { + this.defaultExecutionContext = defaultExecutionContext; + this.lhsContext = lhsContext; + this.rhsContext = rhsContext; + this.validateContext = validateContext; this.dagNameSuffix = dagNameSuffix; } @Override - protected Map<String, String> getLhsVertexProperties() { - return this.lhsProps; + protected VertexExecutionContext getDefaultExecutionContext() { + return this.defaultExecutionContext; + } + + @Override + protected VertexExecutionContext getLhsExecutionContext() { + return this.lhsContext; } @Override - protected Map<String, String> getRhsVertexProperties() { - return this.rhsProps; + protected VertexExecutionContext getRhsExecutionContext() { + return this.rhsContext; } @Override - protected Map<String, String> getValidateVertexProperties() { - return this.validateProps; + protected VertexExecutionContext getValidateExecutionContext() { + return this.validateContext; } @Override http://git-wip-us.apache.org/repos/asf/tez/blob/ec5acd8a/tez-ext-service-tests/src/test/java/org/apache/tez/tests/TestExternalTezServices.java ---------------------------------------------------------------------- diff --git a/tez-ext-service-tests/src/test/java/org/apache/tez/tests/TestExternalTezServices.java b/tez-ext-service-tests/src/test/java/org/apache/tez/tests/TestExternalTezServices.java index 45c70f1..07dd363 100644 --- a/tez-ext-service-tests/src/test/java/org/apache/tez/tests/TestExternalTezServices.java +++ b/tez-ext-service-tests/src/test/java/org/apache/tez/tests/TestExternalTezServices.java @@ -19,7 +19,6 @@ import static org.junit.Assert.assertEquals; import java.io.IOException; import java.util.Map; -import com.google.common.collect.Maps; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; @@ -28,9 +27,9 @@ import org.apache.tez.client.TezClient; import org.apache.tez.dag.api.DAG; import org.apache.tez.dag.api.ProcessorDescriptor; import org.apache.tez.dag.api.TezConfiguration; -import org.apache.tez.dag.api.TezConstants; import org.apache.tez.dag.api.TezException; import org.apache.tez.dag.api.Vertex; +import org.apache.tez.dag.api.Vertex.VertexExecutionContext; import org.apache.tez.dag.api.client.DAGClient; import org.apache.tez.dag.api.client.DAGStatus; import org.apache.tez.dag.app.launcher.TezTestServiceNoOpContainerLauncher; @@ -43,6 +42,10 @@ import org.apache.tez.runtime.library.api.TezRuntimeConfiguration; import org.apache.tez.runtime.library.processor.SleepProcessor; import org.apache.tez.service.MiniTezTestServiceCluster; import org.apache.tez.service.impl.ContainerRunnerImpl; +import org.apache.tez.serviceplugins.api.ContainerLauncherDescriptor; +import org.apache.tez.serviceplugins.api.ServicePluginsDescriptor; +import org.apache.tez.serviceplugins.api.TaskCommunicatorDescriptor; +import org.apache.tez.serviceplugins.api.TaskSchedulerDescriptor; import org.apache.tez.test.MiniTezCluster; import org.junit.AfterClass; import org.junit.BeforeClass; @@ -72,9 +75,15 @@ public class TestExternalTezServices { private static final Path HASH_JOIN_EXPECTED_RESULT_PATH = new Path(SRC_DATA_DIR, "expectedOutputPath"); private static final Path HASH_JOIN_OUTPUT_PATH = new Path(SRC_DATA_DIR, "outPath"); - private static final Map<String, String> PROPS_EXT_SERVICE_PUSH = Maps.newHashMap(); - private static final Map<String, String> PROPS_REGULAR_CONTAINERS = Maps.newHashMap(); - private static final Map<String, String> PROPS_IN_AM = Maps.newHashMap(); + private static final VertexExecutionContext EXECUTION_CONTEXT_EXT_SERVICE_PUSH = + VertexExecutionContext.create( + EXT_PUSH_ENTITY_NAME, EXT_PUSH_ENTITY_NAME, EXT_PUSH_ENTITY_NAME); + private static final VertexExecutionContext EXECUTION_CONTEXT_REGULAR_CONTAINERS = + VertexExecutionContext.createExecuteInContainers(true); + private static final VertexExecutionContext EXECUTION_CONTEXT_IN_AM = + VertexExecutionContext.createExecuteInAm(true); + + private static final VertexExecutionContext EXECUTION_CONTEXT_DEFAULT = EXECUTION_CONTEXT_EXT_SERVICE_PUSH; private static String TEST_ROOT_DIR = "target" + Path.SEPARATOR + TestExternalTezServices.class.getName() + "-tmpDir"; @@ -127,51 +136,28 @@ public class TestExternalTezServices { confForJobs.set(TezConfiguration.TEZ_AM_STAGING_DIR, stagingDirPath.toString()); confForJobs.setBoolean(TezRuntimeConfiguration.TEZ_RUNTIME_OPTIMIZE_LOCAL_FETCH, false); - confForJobs.setStrings(TezConfiguration.TEZ_AM_TASK_SCHEDULERS, - TezConstants.TEZ_AM_SERVICE_PLUGINS_NAME_DEFAULT, - TezConstants.TEZ_AM_SERVICE_PLUGINS_LOCAL_MODE_NAME_DEFAULT, - EXT_PUSH_ENTITY_NAME + ":" + TezTestServiceTaskSchedulerService.class.getName()); - - confForJobs.setStrings(TezConfiguration.TEZ_AM_CONTAINER_LAUNCHERS, - TezConstants.TEZ_AM_SERVICE_PLUGINS_NAME_DEFAULT, - TezConstants.TEZ_AM_SERVICE_PLUGINS_LOCAL_MODE_NAME_DEFAULT, - EXT_PUSH_ENTITY_NAME + ":" + TezTestServiceNoOpContainerLauncher.class.getName()); - - confForJobs.setStrings(TezConfiguration.TEZ_AM_TASK_COMMUNICATORS, - TezConstants.TEZ_AM_SERVICE_PLUGINS_NAME_DEFAULT, - TezConstants.TEZ_AM_SERVICE_PLUGINS_LOCAL_MODE_NAME_DEFAULT, - EXT_PUSH_ENTITY_NAME + ":" + TezTestServiceTaskCommunicatorImpl.class.getName()); - - // Default all jobs to run via the service. Individual tests override this on a per vertex/dag level. - confForJobs.set(TezConfiguration.TEZ_AM_VERTEX_TASK_SCHEDULER_NAME, EXT_PUSH_ENTITY_NAME); - confForJobs.set(TezConfiguration.TEZ_AM_VERTEX_CONTAINER_LAUNCHER_NAME, EXT_PUSH_ENTITY_NAME); - confForJobs.set(TezConfiguration.TEZ_AM_VERTEX_TASK_COMMUNICATOR_NAME, EXT_PUSH_ENTITY_NAME); - - // Setup various executor sets - PROPS_REGULAR_CONTAINERS.put(TezConfiguration.TEZ_AM_VERTEX_TASK_SCHEDULER_NAME, - TezConstants.TEZ_AM_SERVICE_PLUGINS_NAME_DEFAULT); - PROPS_REGULAR_CONTAINERS.put(TezConfiguration.TEZ_AM_VERTEX_CONTAINER_LAUNCHER_NAME, - TezConstants.TEZ_AM_SERVICE_PLUGINS_NAME_DEFAULT); - PROPS_REGULAR_CONTAINERS.put(TezConfiguration.TEZ_AM_VERTEX_TASK_COMMUNICATOR_NAME, - TezConstants.TEZ_AM_SERVICE_PLUGINS_NAME_DEFAULT); - - PROPS_EXT_SERVICE_PUSH.put(TezConfiguration.TEZ_AM_VERTEX_TASK_SCHEDULER_NAME, EXT_PUSH_ENTITY_NAME); - PROPS_EXT_SERVICE_PUSH.put(TezConfiguration.TEZ_AM_VERTEX_CONTAINER_LAUNCHER_NAME, EXT_PUSH_ENTITY_NAME); - PROPS_EXT_SERVICE_PUSH.put(TezConfiguration.TEZ_AM_VERTEX_TASK_COMMUNICATOR_NAME, EXT_PUSH_ENTITY_NAME); - - PROPS_IN_AM.put(TezConfiguration.TEZ_AM_VERTEX_TASK_SCHEDULER_NAME, - TezConstants.TEZ_AM_SERVICE_PLUGINS_LOCAL_MODE_NAME_DEFAULT); - PROPS_IN_AM.put(TezConfiguration.TEZ_AM_VERTEX_CONTAINER_LAUNCHER_NAME, - TezConstants.TEZ_AM_SERVICE_PLUGINS_LOCAL_MODE_NAME_DEFAULT); - PROPS_IN_AM.put(TezConfiguration.TEZ_AM_VERTEX_TASK_COMMUNICATOR_NAME, - TezConstants.TEZ_AM_SERVICE_PLUGINS_LOCAL_MODE_NAME_DEFAULT); + TaskSchedulerDescriptor[] taskSchedulerDescriptors = new TaskSchedulerDescriptor[]{ + TaskSchedulerDescriptor + .create(EXT_PUSH_ENTITY_NAME, TezTestServiceTaskSchedulerService.class.getName())}; + + ContainerLauncherDescriptor[] containerLauncherDescriptors = new ContainerLauncherDescriptor[]{ + ContainerLauncherDescriptor + .create(EXT_PUSH_ENTITY_NAME, TezTestServiceNoOpContainerLauncher.class.getName())}; + + TaskCommunicatorDescriptor[] taskCommunicatorDescriptors = new TaskCommunicatorDescriptor[]{ + TaskCommunicatorDescriptor + .create(EXT_PUSH_ENTITY_NAME, TezTestServiceTaskCommunicatorImpl.class.getName())}; + ServicePluginsDescriptor servicePluginsDescriptor = ServicePluginsDescriptor.create(true, true, + taskSchedulerDescriptors, containerLauncherDescriptors, taskCommunicatorDescriptors); // Create a session to use for all tests. TezConfiguration tezClientConf = new TezConfiguration(confForJobs); - sharedTezClient = TezClient.create(TestExternalTezServices.class.getSimpleName() + "_session", - tezClientConf, true); + sharedTezClient = TezClient + .newBuilder(TestExternalTezServices.class.getSimpleName() + "_session", tezClientConf) + .setIsSession(true).setServicePluginDescriptor(servicePluginsDescriptor).build(); + sharedTezClient.start(); LOG.info("Shared TezSession started"); sharedTezClient.waitTillReady(); @@ -225,71 +211,71 @@ public class TestExternalTezServices { @Test(timeout = 60000) public void testAllInService() throws Exception { int expectedExternalSubmissions = 4 + 3; //4 for 4 src files, 3 for num reducers. - runJoinValidate("AllInService", expectedExternalSubmissions, PROPS_EXT_SERVICE_PUSH, - PROPS_EXT_SERVICE_PUSH, PROPS_EXT_SERVICE_PUSH); + runJoinValidate("AllInService", expectedExternalSubmissions, EXECUTION_CONTEXT_EXT_SERVICE_PUSH, + EXECUTION_CONTEXT_EXT_SERVICE_PUSH, EXECUTION_CONTEXT_EXT_SERVICE_PUSH); } @Test(timeout = 60000) public void testAllInContainers() throws Exception { int expectedExternalSubmissions = 0; // All in containers - runJoinValidate("AllInContainers", expectedExternalSubmissions, PROPS_REGULAR_CONTAINERS, - PROPS_REGULAR_CONTAINERS, PROPS_REGULAR_CONTAINERS); + runJoinValidate("AllInContainers", expectedExternalSubmissions, EXECUTION_CONTEXT_REGULAR_CONTAINERS, + EXECUTION_CONTEXT_REGULAR_CONTAINERS, EXECUTION_CONTEXT_REGULAR_CONTAINERS); } @Test(timeout = 60000) public void testAllInAM() throws Exception { int expectedExternalSubmissions = 0; // All in AM - runJoinValidate("AllInAM", expectedExternalSubmissions, PROPS_IN_AM, - PROPS_IN_AM, PROPS_IN_AM); + runJoinValidate("AllInAM", expectedExternalSubmissions, EXECUTION_CONTEXT_IN_AM, + EXECUTION_CONTEXT_IN_AM, EXECUTION_CONTEXT_IN_AM); } @Test(timeout = 60000) public void testMixed1() throws Exception { // M-ExtService, R-containers int expectedExternalSubmissions = 4 + 0; //4 for 4 src files, 0 for num reducers. - runJoinValidate("Mixed1", expectedExternalSubmissions, PROPS_EXT_SERVICE_PUSH, - PROPS_EXT_SERVICE_PUSH, PROPS_REGULAR_CONTAINERS); + runJoinValidate("Mixed1", expectedExternalSubmissions, EXECUTION_CONTEXT_EXT_SERVICE_PUSH, + EXECUTION_CONTEXT_EXT_SERVICE_PUSH, EXECUTION_CONTEXT_REGULAR_CONTAINERS); } @Test(timeout = 60000) public void testMixed2() throws Exception { // M-Containers, R-ExtService int expectedExternalSubmissions = 0 + 3; // 3 for num reducers. - runJoinValidate("Mixed2", expectedExternalSubmissions, PROPS_REGULAR_CONTAINERS, - PROPS_REGULAR_CONTAINERS, PROPS_EXT_SERVICE_PUSH); + runJoinValidate("Mixed2", expectedExternalSubmissions, EXECUTION_CONTEXT_REGULAR_CONTAINERS, + EXECUTION_CONTEXT_REGULAR_CONTAINERS, EXECUTION_CONTEXT_EXT_SERVICE_PUSH); } @Test(timeout = 60000) public void testMixed3() throws Exception { // M - service, R-AM int expectedExternalSubmissions = 4 + 0; //4 for 4 src files, 0 for num reducers (in-AM). - runJoinValidate("Mixed3", expectedExternalSubmissions, PROPS_EXT_SERVICE_PUSH, - PROPS_EXT_SERVICE_PUSH, PROPS_IN_AM); + runJoinValidate("Mixed3", expectedExternalSubmissions, EXECUTION_CONTEXT_EXT_SERVICE_PUSH, + EXECUTION_CONTEXT_EXT_SERVICE_PUSH, EXECUTION_CONTEXT_IN_AM); } @Test(timeout = 60000) public void testMixed4() throws Exception { // M - containers, R-AM int expectedExternalSubmissions = 0 + 0; // Nothing in external service. - runJoinValidate("Mixed4", expectedExternalSubmissions, PROPS_REGULAR_CONTAINERS, - PROPS_REGULAR_CONTAINERS, PROPS_IN_AM); + runJoinValidate("Mixed4", expectedExternalSubmissions, EXECUTION_CONTEXT_REGULAR_CONTAINERS, + EXECUTION_CONTEXT_REGULAR_CONTAINERS, EXECUTION_CONTEXT_IN_AM); } @Test(timeout = 60000) public void testMixed5() throws Exception { // M1 - containers, M2-extservice, R-AM int expectedExternalSubmissions = 2 + 0; // 2 for M2 - runJoinValidate("Mixed5", expectedExternalSubmissions, PROPS_REGULAR_CONTAINERS, - PROPS_EXT_SERVICE_PUSH, PROPS_IN_AM); + runJoinValidate("Mixed5", expectedExternalSubmissions, EXECUTION_CONTEXT_REGULAR_CONTAINERS, + EXECUTION_CONTEXT_EXT_SERVICE_PUSH, EXECUTION_CONTEXT_IN_AM); } @Test(timeout = 60000) public void testMixed6() throws Exception { // M - AM, R - Service int expectedExternalSubmissions = 0 + 3; // 3 for R in service - runJoinValidate("Mixed6", expectedExternalSubmissions, PROPS_IN_AM, - PROPS_IN_AM, PROPS_EXT_SERVICE_PUSH); + runJoinValidate("Mixed6", expectedExternalSubmissions, EXECUTION_CONTEXT_IN_AM, + EXECUTION_CONTEXT_IN_AM, EXECUTION_CONTEXT_EXT_SERVICE_PUSH); } @Test(timeout = 60000) public void testMixed7() throws Exception { // M - AM, R - Containers int expectedExternalSubmissions = 0; // Nothing in ext service - runJoinValidate("Mixed7", expectedExternalSubmissions, PROPS_IN_AM, - PROPS_IN_AM, PROPS_REGULAR_CONTAINERS); + runJoinValidate("Mixed7", expectedExternalSubmissions, EXECUTION_CONTEXT_IN_AM, + EXECUTION_CONTEXT_IN_AM, EXECUTION_CONTEXT_REGULAR_CONTAINERS); } @Test(timeout = 60000) @@ -303,10 +289,9 @@ public class TestExternalTezServices { DAG dag = DAG.create(ContainerRunnerImpl.DAG_NAME_INSTRUMENTED_FAILURES); Vertex v =Vertex.create("Vertex1", ProcessorDescriptor.create(SleepProcessor.class.getName()), 3); - for (Map.Entry<String, String> prop : PROPS_EXT_SERVICE_PUSH.entrySet()) { - v.setConf(prop.getKey(), prop.getValue()); - } + v.setExecutionContext(EXECUTION_CONTEXT_EXT_SERVICE_PUSH); dag.addVertex(v); + DAGClient dagClient = sharedTezClient.submitDAG(dag); DAGStatus dagStatus = dagClient.waitForCompletion(); assertEquals(DAGStatus.State.SUCCEEDED, dagStatus.getState()); @@ -315,16 +300,16 @@ public class TestExternalTezServices { } - private void runJoinValidate(String name, int extExpectedCount, Map<String, String> lhsProps, - Map<String, String> rhsProps, - Map<String, String> validateProps) throws + private void runJoinValidate(String name, int extExpectedCount, VertexExecutionContext lhsContext, + VertexExecutionContext rhsContext, + VertexExecutionContext validateContext) throws Exception { int externalSubmissionCount = tezTestServiceCluster.getNumSubmissions(); TezConfiguration tezConf = new TezConfiguration(confForJobs); JoinValidateConfigured joinValidate = - new JoinValidateConfigured(lhsProps, rhsProps, - validateProps, name); + new JoinValidateConfigured(EXECUTION_CONTEXT_DEFAULT, lhsContext, rhsContext, + validateContext, name); String[] validateArgs = new String[]{"-disableSplitGrouping", HASH_JOIN_EXPECTED_RESULT_PATH.toString(), HASH_JOIN_OUTPUT_PATH.toString(), "3"}; assertEquals(0, joinValidate.run(tezConf, validateArgs, sharedTezClient)); http://git-wip-us.apache.org/repos/asf/tez/blob/ec5acd8a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TezChild.java ---------------------------------------------------------------------- diff --git a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TezChild.java b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TezChild.java index fff39a0..353fe23 100644 --- a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TezChild.java +++ b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TezChild.java @@ -62,6 +62,7 @@ import org.apache.tez.common.security.TokenCache; import org.apache.tez.dag.api.TezConfiguration; import org.apache.tez.dag.api.TezConstants; import org.apache.tez.dag.api.TezException; +import org.apache.tez.dag.api.records.DAGProtos; import org.apache.tez.dag.records.TezVertexID; import org.apache.tez.dag.utils.RelocalizationUtils; import org.apache.tez.runtime.api.ExecutionContext; @@ -477,7 +478,9 @@ public class TezChild { } // Security framework already loaded the tokens into current ugi - TezUtilsInternal.addUserSpecifiedTezConfiguration(System.getenv(Environment.PWD.name()), defaultConf); + DAGProtos.ConfigurationProto confProto = + TezUtilsInternal.readUserSpecifiedTezConfiguration(System.getenv(Environment.PWD.name())); + TezUtilsInternal.addUserSpecifiedTezConfiguration(defaultConf, confProto.getConfKeyValuesList()); UserGroupInformation.setConfiguration(defaultConf); Credentials credentials = UserGroupInformation.getCurrentUser().getCredentials(); TezChild tezChild = newTezChild(defaultConf, host, port, containerIdentifier,
