TEZ-2653. Change service contexts to expose a user specified payload instead of the AM configuration. (sseth)
Project: http://git-wip-us.apache.org/repos/asf/tez/repo Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/4a18c5d5 Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/4a18c5d5 Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/4a18c5d5 Branch: refs/heads/master Commit: 4a18c5d5baaaab4565da9d3c3085d98dfc91d07e Parents: ec5acd8 Author: Siddharth Seth <[email protected]> Authored: Tue Jul 28 14:56:56 2015 -0700 Committer: Siddharth Seth <[email protected]> Committed: Fri Aug 21 18:14:40 2015 -0700 ---------------------------------------------------------------------- TEZ-2003-CHANGES.txt | 1 + .../tez/dag/api/NamedEntityDescriptor.java | 7 ++- .../api/ContainerLauncherContext.java | 5 +- .../api/TaskSchedulerContext.java | 5 +- .../tez/dag/api/TaskCommunicatorContext.java | 4 +- .../dag/app/ContainerLauncherContextImpl.java | 10 ++-- .../org/apache/tez/dag/app/DAGAppMaster.java | 20 +++++-- .../dag/app/TaskAttemptListenerImpTezDag.java | 19 ++++-- .../dag/app/TaskCommunicatorContextImpl.java | 12 ++-- .../tez/dag/app/TezTaskCommunicatorImpl.java | 12 +++- .../dag/app/launcher/ContainerLauncherImpl.java | 8 ++- .../app/launcher/ContainerLauncherRouter.java | 24 +++++--- .../app/launcher/LocalContainerLauncher.java | 10 +++- .../dag/app/rm/LocalTaskSchedulerService.java | 10 +++- .../dag/app/rm/TaskSchedulerContextImpl.java | 12 ++-- .../app/rm/TaskSchedulerContextImplWrapper.java | 6 +- .../dag/app/rm/TaskSchedulerEventHandler.java | 61 ++++++++++++-------- .../dag/app/rm/YarnTaskSchedulerService.java | 21 +++++-- .../apache/tez/dag/app/MockDAGAppMaster.java | 15 ++++- .../app/TestTaskAttemptListenerImplTezDag.java | 32 ++++++++-- .../app/TestTaskAttemptListenerImplTezDag2.java | 12 +++- .../tez/dag/app/rm/TestContainerReuse.java | 19 +++--- .../tez/dag/app/rm/TestTaskScheduler.java | 3 +- .../app/rm/TestTaskSchedulerEventHandler.java | 18 +++++- .../dag/app/rm/TestTaskSchedulerHelpers.java | 26 +++++++-- .../TezTestServiceContainerLauncher.java | 15 
+++-- .../rm/TezTestServiceTaskSchedulerService.java | 10 +++- .../TezTestServiceTaskCommunicatorImpl.java | 2 +- .../tez/tests/TestExternalTezServices.java | 13 ++++- 29 files changed, 294 insertions(+), 118 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/tez/blob/4a18c5d5/TEZ-2003-CHANGES.txt ---------------------------------------------------------------------- diff --git a/TEZ-2003-CHANGES.txt b/TEZ-2003-CHANGES.txt index a201942..b88044b 100644 --- a/TEZ-2003-CHANGES.txt +++ b/TEZ-2003-CHANGES.txt @@ -38,5 +38,6 @@ ALL CHANGES: TEZ-2005. Define basic interface for pluggable TaskScheduler. TEZ-2651. Pluggable services should not extend AbstractService. TEZ-2652. Cleanup the way services are specified for an AM and vertices. + TEZ-2653. Change service contexts to expose a user specified payload instead of the AM configuration. INCOMPATIBLE CHANGES: http://git-wip-us.apache.org/repos/asf/tez/blob/4a18c5d5/tez-api/src/main/java/org/apache/tez/dag/api/NamedEntityDescriptor.java ---------------------------------------------------------------------- diff --git a/tez-api/src/main/java/org/apache/tez/dag/api/NamedEntityDescriptor.java b/tez-api/src/main/java/org/apache/tez/dag/api/NamedEntityDescriptor.java index bad0d10..723d43f 100644 --- a/tez-api/src/main/java/org/apache/tez/dag/api/NamedEntityDescriptor.java +++ b/tez-api/src/main/java/org/apache/tez/dag/api/NamedEntityDescriptor.java @@ -17,7 +17,7 @@ package org.apache.tez.dag.api; import com.google.common.base.Preconditions; import org.apache.hadoop.classification.InterfaceAudience; -public class NamedEntityDescriptor<T extends EntityDescriptor<T>> extends EntityDescriptor<T> { +public class NamedEntityDescriptor<T extends NamedEntityDescriptor<T>> extends EntityDescriptor<NamedEntityDescriptor<T>> { private final String entityName; @InterfaceAudience.Private @@ -30,4 +30,9 @@ public class NamedEntityDescriptor<T extends 
EntityDescriptor<T>> extends Entity public String getEntityName() { return entityName; } + + public T setUserPayload(UserPayload userPayload) { + super.setUserPayload(userPayload); + return (T) this; + } } http://git-wip-us.apache.org/repos/asf/tez/blob/4a18c5d5/tez-api/src/main/java/org/apache/tez/serviceplugins/api/ContainerLauncherContext.java ---------------------------------------------------------------------- diff --git a/tez-api/src/main/java/org/apache/tez/serviceplugins/api/ContainerLauncherContext.java b/tez-api/src/main/java/org/apache/tez/serviceplugins/api/ContainerLauncherContext.java index 836dc4a..5da38b8 100644 --- a/tez-api/src/main/java/org/apache/tez/serviceplugins/api/ContainerLauncherContext.java +++ b/tez-api/src/main/java/org/apache/tez/serviceplugins/api/ContainerLauncherContext.java @@ -16,9 +16,9 @@ package org.apache.tez.serviceplugins.api; import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceStability; -import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.ContainerId; +import org.apache.tez.dag.api.UserPayload; @InterfaceAudience.Public @InterfaceStability.Unstable @@ -43,8 +43,7 @@ public interface ContainerLauncherContext { // Lookup APIs - // TODO TEZ-2003. To be replaced by getInitialPayload once the DAG API is changed. 
- Configuration getInitialConfiguration(); + UserPayload getInitialUserPayload(); int getNumNodes(String sourceName); http://git-wip-us.apache.org/repos/asf/tez/blob/4a18c5d5/tez-api/src/main/java/org/apache/tez/serviceplugins/api/TaskSchedulerContext.java ---------------------------------------------------------------------- diff --git a/tez-api/src/main/java/org/apache/tez/serviceplugins/api/TaskSchedulerContext.java b/tez-api/src/main/java/org/apache/tez/serviceplugins/api/TaskSchedulerContext.java index b2c8799..6f37641 100644 --- a/tez-api/src/main/java/org/apache/tez/serviceplugins/api/TaskSchedulerContext.java +++ b/tez-api/src/main/java/org/apache/tez/serviceplugins/api/TaskSchedulerContext.java @@ -20,7 +20,6 @@ import java.util.Map; import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceStability; -import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.yarn.api.records.ApplicationAccessType; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.Container; @@ -30,6 +29,7 @@ import org.apache.hadoop.yarn.api.records.FinalApplicationStatus; import org.apache.hadoop.yarn.api.records.NodeReport; import org.apache.hadoop.yarn.api.records.Resource; import org.apache.tez.common.ContainerSignatureMatcher; +import org.apache.tez.dag.api.UserPayload; @InterfaceAudience.Public @InterfaceStability.Unstable @@ -84,8 +84,7 @@ public interface TaskSchedulerContext { // Getters - // TODO TEZ-2003. 
To be replaced by getInitialPayload - public Configuration getInitialConfiguration(); + public UserPayload getInitialUserPayload(); public String getAppTrackingUrl(); http://git-wip-us.apache.org/repos/asf/tez/blob/4a18c5d5/tez-dag/src/main/java/org/apache/tez/dag/api/TaskCommunicatorContext.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/api/TaskCommunicatorContext.java b/tez-dag/src/main/java/org/apache/tez/dag/api/TaskCommunicatorContext.java index ab32ec1..a1e94a3 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/api/TaskCommunicatorContext.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/api/TaskCommunicatorContext.java @@ -18,7 +18,6 @@ import javax.annotation.Nullable; import java.io.IOException; import java.util.Set; -import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.security.Credentials; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.ContainerId; @@ -36,8 +35,7 @@ public interface TaskCommunicatorContext { // TODO TEZ-2003 Maybe add book-keeping as a helper library, instead of each impl tracking container to task etc. 
- // TODO TEZ-2003 To be replaced by getInitialPayload - Configuration getInitialConfiguration(); + UserPayload getInitialUserPayload(); ApplicationAttemptId getApplicationAttemptId(); Credentials getCredentials(); http://git-wip-us.apache.org/repos/asf/tez/blob/4a18c5d5/tez-dag/src/main/java/org/apache/tez/dag/app/ContainerLauncherContextImpl.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/ContainerLauncherContextImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/ContainerLauncherContextImpl.java index 997775a..92bbbdc 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/app/ContainerLauncherContextImpl.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/ContainerLauncherContextImpl.java @@ -14,10 +14,10 @@ package org.apache.tez.dag.app; -import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.tez.common.TezUtilsInternal; +import org.apache.tez.dag.api.UserPayload; import org.apache.tez.serviceplugins.api.ContainerLauncherContext; import org.apache.tez.serviceplugins.api.TaskAttemptEndReason; import org.apache.tez.dag.app.rm.container.AMContainerEvent; @@ -33,10 +33,12 @@ public class ContainerLauncherContextImpl implements ContainerLauncherContext { private final AppContext context; private final TaskAttemptListener tal; + private final UserPayload initialUserPayload; - public ContainerLauncherContextImpl(AppContext appContext, TaskAttemptListener tal) { + public ContainerLauncherContextImpl(AppContext appContext, TaskAttemptListener tal, UserPayload initialUserPayload) { this.context = appContext; this.tal = tal; + this.initialUserPayload = initialUserPayload; } @Override @@ -76,8 +78,8 @@ public class ContainerLauncherContextImpl implements ContainerLauncherContext { } @Override - public Configuration getInitialConfiguration() { - return 
context.getAMConf(); + public UserPayload getInitialUserPayload() { + return initialUserPayload; } @Override http://git-wip-us.apache.org/repos/asf/tez/blob/4a18c5d5/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java b/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java index 4f75891..52621bd 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java @@ -62,8 +62,10 @@ import com.google.common.collect.HashBiMap; import org.apache.commons.cli.CommandLine; import org.apache.commons.cli.GnuParser; import org.apache.commons.cli.Options; +import org.apache.tez.common.TezUtils; import org.apache.tez.dag.api.NamedEntityDescriptor; import org.apache.tez.dag.api.SessionNotRunning; +import org.apache.tez.dag.api.UserPayload; import org.apache.tez.dag.api.records.DAGProtos.AMPluginDescriptorProto; import org.apache.tez.dag.api.records.DAGProtos.ConfigurationProto; import org.apache.tez.dag.api.records.DAGProtos.TezNamedEntityDescriptorProto; @@ -492,9 +494,12 @@ public class DAGAppMaster extends AbstractService { jobTokenSecretManager.addTokenForJob( appAttemptID.getApplicationId().toString(), sessionToken); + UserPayload defaultPayload = TezUtils.createUserPayloadFromConf(amConf); + //service to handle requests to TaskUmbilicalProtocol taskAttemptListener = createTaskAttemptListener(context, - taskHeartbeatHandler, containerHeartbeatHandler, taskCommunicatorDescriptors, isLocal); + taskHeartbeatHandler, containerHeartbeatHandler, taskCommunicatorDescriptors, + defaultPayload, isLocal); addIfService(taskAttemptListener, true); containerSignatureMatcher = createContainerSignatureMatcher(); @@ -540,9 +545,11 @@ public class DAGAppMaster extends AbstractService { } } + + this.taskSchedulerEventHandler = new 
TaskSchedulerEventHandler(context, clientRpcServer, dispatcher.getEventHandler(), containerSignatureMatcher, webUIService, - taskSchedulerDescriptors, isLocal); + taskSchedulerDescriptors, defaultPayload, isLocal); addIfService(taskSchedulerEventHandler, true); if (enableWebUIService()) { @@ -560,7 +567,7 @@ public class DAGAppMaster extends AbstractService { taskSchedulerEventHandler); addIfServiceDependency(taskSchedulerEventHandler, clientRpcServer); - this.containerLauncherRouter = createContainerLauncherRouter(conf, containerLauncherDescriptors, isLocal); + this.containerLauncherRouter = createContainerLauncherRouter(defaultPayload, containerLauncherDescriptors, isLocal); addIfService(containerLauncherRouter, true); dispatcher.register(NMCommunicatorEventType.class, containerLauncherRouter); @@ -1071,10 +1078,11 @@ public class DAGAppMaster extends AbstractService { TaskHeartbeatHandler thh, ContainerHeartbeatHandler chh, List<NamedEntityDescriptor> entityDescriptors, + UserPayload defaultUserPayload, boolean isLocal) { TaskAttemptListener lis = new TaskAttemptListenerImpTezDag(context, thh, chh, - entityDescriptors, amConf, isLocal); + entityDescriptors, defaultUserPayload, isLocal); return lis; } @@ -1095,11 +1103,11 @@ public class DAGAppMaster extends AbstractService { return chh; } - protected ContainerLauncherRouter createContainerLauncherRouter(Configuration conf, + protected ContainerLauncherRouter createContainerLauncherRouter(UserPayload defaultPayload, List<NamedEntityDescriptor> containerLauncherDescriptors, boolean isLocal) throws UnknownHostException { - return new ContainerLauncherRouter(conf, context, taskAttemptListener, workingDirectory, + return new ContainerLauncherRouter(defaultPayload, context, taskAttemptListener, workingDirectory, containerLauncherDescriptors, isLocal); } http://git-wip-us.apache.org/repos/asf/tez/blob/4a18c5d5/tez-dag/src/main/java/org/apache/tez/dag/app/TaskAttemptListenerImpTezDag.java 
---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/TaskAttemptListenerImpTezDag.java b/tez-dag/src/main/java/org/apache/tez/dag/app/TaskAttemptListenerImpTezDag.java index 1e34184..cc109a6 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/app/TaskAttemptListenerImpTezDag.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/TaskAttemptListenerImpTezDag.java @@ -29,9 +29,9 @@ import java.util.concurrent.ConcurrentMap; import com.google.common.annotations.VisibleForTesting; import com.google.common.collect.Lists; import org.apache.commons.collections4.ListUtils; -import org.apache.hadoop.conf.Configuration; import org.apache.tez.dag.api.NamedEntityDescriptor; import org.apache.tez.dag.api.TezConstants; +import org.apache.tez.dag.api.UserPayload; import org.apache.tez.serviceplugins.api.ContainerEndReason; import org.apache.tez.dag.app.dag.event.TaskAttemptEvent; import org.apache.tez.dag.app.dag.event.TaskAttemptEventStatusUpdate; @@ -103,7 +103,7 @@ public class TaskAttemptListenerImpTezDag extends AbstractService implements public TaskAttemptListenerImpTezDag(AppContext context, TaskHeartbeatHandler thh, ContainerHeartbeatHandler chh, List<NamedEntityDescriptor> taskCommunicatorDescriptors, - Configuration conf, + UserPayload defaultUserPayload, boolean isPureLocalMode) { super(TaskAttemptListenerImpTezDag.class.getName()); this.context = context; @@ -112,17 +112,26 @@ public class TaskAttemptListenerImpTezDag extends AbstractService implements if (taskCommunicatorDescriptors == null || taskCommunicatorDescriptors.isEmpty()) { if (isPureLocalMode) { taskCommunicatorDescriptors = Lists.newArrayList(new NamedEntityDescriptor( - TezConstants.getTezUberServicePluginName(), null)); + TezConstants.getTezUberServicePluginName(), null).setUserPayload(defaultUserPayload)); } else { taskCommunicatorDescriptors = Lists.newArrayList(new NamedEntityDescriptor( - 
TezConstants.getTezYarnServicePluginName(), null)); + TezConstants.getTezYarnServicePluginName(), null).setUserPayload(defaultUserPayload)); } } this.taskCommunicators = new TaskCommunicator[taskCommunicatorDescriptors.size()]; this.taskCommunicatorContexts = new TaskCommunicatorContext[taskCommunicatorDescriptors.size()]; this.taskCommunicatorServiceWrappers = new ServicePluginLifecycleAbstractService[taskCommunicatorDescriptors.size()]; for (int i = 0 ; i < taskCommunicatorDescriptors.size() ; i++) { - taskCommunicatorContexts[i] = new TaskCommunicatorContextImpl(context, this, conf, i); + UserPayload userPayload; + if (taskCommunicatorDescriptors.get(i).getEntityName() + .equals(TezConstants.getTezYarnServicePluginName()) || + taskCommunicatorDescriptors.get(i).getEntityName() + .equals(TezConstants.getTezUberServicePluginName())) { + userPayload = defaultUserPayload; + } else { + userPayload = taskCommunicatorDescriptors.get(i).getUserPayload(); + } + taskCommunicatorContexts[i] = new TaskCommunicatorContextImpl(context, this, userPayload, i); taskCommunicators[i] = createTaskCommunicator(taskCommunicatorDescriptors.get(i), i); taskCommunicatorServiceWrappers[i] = new ServicePluginLifecycleAbstractService(taskCommunicators[i]); } http://git-wip-us.apache.org/repos/asf/tez/blob/4a18c5d5/tez-dag/src/main/java/org/apache/tez/dag/app/TaskCommunicatorContextImpl.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/TaskCommunicatorContextImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/TaskCommunicatorContextImpl.java index 035db93..cc315b7 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/app/TaskCommunicatorContextImpl.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/TaskCommunicatorContextImpl.java @@ -23,10 +23,10 @@ import com.google.common.base.Function; import com.google.common.base.Preconditions; import com.google.common.collect.Iterables; import 
org.apache.hadoop.classification.InterfaceAudience; -import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.security.Credentials; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.ContainerId; +import org.apache.tez.dag.api.UserPayload; import org.apache.tez.serviceplugins.api.TaskAttemptEndReason; import org.apache.tez.dag.api.TaskCommunicatorContext; import org.apache.tez.dag.api.TaskHeartbeatRequest; @@ -49,17 +49,17 @@ public class TaskCommunicatorContextImpl implements TaskCommunicatorContext, Ver private final int taskCommunicatorIndex; private final ReentrantReadWriteLock.ReadLock dagChangedReadLock; private final ReentrantReadWriteLock.WriteLock dagChangedWriteLock; - private final Configuration conf; + private final UserPayload userPayload; private DAG dag; public TaskCommunicatorContextImpl(AppContext appContext, TaskAttemptListenerImpTezDag taskAttemptListener, - Configuration conf, + UserPayload userPayload, int taskCommunicatorIndex) { this.context = appContext; this.taskAttemptListener = taskAttemptListener; - this.conf = conf; + this.userPayload = userPayload; this.taskCommunicatorIndex = taskCommunicatorIndex; ReentrantReadWriteLock dagChangedLock = new ReentrantReadWriteLock(); @@ -68,8 +68,8 @@ public class TaskCommunicatorContextImpl implements TaskCommunicatorContext, Ver } @Override - public Configuration getInitialConfiguration() { - return conf; + public UserPayload getInitialUserPayload() { + return userPayload; } @Override http://git-wip-us.apache.org/repos/asf/tez/blob/4a18c5d5/tez-dag/src/main/java/org/apache/tez/dag/app/TezTaskCommunicatorImpl.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/TezTaskCommunicatorImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/TezTaskCommunicatorImpl.java index 93b5b43..2a5c80e 100644 --- 
a/tez-dag/src/main/java/org/apache/tez/dag/app/TezTaskCommunicatorImpl.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/TezTaskCommunicatorImpl.java @@ -77,6 +77,7 @@ public class TezTaskCommunicatorImpl extends TaskCommunicator { protected final String tokenIdentifier; protected final Token<JobTokenIdentifier> sessionToken; + protected final Configuration conf; protected InetSocketAddress address; protected volatile Server server; @@ -119,6 +120,12 @@ public class TezTaskCommunicatorImpl extends TaskCommunicator { this.taskUmbilical = new TezTaskUmbilicalProtocolImpl(); this.tokenIdentifier = taskCommunicatorContext.getApplicationAttemptId().getApplicationId().toString(); this.sessionToken = TokenCache.getSessionToken(taskCommunicatorContext.getCredentials()); + try { + conf = TezUtils.createConfFromUserPayload(getContext().getInitialUserPayload()); + } catch (IOException e) { + throw new TezUncheckedException( + "Unable to parse user payload for " + TezTaskCommunicatorImpl.class.getSimpleName(), e); + } } @Override @@ -132,7 +139,6 @@ public class TezTaskCommunicatorImpl extends TaskCommunicator { } protected void startRpcServer() { - Configuration conf = getContext().getInitialConfiguration(); try { JobTokenSecretManager jobTokenSecretManager = new JobTokenSecretManager(); @@ -171,6 +177,10 @@ public class TezTaskCommunicatorImpl extends TaskCommunicator { } } + protected Configuration getConf() { + return this.conf; + } + private void refreshServiceAcls(Configuration configuration, PolicyProvider policyProvider) { this.server.refreshServiceAcl(configuration, policyProvider); http://git-wip-us.apache.org/repos/asf/tez/blob/4a18c5d5/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/ContainerLauncherImpl.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/ContainerLauncherImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/ContainerLauncherImpl.java 
index cba5c80..07d269d 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/ContainerLauncherImpl.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/ContainerLauncherImpl.java @@ -30,6 +30,7 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import org.apache.commons.lang.exception.ExceptionUtils; +import org.apache.tez.common.TezUtils; import org.apache.tez.dag.api.TezConstants; import org.apache.tez.serviceplugins.api.ContainerLaunchRequest; import org.apache.tez.serviceplugins.api.ContainerLauncher; @@ -224,7 +225,12 @@ public class ContainerLauncherImpl extends ContainerLauncher { public ContainerLauncherImpl(ContainerLauncherContext containerLauncherContext) { super(containerLauncherContext); - this.conf = new Configuration(containerLauncherContext.getInitialConfiguration()); + try { + this.conf = TezUtils.createConfFromUserPayload(containerLauncherContext.getInitialUserPayload()); + } catch (IOException e) { + throw new TezUncheckedException( + "Failed to parse user payload for " + ContainerLauncherImpl.class.getSimpleName(), e); + } conf.setInt( CommonConfigurationKeysPublic.IPC_CLIENT_CONNECTION_MAXIDLETIME_KEY, 0); http://git-wip-us.apache.org/repos/asf/tez/blob/4a18c5d5/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/ContainerLauncherRouter.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/ContainerLauncherRouter.java b/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/ContainerLauncherRouter.java index 594e6d3..2d56bfe 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/ContainerLauncherRouter.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/ContainerLauncherRouter.java @@ -27,6 +27,7 @@ import org.apache.hadoop.yarn.event.EventHandler; import org.apache.tez.common.ReflectionUtils; import org.apache.tez.dag.api.NamedEntityDescriptor; import 
org.apache.tez.dag.api.TezConstants; +import org.apache.tez.dag.api.UserPayload; import org.apache.tez.dag.app.ServicePluginLifecycleAbstractService; import org.apache.tez.serviceplugins.api.ContainerLaunchRequest; import org.apache.tez.serviceplugins.api.ContainerLauncher; @@ -63,7 +64,7 @@ public class ContainerLauncherRouter extends AbstractService } // Accepting conf to setup final parameters, if required. - public ContainerLauncherRouter(Configuration conf, AppContext context, + public ContainerLauncherRouter(UserPayload defaultUserPayload, AppContext context, TaskAttemptListener taskAttemptListener, String workingDirectory, List<NamedEntityDescriptor> containerLauncherDescriptors, @@ -74,10 +75,10 @@ public class ContainerLauncherRouter extends AbstractService if (containerLauncherDescriptors == null || containerLauncherDescriptors.isEmpty()) { if (isPureLocalMode) { containerLauncherDescriptors = Lists.newArrayList(new NamedEntityDescriptor( - TezConstants.getTezUberServicePluginName(), null)); + TezConstants.getTezUberServicePluginName(), null).setUserPayload(defaultUserPayload)); } else { containerLauncherDescriptors = Lists.newArrayList(new NamedEntityDescriptor( - TezConstants.getTezYarnServicePluginName(), null)); + TezConstants.getTezYarnServicePluginName(), null).setUserPayload(defaultUserPayload)); } } containerLauncherContexts = new ContainerLauncherContext[containerLauncherDescriptors.size()]; @@ -86,10 +87,20 @@ public class ContainerLauncherRouter extends AbstractService for (int i = 0; i < containerLauncherDescriptors.size(); i++) { - ContainerLauncherContext containerLauncherContext = new ContainerLauncherContextImpl(context, taskAttemptListener); + UserPayload userPayload; + if (containerLauncherDescriptors.get(i).getEntityName() + .equals(TezConstants.getTezYarnServicePluginName()) || + containerLauncherDescriptors.get(i).getEntityName() + .equals(TezConstants.getTezUberServicePluginName())) { + userPayload = defaultUserPayload; + } else { + 
userPayload = containerLauncherDescriptors.get(i).getUserPayload(); + } + ContainerLauncherContext containerLauncherContext = + new ContainerLauncherContextImpl(context, taskAttemptListener, userPayload); containerLauncherContexts[i] = containerLauncherContext; containerLaunchers[i] = createContainerLauncher(containerLauncherDescriptors.get(i), context, - containerLauncherContext, taskAttemptListener, workingDirectory, isPureLocalMode, conf); + containerLauncherContext, taskAttemptListener, workingDirectory, isPureLocalMode); containerLauncherServiceWrappers[i] = new ServicePluginLifecycleAbstractService(containerLaunchers[i]); } } @@ -99,8 +110,7 @@ public class ContainerLauncherRouter extends AbstractService ContainerLauncherContext containerLauncherContext, TaskAttemptListener taskAttemptListener, String workingDirectory, - boolean isPureLocalMode, - Configuration conf) throws + boolean isPureLocalMode) throws UnknownHostException { if (containerLauncherDescriptor.getEntityName().equals( TezConstants.getTezYarnServicePluginName())) { http://git-wip-us.apache.org/repos/asf/tez/blob/4a18c5d5/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/LocalContainerLauncher.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/LocalContainerLauncher.java b/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/LocalContainerLauncher.java index 3975111..1d3e6df 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/LocalContainerLauncher.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/LocalContainerLauncher.java @@ -44,6 +44,7 @@ import com.google.common.util.concurrent.ListeningExecutorService; import com.google.common.util.concurrent.MoreExecutors; import com.google.common.util.concurrent.ThreadFactoryBuilder; +import org.apache.tez.common.TezUtils; import org.apache.tez.serviceplugins.api.ContainerLaunchRequest; import 
org.apache.tez.serviceplugins.api.ContainerLauncher; import org.apache.tez.serviceplugins.api.ContainerLauncherContext; @@ -129,7 +130,14 @@ public class LocalContainerLauncher extends ContainerLauncher { System.getenv(Environment.NM_HOST.name()); executionContext = new ExecutionContextImpl(host); - numExecutors = getContext().getInitialConfiguration().getInt(TezConfiguration.TEZ_AM_INLINE_TASK_EXECUTION_MAX_TASKS, + Configuration conf; + try { + conf = TezUtils.createConfFromUserPayload(getContext().getInitialUserPayload()); + } catch (IOException e) { + throw new TezUncheckedException( + "Failed to parse user payload for " + LocalContainerLauncher.class.getSimpleName(), e); + } + numExecutors = conf.getInt(TezConfiguration.TEZ_AM_INLINE_TASK_EXECUTION_MAX_TASKS, TezConfiguration.TEZ_AM_INLINE_TASK_EXECUTION_MAX_TASKS_DEFAULT); Preconditions.checkState(numExecutors >=1, "Must have at least 1 executor"); ExecutorService rawExecutor = Executors.newFixedThreadPool(numExecutors, http://git-wip-us.apache.org/repos/asf/tez/blob/4a18c5d5/tez-dag/src/main/java/org/apache/tez/dag/app/rm/LocalTaskSchedulerService.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/LocalTaskSchedulerService.java b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/LocalTaskSchedulerService.java index 1d889ae..395589c 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/LocalTaskSchedulerService.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/LocalTaskSchedulerService.java @@ -18,6 +18,7 @@ package org.apache.tez.dag.app.rm; +import java.io.IOException; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.BlockingQueue; import java.util.concurrent.PriorityBlockingQueue; @@ -27,6 +28,7 @@ import java.util.LinkedHashMap; import com.google.common.primitives.Ints; +import org.apache.tez.common.TezUtils; import org.apache.tez.serviceplugins.api.TaskScheduler; import 
org.apache.tez.serviceplugins.api.TaskSchedulerContext; import org.slf4j.Logger; @@ -65,7 +67,13 @@ public class LocalTaskSchedulerService extends TaskScheduler { this.appTrackingUrl = taskSchedulerContext.getAppTrackingUrl(); this.containerSignatureMatcher = taskSchedulerContext.getContainerSignatureMatcher(); this.customContainerAppId = taskSchedulerContext.getCustomClusterIdentifier(); - this.conf = taskSchedulerContext.getInitialConfiguration(); + try { + this.conf = TezUtils.createConfFromUserPayload(taskSchedulerContext.getInitialUserPayload()); + } catch (IOException e) { + throw new TezUncheckedException( + "Failed to deserialize payload for " + LocalTaskSchedulerService.class.getSimpleName(), + e); + } } @Override http://git-wip-us.apache.org/repos/asf/tez/blob/4a18c5d5/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerContextImpl.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerContextImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerContextImpl.java index 890870e..7f1d5a3 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerContextImpl.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerContextImpl.java @@ -18,7 +18,6 @@ import java.nio.ByteBuffer; import java.util.List; import java.util.Map; -import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.yarn.api.records.ApplicationAccessType; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.Container; @@ -28,6 +27,7 @@ import org.apache.hadoop.yarn.api.records.NodeReport; import org.apache.hadoop.yarn.api.records.Resource; import org.apache.tez.common.ContainerSignatureMatcher; import org.apache.tez.dag.api.TezUncheckedException; +import org.apache.tez.dag.api.UserPayload; import org.apache.tez.dag.app.AppContext; import 
org.apache.tez.serviceplugins.api.TaskSchedulerContext; @@ -40,12 +40,12 @@ public class TaskSchedulerContextImpl implements TaskSchedulerContext { private final long customClusterIdentifier; private final String appHostName; private final int clientPort; - private final Configuration conf; + private final UserPayload initialUserPayload; public TaskSchedulerContextImpl(TaskSchedulerEventHandler tseh, AppContext appContext, int schedulerId, String trackingUrl, long customClusterIdentifier, String appHostname, int clientPort, - Configuration conf) { + UserPayload initialUserPayload) { this.tseh = tseh; this.appContext = appContext; this.schedulerId = schedulerId; @@ -53,7 +53,7 @@ public class TaskSchedulerContextImpl implements TaskSchedulerContext { this.customClusterIdentifier = customClusterIdentifier; this.appHostName = appHostname; this.clientPort = clientPort; - this.conf = conf; + this.initialUserPayload = initialUserPayload; } @@ -110,8 +110,8 @@ public class TaskSchedulerContextImpl implements TaskSchedulerContext { } @Override - public Configuration getInitialConfiguration() { - return conf; + public UserPayload getInitialUserPayload() { + return initialUserPayload; } http://git-wip-us.apache.org/repos/asf/tez/blob/4a18c5d5/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerContextImplWrapper.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerContextImplWrapper.java b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerContextImplWrapper.java index e64ef43..9e4c8e0 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerContextImplWrapper.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerContextImplWrapper.java @@ -27,7 +27,6 @@ import java.util.concurrent.Future; import com.google.common.annotations.VisibleForTesting; import org.apache.hadoop.classification.InterfaceAudience; -import 
org.apache.hadoop.conf.Configuration; import org.apache.hadoop.yarn.api.records.ApplicationAccessType; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.Container; @@ -37,6 +36,7 @@ import org.apache.hadoop.yarn.api.records.NodeReport; import org.apache.hadoop.yarn.api.records.Resource; import org.apache.tez.common.ContainerSignatureMatcher; import org.apache.tez.dag.api.TezUncheckedException; +import org.apache.tez.dag.api.UserPayload; import org.apache.tez.serviceplugins.api.TaskSchedulerContext; /** @@ -132,8 +132,8 @@ class TaskSchedulerContextImplWrapper implements TaskSchedulerContext { // does not use locks. @Override - public Configuration getInitialConfiguration() { - return real.getInitialConfiguration(); + public UserPayload getInitialUserPayload() { + return real.getInitialUserPayload(); } @Override http://git-wip-us.apache.org/repos/asf/tez/blob/4a18c5d5/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerEventHandler.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerEventHandler.java b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerEventHandler.java index 8038e2c..4899f82 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerEventHandler.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerEventHandler.java @@ -37,6 +37,7 @@ import com.google.common.util.concurrent.ThreadFactoryBuilder; import org.apache.hadoop.classification.InterfaceAudience; import org.apache.tez.dag.api.NamedEntityDescriptor; import org.apache.tez.dag.api.TezConstants; +import org.apache.tez.dag.api.UserPayload; import org.apache.tez.dag.app.ServicePluginLifecycleAbstractService; import org.apache.tez.serviceplugins.api.TaskScheduler; import org.apache.tez.serviceplugins.api.TaskSchedulerContext; @@ -114,7 +115,7 @@ public class TaskSchedulerEventHandler extends 
AbstractService implements private AtomicBoolean shouldUnregisterFlag = new AtomicBoolean(false); private final WebUIService webUI; - private final String[] taskSchedulerClasses; + private final NamedEntityDescriptor[] taskSchedulerDescriptors; protected final TaskScheduler[]taskSchedulers; protected final ServicePluginLifecycleAbstractService []taskSchedulerServiceWrappers; @@ -152,7 +153,8 @@ public class TaskSchedulerEventHandler extends AbstractService implements public TaskSchedulerEventHandler(AppContext appContext, DAGClientServer clientService, EventHandler eventHandler, ContainerSignatureMatcher containerSignatureMatcher, WebUIService webUI, - List<NamedEntityDescriptor> schedulerDescriptors, boolean isPureLocalMode) { + List<NamedEntityDescriptor> schedulerDescriptors, UserPayload defaultPayload, + boolean isPureLocalMode) { super(TaskSchedulerEventHandler.class.getName()); this.appContext = appContext; this.eventHandler = eventHandler; @@ -168,39 +170,50 @@ public class TaskSchedulerEventHandler extends AbstractService implements // Override everything for pure local mode if (isPureLocalMode) { - this.taskSchedulerClasses = new String[] {TezConstants.getTezUberServicePluginName()}; + this.taskSchedulerDescriptors = new NamedEntityDescriptor[]{ + new NamedEntityDescriptor(TezConstants.getTezUberServicePluginName(), null) + .setUserPayload(defaultPayload)}; this.yarnTaskSchedulerIndex = -1; } else { if (schedulerDescriptors == null || schedulerDescriptors.isEmpty()) { - this.taskSchedulerClasses = new String[] {TezConstants.getTezYarnServicePluginName()}; + this.taskSchedulerDescriptors = new NamedEntityDescriptor[]{ + new NamedEntityDescriptor(TezConstants.getTezYarnServicePluginName(), null) + .setUserPayload(defaultPayload)}; this.yarnTaskSchedulerIndex = 0; } else { // Ensure the YarnScheduler will be setup and note it's index. This will be responsible for heartbeats and YARN registration. 
int foundYarnTaskSchedulerIndex = -1; - List<String> taskSchedulerClassList = new LinkedList<>(); + List<NamedEntityDescriptor> schedulerDescriptorList = new LinkedList<>(); for (int i = 0 ; i < schedulerDescriptors.size() ; i++) { if (schedulerDescriptors.get(i).getEntityName().equals( TezConstants.getTezYarnServicePluginName())) { - taskSchedulerClassList.add(schedulerDescriptors.get(i).getEntityName()); + schedulerDescriptorList.add( + new NamedEntityDescriptor(schedulerDescriptors.get(i).getEntityName(), null) + .setUserPayload( + defaultPayload)); foundYarnTaskSchedulerIndex = i; } else if (schedulerDescriptors.get(i).getEntityName().equals( TezConstants.getTezUberServicePluginName())) { - taskSchedulerClassList.add(schedulerDescriptors.get(i).getEntityName()); + schedulerDescriptorList.add( + new NamedEntityDescriptor(schedulerDescriptors.get(i).getEntityName(), null) + .setUserPayload( + defaultPayload)); } else { - taskSchedulerClassList.add(schedulerDescriptors.get(i).getClassName()); + schedulerDescriptorList.add(schedulerDescriptors.get(i)); } } if (foundYarnTaskSchedulerIndex == -1) { - taskSchedulerClassList.add(YarnTaskSchedulerService.class.getName()); - foundYarnTaskSchedulerIndex = taskSchedulerClassList.size() -1; + schedulerDescriptorList.add(new NamedEntityDescriptor(TezConstants.getTezYarnServicePluginName(), null).setUserPayload( + defaultPayload)); + foundYarnTaskSchedulerIndex = schedulerDescriptorList.size() -1; } - this.taskSchedulerClasses = taskSchedulerClassList.toArray(new String[taskSchedulerClassList.size()]); + this.taskSchedulerDescriptors = schedulerDescriptorList.toArray(new NamedEntityDescriptor[schedulerDescriptorList.size()]); this.yarnTaskSchedulerIndex = foundYarnTaskSchedulerIndex; } } - taskSchedulers = new TaskScheduler[this.taskSchedulerClasses.length]; - taskSchedulerServiceWrappers = new ServicePluginLifecycleAbstractService[this.taskSchedulerClasses.length]; + taskSchedulers = new 
TaskScheduler[this.taskSchedulerDescriptors.length]; + taskSchedulerServiceWrappers = new ServicePluginLifecycleAbstractService[this.taskSchedulerDescriptors.length]; } public Map<ApplicationAccessType, String> getApplicationAcls() { @@ -417,23 +430,24 @@ public class TaskSchedulerEventHandler extends AbstractService implements private TaskScheduler createTaskScheduler(String host, int port, String trackingUrl, AppContext appContext, - String schedulerClassName, + NamedEntityDescriptor taskSchedulerDescriptor, long customAppIdIdentifier, int schedulerId) { TaskSchedulerContext rawContext = new TaskSchedulerContextImpl(this, appContext, schedulerId, trackingUrl, - customAppIdIdentifier, host, port, getConfig()); + customAppIdIdentifier, host, port, taskSchedulerDescriptor.getUserPayload()); TaskSchedulerContext wrappedContext = new TaskSchedulerContextImplWrapper(rawContext, appCallbackExecutor); - if (schedulerClassName.equals(TezConstants.getTezYarnServicePluginName())) { + String schedulerName = taskSchedulerDescriptor.getEntityName(); + if (schedulerName.equals(TezConstants.getTezYarnServicePluginName())) { LOG.info("Creating TaskScheduler: YarnTaskSchedulerService"); return new YarnTaskSchedulerService(wrappedContext); - } else if (schedulerClassName.equals(TezConstants.getTezUberServicePluginName())) { + } else if (schedulerName.equals(TezConstants.getTezUberServicePluginName())) { LOG.info("Creating TaskScheduler: Local TaskScheduler"); return new LocalTaskSchedulerService(wrappedContext); } else { - LOG.info("Creating custom TaskScheduler: " + schedulerClassName); + LOG.info("Creating custom TaskScheduler {}:{}", taskSchedulerDescriptor.getEntityName(), taskSchedulerDescriptor.getClassName()); Class<? extends TaskScheduler> taskSchedulerClazz = - (Class<? extends TaskScheduler>) ReflectionUtils.getClazz(schedulerClassName); + (Class<? extends TaskScheduler>) ReflectionUtils.getClazz(taskSchedulerDescriptor.getClassName()); try { Constructor<? 
extends TaskScheduler> ctor = taskSchedulerClazz .getConstructor(TaskSchedulerContext.class); @@ -453,21 +467,20 @@ public class TaskSchedulerEventHandler extends AbstractService implements @VisibleForTesting protected void instantiateScheduelrs(String host, int port, String trackingUrl, AppContext appContext) { - // TODO Add error checking for components being used in the Vertex when running in pure local mode. // Iterate over the list and create all the taskSchedulers int j = 0; - for (int i = 0; i < taskSchedulerClasses.length; i++) { + for (int i = 0; i < taskSchedulerDescriptors.length; i++) { long customAppIdIdentifier; - if (isPureLocalMode || taskSchedulerClasses[i].equals( + if (isPureLocalMode || taskSchedulerDescriptors[i].getEntityName().equals( TezConstants.getTezYarnServicePluginName())) { // Use the app identifier from the appId. customAppIdIdentifier = appContext.getApplicationID().getClusterTimestamp(); } else { customAppIdIdentifier = SCHEDULER_APP_ID_BASE + (j++ * SCHEDULER_APP_ID_INCREMENT); } - LOG.info("ClusterIdentifier for TaskScheduler [" + i + ":" + taskSchedulerClasses[i] + "]=" + + LOG.info("ClusterIdentifier for TaskScheduler [" + i + ":" + taskSchedulerDescriptors[i].getEntityName() + "]=" + customAppIdIdentifier); taskSchedulers[i] = createTaskScheduler(host, port, - trackingUrl, appContext, taskSchedulerClasses[i], customAppIdIdentifier, i); + trackingUrl, appContext, taskSchedulerDescriptors[i], customAppIdIdentifier, i); taskSchedulerServiceWrappers[i] = new ServicePluginLifecycleAbstractService(taskSchedulers[i]); } } http://git-wip-us.apache.org/repos/asf/tez/blob/4a18c5d5/tez-dag/src/main/java/org/apache/tez/dag/app/rm/YarnTaskSchedulerService.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/YarnTaskSchedulerService.java b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/YarnTaskSchedulerService.java index 08821b0..b4d1f26 100644 --- 
a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/YarnTaskSchedulerService.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/YarnTaskSchedulerService.java @@ -31,13 +31,11 @@ import java.util.Map; import java.util.Map.Entry; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; import java.util.concurrent.PriorityBlockingQueue; -import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; +import org.apache.tez.common.TezUtils; import org.apache.tez.serviceplugins.api.TaskScheduler; import org.apache.tez.serviceplugins.api.TaskSchedulerContext; import org.apache.tez.serviceplugins.api.TaskSchedulerContext.AMState; @@ -72,7 +70,6 @@ import com.google.common.base.Preconditions; import com.google.common.collect.Lists; import com.google.common.collect.Maps; import com.google.common.collect.Sets; -import com.google.common.util.concurrent.ThreadFactoryBuilder; /* TODO not yet updating cluster nodes on every allocate response * from RMContainerRequestor @@ -220,7 +217,13 @@ public class YarnTaskSchedulerService extends TaskScheduler this.appHostName = taskSchedulerContext.getAppHostName(); this.appHostPort = taskSchedulerContext.getAppClientPort(); this.appTrackingUrl = taskSchedulerContext.getAppTrackingUrl(); - this.conf = taskSchedulerContext.getInitialConfiguration(); + try { + this.conf = TezUtils.createConfFromUserPayload(taskSchedulerContext.getInitialUserPayload()); + } catch (IOException e) { + throw new TezUncheckedException( + "Failed to deserialize payload for " + YarnTaskSchedulerService.class.getSimpleName(), + e); + } } @Private @@ -233,7 +236,13 @@ public class YarnTaskSchedulerService extends TaskScheduler this.appHostName = taskSchedulerContext.getAppHostName(); this.appHostPort = taskSchedulerContext.getAppClientPort(); this.appTrackingUrl = 
taskSchedulerContext.getAppTrackingUrl(); - this.conf = taskSchedulerContext.getInitialConfiguration(); + try { + this.conf = TezUtils.createConfFromUserPayload(taskSchedulerContext.getInitialUserPayload()); + } catch (IOException e) { + throw new TezUncheckedException( + "Failed to deserialize payload for " + YarnTaskSchedulerService.class.getSimpleName(), + e); + } } @Override http://git-wip-us.apache.org/repos/asf/tez/blob/4a18c5d5/tez-dag/src/test/java/org/apache/tez/dag/app/MockDAGAppMaster.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/MockDAGAppMaster.java b/tez-dag/src/test/java/org/apache/tez/dag/app/MockDAGAppMaster.java index 17feeaa..0723dbc 100644 --- a/tez-dag/src/test/java/org/apache/tez/dag/app/MockDAGAppMaster.java +++ b/tez-dag/src/test/java/org/apache/tez/dag/app/MockDAGAppMaster.java @@ -18,6 +18,7 @@ package org.apache.tez.dag.app; +import java.io.IOException; import java.lang.management.ManagementFactory; import java.lang.management.ThreadMXBean; import java.net.UnknownHostException; @@ -34,7 +35,9 @@ import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; +import org.apache.tez.common.TezUtils; import org.apache.tez.dag.api.NamedEntityDescriptor; +import org.apache.tez.dag.api.UserPayload; import org.apache.tez.serviceplugins.api.ContainerLaunchRequest; import org.apache.tez.serviceplugins.api.ContainerLauncher; import org.apache.tez.serviceplugins.api.ContainerLauncherContext; @@ -488,7 +491,15 @@ public class MockDAGAppMaster extends DAGAppMaster { super(applicationAttemptId, containerId, nmHost, nmPort, nmHttpPort, clock, appSubmitTime, isSession, workingDirectory, localDirs, logDirs, new TezApiVersionInfo().getVersion(), 1, credentials, jobUserName, null); - containerLauncherContext = new ContainerLauncherContextImpl(getContext(), getTaskAttemptListener()); + 
Configuration conf = new Configuration(false); + UserPayload userPayload; + try { + userPayload = TezUtils.createUserPayloadFromConf(conf); + } catch (IOException e) { + throw new TezUncheckedException(e); + } + containerLauncherContext = + new ContainerLauncherContextImpl(getContext(), getTaskAttemptListener(), userPayload); containerLauncher = new MockContainerLauncher(launcherGoFlag, containerLauncherContext); shutdownHandler = new MockDAGAppMasterShutdownHandler(); this.initFailFlag = initFailFlag; @@ -500,7 +511,7 @@ public class MockDAGAppMaster extends DAGAppMaster { // use mock container launcher for tests @Override - protected ContainerLauncherRouter createContainerLauncherRouter(final Configuration conf, + protected ContainerLauncherRouter createContainerLauncherRouter(final UserPayload defaultUserPayload, List<NamedEntityDescriptor> containerLauncherDescirptors, boolean isLocal) throws UnknownHostException { http://git-wip-us.apache.org/repos/asf/tez/blob/4a18c5d5/tez-dag/src/test/java/org/apache/tez/dag/app/TestTaskAttemptListenerImplTezDag.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/TestTaskAttemptListenerImplTezDag.java b/tez-dag/src/test/java/org/apache/tez/dag/app/TestTaskAttemptListenerImplTezDag.java index e45b0a2..1cb69a8 100644 --- a/tez-dag/src/test/java/org/apache/tez/dag/app/TestTaskAttemptListenerImplTezDag.java +++ b/tez-dag/src/test/java/org/apache/tez/dag/app/TestTaskAttemptListenerImplTezDag.java @@ -47,11 +47,14 @@ import org.apache.hadoop.yarn.event.Event; import org.apache.hadoop.yarn.event.EventHandler; import org.apache.tez.common.ContainerContext; import org.apache.tez.common.ContainerTask; +import org.apache.tez.common.TezUtils; import org.apache.tez.common.security.JobTokenIdentifier; import org.apache.tez.common.security.JobTokenSecretManager; import org.apache.tez.common.security.TokenCache; import 
org.apache.tez.dag.api.NamedEntityDescriptor; import org.apache.tez.dag.api.TezConfiguration; +import org.apache.tez.dag.api.TezUncheckedException; +import org.apache.tez.dag.api.UserPayload; import org.apache.tez.serviceplugins.api.ContainerEndReason; import org.apache.tez.serviceplugins.api.TaskAttemptEndReason; import org.apache.tez.dag.api.TaskHeartbeatRequest; @@ -135,8 +138,15 @@ public class TestTaskAttemptListenerImplTezDag { doReturn(amContainer).when(amContainerMap).get(any(ContainerId.class)); doReturn(container).when(amContainer).getContainer(); + Configuration conf = new TezConfiguration(); + UserPayload defaultPayload; + try { + defaultPayload = TezUtils.createUserPayloadFromConf(conf); + } catch (IOException e) { + throw new TezUncheckedException(e); + } taskAttemptListener = new TaskAttemptListenerImplForTest(appContext, - mock(TaskHeartbeatHandler.class), mock(ContainerHeartbeatHandler.class), null, null, false); + mock(TaskHeartbeatHandler.class), mock(ContainerHeartbeatHandler.class), null, defaultPayload, false); TezTaskCommunicatorImpl taskCommunicator = (TezTaskCommunicatorImpl)taskAttemptListener.getTaskCommunicator(); TezTaskUmbilicalProtocol tezUmbilical = taskCommunicator.getUmbilical(); @@ -299,8 +309,14 @@ public class TestTaskAttemptListenerImplTezDag { new JobTokenSecretManager()); sessionToken.setService(identifier.getJobId()); TokenCache.setSessionToken(sessionToken, credentials); + UserPayload userPayload = null; + try { + userPayload = TezUtils.createUserPayloadFromConf(conf); + } catch (IOException e) { + throw new TezUncheckedException(e); + } taskAttemptListener = new TaskAttemptListenerImpTezDag(appContext, - mock(TaskHeartbeatHandler.class), mock(ContainerHeartbeatHandler.class), null, conf, false); + mock(TaskHeartbeatHandler.class), mock(ContainerHeartbeatHandler.class), null, userPayload, false); // no exception happen, should started properly taskAttemptListener.init(conf); taskAttemptListener.start(); @@ -319,8 +335,14 @@ 
public class TestTaskAttemptListenerImplTezDag { TokenCache.setSessionToken(sessionToken, credentials); conf.set(TezConfiguration.TEZ_AM_TASK_AM_PORT_RANGE, port + "-" + port); + UserPayload userPayload = null; + try { + userPayload = TezUtils.createUserPayloadFromConf(conf); + } catch (IOException e) { + throw new TezUncheckedException(e); + } taskAttemptListener = new TaskAttemptListenerImpTezDag(appContext, - mock(TaskHeartbeatHandler.class), mock(ContainerHeartbeatHandler.class), null, conf, false); + mock(TaskHeartbeatHandler.class), mock(ContainerHeartbeatHandler.class), null, userPayload, false); taskAttemptListener.init(conf); taskAttemptListener.start(); int resultedPort = taskAttemptListener.getTaskCommunicator(0).getAddress().getPort(); @@ -377,9 +399,9 @@ public class TestTaskAttemptListenerImplTezDag { TaskHeartbeatHandler thh, ContainerHeartbeatHandler chh, List<NamedEntityDescriptor> taskCommDescriptors, - Configuration conf, + UserPayload userPayload, boolean isPureLocalMode) { - super(context, thh, chh, taskCommDescriptors, conf, + super(context, thh, chh, taskCommDescriptors, userPayload, isPureLocalMode); } http://git-wip-us.apache.org/repos/asf/tez/blob/4a18c5d5/tez-dag/src/test/java/org/apache/tez/dag/app/TestTaskAttemptListenerImplTezDag2.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/TestTaskAttemptListenerImplTezDag2.java b/tez-dag/src/test/java/org/apache/tez/dag/app/TestTaskAttemptListenerImplTezDag2.java index 8d776fb..1c82bd8 100644 --- a/tez-dag/src/test/java/org/apache/tez/dag/app/TestTaskAttemptListenerImplTezDag2.java +++ b/tez-dag/src/test/java/org/apache/tez/dag/app/TestTaskAttemptListenerImplTezDag2.java @@ -22,9 +22,11 @@ import static org.mockito.Mockito.mock; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; +import java.io.IOException; import java.util.HashMap; import java.util.Map; +import 
org.apache.hadoop.conf.Configuration; import org.apache.hadoop.security.Credentials; import org.apache.hadoop.yarn.api.records.ApplicationAccessType; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; @@ -34,6 +36,10 @@ import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.event.Event; import org.apache.hadoop.yarn.event.EventHandler; +import org.apache.tez.common.TezUtils; +import org.apache.tez.dag.api.TezConfiguration; +import org.apache.tez.dag.api.TezUncheckedException; +import org.apache.tez.dag.api.UserPayload; import org.apache.tez.serviceplugins.api.TaskAttemptEndReason; import org.apache.tez.dag.app.dag.DAG; import org.apache.tez.dag.app.dag.event.TaskAttemptEventAttemptFailed; @@ -51,7 +57,7 @@ import org.mockito.ArgumentCaptor; public class TestTaskAttemptListenerImplTezDag2 { @Test(timeout = 5000) - public void testTaskAttemptFailedKilled() { + public void testTaskAttemptFailedKilled() throws IOException { ApplicationId appId = ApplicationId.newInstance(1000, 1); ApplicationAttemptId appAttemptId = ApplicationAttemptId.newInstance(appId, 1); Credentials credentials = new Credentials(); @@ -73,9 +79,11 @@ public class TestTaskAttemptListenerImplTezDag2 { doReturn(amContainer).when(amContainerMap).get(any(ContainerId.class)); doReturn(container).when(amContainer).getContainer(); + Configuration conf = new TezConfiguration(); + UserPayload userPayload = TezUtils.createUserPayloadFromConf(conf); TaskAttemptListenerImpTezDag taskAttemptListener = new TaskAttemptListenerImpTezDag(appContext, mock(TaskHeartbeatHandler.class), - mock(ContainerHeartbeatHandler.class), null, null, false); + mock(ContainerHeartbeatHandler.class), null, userPayload, false); TaskSpec taskSpec1 = mock(TaskSpec.class); TezTaskAttemptID taskAttemptId1 = mock(TezTaskAttemptID.class); 
http://git-wip-us.apache.org/repos/asf/tez/blob/4a18c5d5/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestContainerReuse.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestContainerReuse.java b/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestContainerReuse.java index 88f6066..8e8224a 100644 --- a/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestContainerReuse.java +++ b/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestContainerReuse.java @@ -37,6 +37,7 @@ import java.util.Set; import java.util.concurrent.ExecutionException; import java.util.concurrent.atomic.AtomicBoolean; +import org.apache.tez.common.TezUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.hadoop.conf.Configuration; @@ -144,7 +145,7 @@ public class TestContainerReuse { TaskSchedulerEventHandler taskSchedulerEventHandlerReal = new TaskSchedulerEventHandlerForTest( appContext, eventHandler, rmClient, - new AlwaysMatchesContainerMatcher()); + new AlwaysMatchesContainerMatcher(), TezUtils.createUserPayloadFromConf(conf)); TaskSchedulerEventHandler taskSchedulerEventHandler = spy(taskSchedulerEventHandlerReal); taskSchedulerEventHandler.init(conf); @@ -279,7 +280,7 @@ public class TestContainerReuse { TaskSchedulerEventHandler taskSchedulerEventHandlerReal = new TaskSchedulerEventHandlerForTest(appContext, eventHandler, rmClient, - new AlwaysMatchesContainerMatcher()); + new AlwaysMatchesContainerMatcher(), TezUtils.createUserPayloadFromConf(conf)); TaskSchedulerEventHandler taskSchedulerEventHandler = spy(taskSchedulerEventHandlerReal); taskSchedulerEventHandler.init(conf); @@ -378,7 +379,7 @@ public class TestContainerReuse { doReturn(dagID).when(appContext).getCurrentDAGID(); doReturn(mock(ClusterInfo.class)).when(appContext).getClusterInfo(); - TaskSchedulerEventHandler taskSchedulerEventHandlerReal = new TaskSchedulerEventHandlerForTest(appContext, eventHandler, 
rmClient, new AlwaysMatchesContainerMatcher()); + TaskSchedulerEventHandler taskSchedulerEventHandlerReal = new TaskSchedulerEventHandlerForTest(appContext, eventHandler, rmClient, new AlwaysMatchesContainerMatcher(), TezUtils.createUserPayloadFromConf(tezConf)); TaskSchedulerEventHandler taskSchedulerEventHandler = spy(taskSchedulerEventHandlerReal); taskSchedulerEventHandler.init(tezConf); taskSchedulerEventHandler.start(); @@ -514,7 +515,7 @@ public class TestContainerReuse { //Use ContainerContextMatcher here. Otherwise it would not match the JVM options TaskSchedulerEventHandler taskSchedulerEventHandlerReal = - new TaskSchedulerEventHandlerForTest(appContext, eventHandler, rmClient, new ContainerContextMatcher()); + new TaskSchedulerEventHandlerForTest(appContext, eventHandler, rmClient, new ContainerContextMatcher(), TezUtils.createUserPayloadFromConf(tezConf)); TaskSchedulerEventHandler taskSchedulerEventHandler = spy(taskSchedulerEventHandlerReal); taskSchedulerEventHandler.init(tezConf); taskSchedulerEventHandler.start(); @@ -709,7 +710,7 @@ public class TestContainerReuse { TaskSchedulerEventHandler taskSchedulerEventHandlerReal = new TaskSchedulerEventHandlerForTest( appContext, eventHandler, rmClient, - new AlwaysMatchesContainerMatcher()); + new AlwaysMatchesContainerMatcher(), TezUtils.createUserPayloadFromConf(tezConf)); TaskSchedulerEventHandler taskSchedulerEventHandler = spy(taskSchedulerEventHandlerReal); taskSchedulerEventHandler.init(tezConf); @@ -833,7 +834,7 @@ public class TestContainerReuse { TaskSchedulerEventHandler taskSchedulerEventHandlerReal = new TaskSchedulerEventHandlerForTest(appContext, eventHandler, rmClient, - new AlwaysMatchesContainerMatcher()); + new AlwaysMatchesContainerMatcher(), TezUtils.createUserPayloadFromConf(tezConf)); TaskSchedulerEventHandler taskSchedulerEventHandler = spy(taskSchedulerEventHandlerReal); taskSchedulerEventHandler.init(tezConf); @@ -947,7 +948,7 @@ public class TestContainerReuse { 
doAnswer(dagIDAnswer).when(appContext).getCurrentDAGID(); doReturn(mock(ClusterInfo.class)).when(appContext).getClusterInfo(); - TaskSchedulerEventHandler taskSchedulerEventHandlerReal = new TaskSchedulerEventHandlerForTest(appContext, eventHandler, rmClient, new AlwaysMatchesContainerMatcher()); + TaskSchedulerEventHandler taskSchedulerEventHandlerReal = new TaskSchedulerEventHandlerForTest(appContext, eventHandler, rmClient, new AlwaysMatchesContainerMatcher(), TezUtils.createUserPayloadFromConf(tezConf)); TaskSchedulerEventHandler taskSchedulerEventHandler = spy(taskSchedulerEventHandlerReal); taskSchedulerEventHandler.init(tezConf); taskSchedulerEventHandler.start(); @@ -1105,7 +1106,7 @@ public class TestContainerReuse { TaskSchedulerEventHandler taskSchedulerEventHandlerReal = new TaskSchedulerEventHandlerForTest(appContext, eventHandler, rmClient, - new ContainerContextMatcher()); + new ContainerContextMatcher(), TezUtils.createUserPayloadFromConf(tezConf)); TaskSchedulerEventHandler taskSchedulerEventHandler = spy(taskSchedulerEventHandlerReal); taskSchedulerEventHandler.init(tezConf); taskSchedulerEventHandler.start(); @@ -1259,7 +1260,7 @@ public class TestContainerReuse { TaskSchedulerEventHandler taskSchedulerEventHandlerReal = new TaskSchedulerEventHandlerForTest(appContext, eventHandler, rmClient, - new AlwaysMatchesContainerMatcher()); + new AlwaysMatchesContainerMatcher(), TezUtils.createUserPayloadFromConf(tezConf)); TaskSchedulerEventHandler taskSchedulerEventHandler = spy(taskSchedulerEventHandlerReal); taskSchedulerEventHandler.init(tezConf); taskSchedulerEventHandler.start(); http://git-wip-us.apache.org/repos/asf/tez/blob/4a18c5d5/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskScheduler.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskScheduler.java b/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskScheduler.java index 
7b9ac4f..cf21a1d 100644 --- a/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskScheduler.java +++ b/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskScheduler.java @@ -145,7 +145,8 @@ public class TestTaskScheduler { scheduler.initialize(); drainableAppCallback.drain(); - verify(mockRMClient).init(conf); + // Verifying the validity of the configuration via the interval only instead of making sure + // it's the same instance. verify(mockRMClient).setHeartbeatInterval(interval); RegisterApplicationMasterResponse mockRegResponse = http://git-wip-us.apache.org/repos/asf/tez/blob/4a18c5d5/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskSchedulerEventHandler.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskSchedulerEventHandler.java b/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskSchedulerEventHandler.java index f191175..f8aa1e2 100644 --- a/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskSchedulerEventHandler.java +++ b/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskSchedulerEventHandler.java @@ -49,9 +49,12 @@ import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.event.Event; import org.apache.hadoop.yarn.event.EventHandler; import org.apache.tez.common.ContainerSignatureMatcher; +import org.apache.tez.common.TezUtils; import org.apache.tez.dag.api.NamedEntityDescriptor; import org.apache.tez.dag.api.TaskLocationHint; import org.apache.tez.dag.api.TezConfiguration; +import org.apache.tez.dag.api.TezUncheckedException; +import org.apache.tez.dag.api.UserPayload; import org.apache.tez.dag.api.client.DAGClientServer; import org.apache.tez.dag.app.AppContext; import org.apache.tez.dag.app.ContainerContext; @@ -92,8 +95,10 @@ public class TestTaskSchedulerEventHandler { public MockTaskSchedulerEventHandler(AppContext appContext, DAGClientServer clientService, EventHandler eventHandler, - 
ContainerSignatureMatcher containerSignatureMatcher, WebUIService webUI) { - super(appContext, clientService, eventHandler, containerSignatureMatcher, webUI, new LinkedList<NamedEntityDescriptor>(), false); + ContainerSignatureMatcher containerSignatureMatcher, WebUIService webUI, + UserPayload defaultPayload) { + super(appContext, clientService, eventHandler, containerSignatureMatcher, webUI, + new LinkedList<NamedEntityDescriptor>(), defaultPayload, false); } @Override @@ -134,8 +139,15 @@ public class TestTaskSchedulerEventHandler { mockWebUIService = mock(WebUIService.class); when(mockAppContext.getAllContainers()).thenReturn(mockAMContainerMap); when(mockClientService.getBindAddress()).thenReturn(new InetSocketAddress(10000)); + Configuration conf = new Configuration(false); + UserPayload userPayload; + try { + userPayload = TezUtils.createUserPayloadFromConf(conf); + } catch (IOException e) { + throw new TezUncheckedException(e); + } schedulerHandler = new MockTaskSchedulerEventHandler( - mockAppContext, mockClientService, mockEventHandler, mockSigMatcher, mockWebUIService); + mockAppContext, mockClientService, mockEventHandler, mockSigMatcher, mockWebUIService, userPayload); } @Test(timeout = 5000) http://git-wip-us.apache.org/repos/asf/tez/blob/4a18c5d5/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskSchedulerHelpers.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskSchedulerHelpers.java b/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskSchedulerHelpers.java index 60d37e9..59ab00a 100644 --- a/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskSchedulerHelpers.java +++ b/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskSchedulerHelpers.java @@ -24,6 +24,7 @@ import static org.mockito.Mockito.mock; import static org.mockito.Mockito.spy; import static org.mockito.Mockito.when; +import java.io.IOException; import java.nio.ByteBuffer; 
import java.util.Collection; import java.util.LinkedList; @@ -59,7 +60,10 @@ import org.apache.hadoop.yarn.client.api.impl.AMRMClientImpl; import org.apache.hadoop.yarn.event.Event; import org.apache.hadoop.yarn.event.EventHandler; import org.apache.tez.common.ContainerSignatureMatcher; +import org.apache.tez.common.TezUtils; import org.apache.tez.dag.api.NamedEntityDescriptor; +import org.apache.tez.dag.api.TezUncheckedException; +import org.apache.tez.dag.api.UserPayload; import org.apache.tez.dag.app.AppContext; import org.apache.tez.dag.app.ServicePluginLifecycleAbstractService; import org.apache.tez.dag.app.rm.YarnTaskSchedulerService.CookieContainerRequest; @@ -125,22 +129,26 @@ class TestTaskSchedulerHelpers { private TezAMRMClientAsync<CookieContainerRequest> amrmClientAsync; private ContainerSignatureMatcher containerSignatureMatcher; + private UserPayload defaultPayload; @SuppressWarnings("rawtypes") public TaskSchedulerEventHandlerForTest(AppContext appContext, EventHandler eventHandler, TezAMRMClientAsync<CookieContainerRequest> amrmClientAsync, - ContainerSignatureMatcher containerSignatureMatcher) { - super(appContext, null, eventHandler, containerSignatureMatcher, null, new LinkedList<NamedEntityDescriptor>(), false); + ContainerSignatureMatcher containerSignatureMatcher, + UserPayload defaultPayload) { + super(appContext, null, eventHandler, containerSignatureMatcher, null, + new LinkedList<NamedEntityDescriptor>(), defaultPayload, false); this.amrmClientAsync = amrmClientAsync; this.containerSignatureMatcher = containerSignatureMatcher; + this.defaultPayload = defaultPayload; } @Override public void instantiateScheduelrs(String host, int port, String trackingUrl, AppContext appContext) { TaskSchedulerContext taskSchedulerContext = new TaskSchedulerContextImpl(this, appContext, 0, trackingUrl, 1000, host, port, - getConfig()); + defaultPayload); TaskSchedulerContextImplWrapper wrapper = new TaskSchedulerContextImplWrapper(taskSchedulerContext, new 
CountingExecutorService(appCallbackExecutor)); @@ -287,8 +295,8 @@ class TestTaskSchedulerHelpers { // Not incrementing invocations for methods which to not obtain locks, // and do not go via the executor service. @Override - public Configuration getInitialConfiguration() { - return real.getInitialConfiguration(); + public UserPayload getInitialUserPayload() { + return real.getInitialUserPayload(); } @Override @@ -523,7 +531,13 @@ class TestTaskSchedulerHelpers { when(mockContext.getAppTrackingUrl()).thenReturn(appUrl); when(mockContext.getAMState()).thenReturn(TaskSchedulerContext.AMState.RUNNING_APP); - when(mockContext.getInitialConfiguration()).thenReturn(conf); + UserPayload userPayload; + try { + userPayload = TezUtils.createUserPayloadFromConf(conf); + } catch (IOException e) { + throw new TezUncheckedException(e); + } + when(mockContext.getInitialUserPayload()).thenReturn(userPayload); when(mockContext.isSession()).thenReturn(isSession); if (containerSignatureMatcher != null) { when(mockContext.getContainerSignatureMatcher()) http://git-wip-us.apache.org/repos/asf/tez/blob/4a18c5d5/tez-ext-service-tests/src/test/java/org/apache/tez/dag/app/launcher/TezTestServiceContainerLauncher.java ---------------------------------------------------------------------- diff --git a/tez-ext-service-tests/src/test/java/org/apache/tez/dag/app/launcher/TezTestServiceContainerLauncher.java b/tez-ext-service-tests/src/test/java/org/apache/tez/dag/app/launcher/TezTestServiceContainerLauncher.java index 0002b42..f31a07b 100644 --- a/tez-ext-service-tests/src/test/java/org/apache/tez/dag/app/launcher/TezTestServiceContainerLauncher.java +++ b/tez-ext-service-tests/src/test/java/org/apache/tez/dag/app/launcher/TezTestServiceContainerLauncher.java @@ -19,10 +19,12 @@ import java.net.InetSocketAddress; import com.google.common.base.Preconditions; import com.google.protobuf.ByteString; +import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.util.StringUtils; import 
org.apache.hadoop.yarn.api.ApplicationConstants; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.ContainerId; +import org.apache.tez.common.TezUtils; import org.apache.tez.serviceplugins.api.ContainerLaunchRequest; import org.apache.tez.serviceplugins.api.ContainerLauncher; import org.apache.tez.serviceplugins.api.ContainerLauncherContext; @@ -49,17 +51,22 @@ public class TezTestServiceContainerLauncher extends ContainerLauncher { private final int servicePort; private final TezTestServiceCommunicator communicator; private final ApplicationAttemptId appAttemptId; - // private final TaskAttemptListener tal; + private final Configuration conf; // Configuration passed in here to set up final parameters public TezTestServiceContainerLauncher(ContainerLauncherContext containerLauncherContext) { super(containerLauncherContext); - int numThreads = getContext().getInitialConfiguration().getInt( + try { + conf = TezUtils.createConfFromUserPayload(getContext().getInitialUserPayload()); + } catch (IOException e) { + throw new RuntimeException(e); + } + int numThreads = conf.getInt( TezTestServiceConfConstants.TEZ_TEST_SERVICE_AM_COMMUNICATOR_NUM_THREADS, TezTestServiceConfConstants.TEZ_TEST_SERVICE_AM_COMMUNICATOR_NUM_THREADS_DEFAULT); - this.servicePort = getContext().getInitialConfiguration().getInt( + this.servicePort = conf.getInt( TezTestServiceConfConstants.TEZ_TEST_SERVICE_RPC_PORT, -1); Preconditions.checkArgument(servicePort > 0, TezTestServiceConfConstants.TEZ_TEST_SERVICE_RPC_PORT + " must be set"); @@ -70,7 +77,7 @@ public class TezTestServiceContainerLauncher extends ContainerLauncher { @Override public void start() { - communicator.init(getContext().getInitialConfiguration()); + communicator.init(conf); communicator.start(); } http://git-wip-us.apache.org/repos/asf/tez/blob/4a18c5d5/tez-ext-service-tests/src/test/java/org/apache/tez/dag/app/rm/TezTestServiceTaskSchedulerService.java 
---------------------------------------------------------------------- diff --git a/tez-ext-service-tests/src/test/java/org/apache/tez/dag/app/rm/TezTestServiceTaskSchedulerService.java b/tez-ext-service-tests/src/test/java/org/apache/tez/dag/app/rm/TezTestServiceTaskSchedulerService.java index 7d209bc..0d87995 100644 --- a/tez-ext-service-tests/src/test/java/org/apache/tez/dag/app/rm/TezTestServiceTaskSchedulerService.java +++ b/tez-ext-service-tests/src/test/java/org/apache/tez/dag/app/rm/TezTestServiceTaskSchedulerService.java @@ -14,6 +14,7 @@ package org.apache.tez.dag.app.rm; +import java.io.IOException; import java.util.Arrays; import java.util.LinkedList; import java.util.List; @@ -32,6 +33,8 @@ import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.api.records.Priority; import org.apache.hadoop.yarn.api.records.Resource; +import org.apache.tez.common.TezUtils; +import org.apache.tez.dag.api.TezUncheckedException; import org.apache.tez.serviceplugins.api.TaskAttemptEndReason; import org.apache.tez.service.TezTestServiceConfConstants; import org.apache.tez.serviceplugins.api.TaskScheduler; @@ -74,7 +77,12 @@ public class TezTestServiceTaskSchedulerService extends TaskScheduler { this.containerFactory = new ContainerFactory(taskSchedulerContext.getApplicationAttemptId(), taskSchedulerContext.getCustomClusterIdentifier()); - Configuration conf = taskSchedulerContext.getInitialConfiguration(); + Configuration conf = null; + try { + conf = TezUtils.createConfFromUserPayload(taskSchedulerContext.getInitialUserPayload()); + } catch (IOException e) { + throw new TezUncheckedException(e); + } this.memoryPerInstance = conf .getInt(TezTestServiceConfConstants.TEZ_TEST_SERVICE_MEMORY_PER_INSTANCE_MB, -1); Preconditions.checkArgument(memoryPerInstance > 0, 
http://git-wip-us.apache.org/repos/asf/tez/blob/4a18c5d5/tez-ext-service-tests/src/test/java/org/apache/tez/dag/app/taskcomm/TezTestServiceTaskCommunicatorImpl.java ---------------------------------------------------------------------- diff --git a/tez-ext-service-tests/src/test/java/org/apache/tez/dag/app/taskcomm/TezTestServiceTaskCommunicatorImpl.java b/tez-ext-service-tests/src/test/java/org/apache/tez/dag/app/taskcomm/TezTestServiceTaskCommunicatorImpl.java index 078ea79..ef8f9e4 100644 --- a/tez-ext-service-tests/src/test/java/org/apache/tez/dag/app/taskcomm/TezTestServiceTaskCommunicatorImpl.java +++ b/tez-ext-service-tests/src/test/java/org/apache/tez/dag/app/taskcomm/TezTestServiceTaskCommunicatorImpl.java @@ -76,7 +76,7 @@ public class TezTestServiceTaskCommunicatorImpl extends TezTaskCommunicatorImpl @Override public void initialize() throws Exception { super.initialize(); - this.communicator.init(getContext().getInitialConfiguration()); + this.communicator.init(getConf()); } @Override http://git-wip-us.apache.org/repos/asf/tez/blob/4a18c5d5/tez-ext-service-tests/src/test/java/org/apache/tez/tests/TestExternalTezServices.java ---------------------------------------------------------------------- diff --git a/tez-ext-service-tests/src/test/java/org/apache/tez/tests/TestExternalTezServices.java b/tez-ext-service-tests/src/test/java/org/apache/tez/tests/TestExternalTezServices.java index 07dd363..2c52ae3 100644 --- a/tez-ext-service-tests/src/test/java/org/apache/tez/tests/TestExternalTezServices.java +++ b/tez-ext-service-tests/src/test/java/org/apache/tez/tests/TestExternalTezServices.java @@ -24,10 +24,12 @@ import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hdfs.MiniDFSCluster; import org.apache.tez.client.TezClient; +import org.apache.tez.common.TezUtils; import org.apache.tez.dag.api.DAG; import org.apache.tez.dag.api.ProcessorDescriptor; import org.apache.tez.dag.api.TezConfiguration; import 
org.apache.tez.dag.api.TezException; +import org.apache.tez.dag.api.UserPayload; import org.apache.tez.dag.api.Vertex; import org.apache.tez.dag.api.Vertex.VertexExecutionContext; import org.apache.tez.dag.api.client.DAGClient; @@ -136,17 +138,22 @@ public class TestExternalTezServices { confForJobs.set(TezConfiguration.TEZ_AM_STAGING_DIR, stagingDirPath.toString()); confForJobs.setBoolean(TezRuntimeConfiguration.TEZ_RUNTIME_OPTIMIZE_LOCAL_FETCH, false); + UserPayload userPayload = TezUtils.createUserPayloadFromConf(confForJobs); + TaskSchedulerDescriptor[] taskSchedulerDescriptors = new TaskSchedulerDescriptor[]{ TaskSchedulerDescriptor - .create(EXT_PUSH_ENTITY_NAME, TezTestServiceTaskSchedulerService.class.getName())}; + .create(EXT_PUSH_ENTITY_NAME, TezTestServiceTaskSchedulerService.class.getName()) + .setUserPayload(userPayload)}; ContainerLauncherDescriptor[] containerLauncherDescriptors = new ContainerLauncherDescriptor[]{ ContainerLauncherDescriptor - .create(EXT_PUSH_ENTITY_NAME, TezTestServiceNoOpContainerLauncher.class.getName())}; + .create(EXT_PUSH_ENTITY_NAME, TezTestServiceNoOpContainerLauncher.class.getName()) + .setUserPayload(userPayload)}; TaskCommunicatorDescriptor[] taskCommunicatorDescriptors = new TaskCommunicatorDescriptor[]{ TaskCommunicatorDescriptor - .create(EXT_PUSH_ENTITY_NAME, TezTestServiceTaskCommunicatorImpl.class.getName())}; + .create(EXT_PUSH_ENTITY_NAME, TezTestServiceTaskCommunicatorImpl.class.getName()) + .setUserPayload(userPayload)}; ServicePluginsDescriptor servicePluginsDescriptor = ServicePluginsDescriptor.create(true, true, taskSchedulerDescriptors, containerLauncherDescriptors, taskCommunicatorDescriptors);
