TEZ-2090. Add tests for jobs running in external services. (sseth)
Project: http://git-wip-us.apache.org/repos/asf/tez/repo Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/36e7f854 Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/36e7f854 Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/36e7f854 Branch: refs/heads/TEZ-2003 Commit: 36e7f854b8c648abf7aff1b5dca806f0d72108c9 Parents: 7ab75d8 Author: Siddharth Seth <[email protected]> Authored: Fri Feb 13 17:24:05 2015 -0800 Committer: Siddharth Seth <[email protected]> Committed: Thu Aug 6 01:22:32 2015 -0700 ---------------------------------------------------------------------- TEZ-2003-CHANGES.txt | 1 + pom.xml | 6 + .../apache/tez/dag/api/TezConfiguration.java | 2 + .../apache/tez/dag/api/TaskCommunicator.java | 1 + .../tez/dag/api/TaskCommunicatorContext.java | 3 + .../tez/dag/app/TezTaskCommunicatorImpl.java | 42 +- .../dag/app/rm/TaskSchedulerEventHandler.java | 2 +- tez-ext-service-tests/pom.xml | 161 ++++ .../tez/dag/app/TezTestServiceCommunicator.java | 152 ++++ .../TezTestServiceContainerLauncher.java | 144 ++++ .../TezTestServiceNoOpContainerLauncher.java | 66 ++ .../rm/TezTestServiceTaskSchedulerService.java | 347 ++++++++ .../TezTestServiceTaskCommunicatorImpl.java | 182 ++++ .../org/apache/tez/service/ContainerRunner.java | 27 + .../tez/service/MiniTezTestServiceCluster.java | 163 ++++ .../service/TezTestServiceConfConstants.java | 41 + .../TezTestServiceProtocolBlockingPB.java | 22 + .../tez/service/impl/ContainerRunnerImpl.java | 512 +++++++++++ .../apache/tez/service/impl/TezTestService.java | 126 +++ .../impl/TezTestServiceProtocolClientImpl.java | 82 ++ .../impl/TezTestServiceProtocolServerImpl.java | 133 +++ .../tez/shufflehandler/FadvisedChunkedFile.java | 78 ++ .../tez/shufflehandler/FadvisedFileRegion.java | 160 ++++ .../apache/tez/shufflehandler/IndexCache.java | 199 +++++ .../tez/shufflehandler/ShuffleHandler.java | 840 +++++++++++++++++++ .../tez/tests/TestExternalTezServices.java | 183 ++++ .../org/apache/tez/util/ProtoConverters.java | 
172 ++++ .../src/test/proto/TezDaemonProtocol.proto | 84 ++ .../src/test/resources/log4j.properties | 19 + .../org/apache/tez/runtime/task/TezChild.java | 2 +- .../apache/tez/runtime/task/TezTaskRunner.java | 2 +- 31 files changed, 3943 insertions(+), 11 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/tez/blob/36e7f854/TEZ-2003-CHANGES.txt ---------------------------------------------------------------------- diff --git a/TEZ-2003-CHANGES.txt b/TEZ-2003-CHANGES.txt index d7e4be5..975ce65 100644 --- a/TEZ-2003-CHANGES.txt +++ b/TEZ-2003-CHANGES.txt @@ -1,5 +1,6 @@ ALL CHANGES: TEZ-2019. Temporarily allow the scheduler and launcher to be specified via configuration. TEZ-2006. Task communication plane needs to be pluggable. + TEZ-2090. Add tests for jobs running in external services. INCOMPATIBLE CHANGES: http://git-wip-us.apache.org/repos/asf/tez/blob/36e7f854/pom.xml ---------------------------------------------------------------------- diff --git a/pom.xml b/pom.xml index a0b82e4..2ddb113 100644 --- a/pom.xml +++ b/pom.xml @@ -175,6 +175,11 @@ <type>test-jar</type> </dependency> <dependency> + <groupId>org.apache.tez</groupId> + <artifactId>tez-ext-service-tests</artifactId> + <version>${project.version}</version> + </dependency> + <dependency> <groupId>org.apache.pig</groupId> <artifactId>pig</artifactId> <version>${pig.version}</version> @@ -680,6 +685,7 @@ <module>tez-ui</module> <module>tez-plugins</module> <module>tez-tools</module> + <module>tez-ext-service-tests</module> <module>tez-dist</module> <module>docs</module> </modules> http://git-wip-us.apache.org/repos/asf/tez/blob/36e7f854/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java ---------------------------------------------------------------------- diff --git a/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java b/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java index 
5398a90..34fdb15 100644 --- a/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java +++ b/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java @@ -1219,6 +1219,8 @@ public class TezConfiguration extends Configuration { public static final String TEZ_AM_CONTAINER_LAUNCHER_CLASS = TEZ_AM_PREFIX + "container-launcher.class"; @ConfigurationScope(Scope.VERTEX) public static final String TEZ_AM_TASK_SCHEDULER_CLASS = TEZ_AM_PREFIX + "task-scheduler.class"; + @ConfigurationScope(Scope.VERTEX) + public static final String TEZ_AM_TASK_COMMUNICATOR_CLASS = TEZ_AM_PREFIX + "task-communicator.class"; // TODO only validate property here, value can also be validated if necessary http://git-wip-us.apache.org/repos/asf/tez/blob/36e7f854/tez-dag/src/main/java/org/apache/tez/dag/api/TaskCommunicator.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/api/TaskCommunicator.java b/tez-dag/src/main/java/org/apache/tez/dag/api/TaskCommunicator.java index 97f9c16..c9f85e0 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/api/TaskCommunicator.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/api/TaskCommunicator.java @@ -14,6 +14,7 @@ package org.apache.tez.dag.api; +import java.io.IOException; import java.net.InetSocketAddress; import java.util.Map; http://git-wip-us.apache.org/repos/asf/tez/blob/36e7f854/tez-dag/src/main/java/org/apache/tez/dag/api/TaskCommunicatorContext.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/api/TaskCommunicatorContext.java b/tez-dag/src/main/java/org/apache/tez/dag/api/TaskCommunicatorContext.java index 9b2d889..41675fe 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/api/TaskCommunicatorContext.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/api/TaskCommunicatorContext.java @@ -44,5 +44,8 @@ public interface TaskCommunicatorContext { // TODO TEZ-2003 Move to 
vertex, taskIndex, version void taskStartedRemotely(TezTaskAttemptID taskAttemptID, ContainerId containerId); + // TODO TEZ-2003 Add an API to register task failure - for example, a communication failure. + // This will have to take into consideration the TA_FAILED event + // TODO Eventually Add methods to report availability stats to the scheduler. } http://git-wip-us.apache.org/repos/asf/tez/blob/36e7f854/tez-dag/src/main/java/org/apache/tez/dag/app/TezTaskCommunicatorImpl.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/TezTaskCommunicatorImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/TezTaskCommunicatorImpl.java index e40f79c..77d2e39 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/app/TezTaskCommunicatorImpl.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/TezTaskCommunicatorImpl.java @@ -74,16 +74,22 @@ public class TezTaskCommunicatorImpl extends TaskCommunicator { new ConcurrentHashMap<TaskAttempt, ContainerId>(); private final TezTaskUmbilicalProtocol taskUmbilical; + private final String tokenIdentifier; + private final Token<JobTokenIdentifier> sessionToken; private InetSocketAddress address; private Server server; - private static final class ContainerInfo { + public static final class ContainerInfo { - ContainerInfo(ContainerId containerId) { + ContainerInfo(ContainerId containerId, String host, int port) { this.containerId = containerId; + this.host = host; + this.port = port; } - ContainerId containerId; + final ContainerId containerId; + public final String host; + public final int port; TezHeartbeatResponse lastResponse = null; TaskSpec taskSpec = null; long lastRequestId = 0; @@ -110,6 +116,8 @@ public class TezTaskCommunicatorImpl extends TaskCommunicator { super(TezTaskCommunicatorImpl.class.getName()); this.taskCommunicatorContext = taskCommunicatorContext; this.taskUmbilical = new TezTaskUmbilicalProtocolImpl(); + this.tokenIdentifier = 
this.taskCommunicatorContext.getApplicationAttemptId().getApplicationId().toString(); + this.sessionToken = TokenCache.getSessionToken(taskCommunicatorContext.getCredentials()); } @@ -130,9 +138,7 @@ public class TezTaskCommunicatorImpl extends TaskCommunicator { try { JobTokenSecretManager jobTokenSecretManager = new JobTokenSecretManager(); - Token<JobTokenIdentifier> sessionToken = TokenCache.getSessionToken(taskCommunicatorContext.getCredentials()); - jobTokenSecretManager.addTokenForJob( - taskCommunicatorContext.getApplicationAttemptId().getApplicationId().toString(), sessionToken); + jobTokenSecretManager.addTokenForJob(tokenIdentifier, sessionToken); server = new RPC.Builder(conf) .setProtocol(TezTaskUmbilicalProtocol.class) @@ -184,7 +190,7 @@ public class TezTaskCommunicatorImpl extends TaskCommunicator { @Override public void registerRunningContainer(ContainerId containerId, String host, int port) { - ContainerInfo oldInfo = registeredContainers.putIfAbsent(containerId, new ContainerInfo(containerId)); + ContainerInfo oldInfo = registeredContainers.putIfAbsent(containerId, new ContainerInfo(containerId, host, port)); if (oldInfo != null) { throw new TezUncheckedException("Multiple registrations for containerId: " + containerId); } @@ -232,9 +238,9 @@ public class TezTaskCommunicatorImpl extends TaskCommunicator { ". 
Already registered to containerId: " + oldId); } } - } + @Override public void unregisterRunningTaskAttempt(TezTaskAttemptID taskAttemptID) { TaskAttempt taskAttempt = new TaskAttempt(taskAttemptID); @@ -260,6 +266,18 @@ public class TezTaskCommunicatorImpl extends TaskCommunicator { return address; } + protected String getTokenIdentifier() { + return tokenIdentifier; + } + + protected Token<JobTokenIdentifier> getSessionToken() { + return sessionToken; + } + + protected TaskCommunicatorContext getTaskCommunicatorContext() { + return taskCommunicatorContext; + } + public TezTaskUmbilicalProtocol getUmbilical() { return this.taskUmbilical; } @@ -473,4 +491,12 @@ public class TezTaskCommunicatorImpl extends TaskCommunicator { return "TaskAttempt{" + "taskAttemptId=" + taskAttemptId + '}'; } } + + protected ContainerInfo getContainerInfo(ContainerId containerId) { + return registeredContainers.get(containerId); + } + + protected ContainerId getContainerForAttempt(TezTaskAttemptID taskAttemptId) { + return attemptToContainerMap.get(new TaskAttempt(taskAttemptId)); + } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/tez/blob/36e7f854/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerEventHandler.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerEventHandler.java b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerEventHandler.java index 982e398..a77b53a 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerEventHandler.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerEventHandler.java @@ -350,7 +350,7 @@ public class TaskSchedulerEventHandler extends AbstractService try { Constructor<? 
extends TaskSchedulerService> ctor = taskSchedulerClazz .getConstructor(TaskSchedulerAppCallback.class, AppContext.class, String.class, - Integer.class, String.class, Configuration.class); + int.class, String.class, Configuration.class); ctor.setAccessible(true); TaskSchedulerService taskSchedulerService = ctor.newInstance(this, appContext, host, port, trackingUrl, getConfig()); http://git-wip-us.apache.org/repos/asf/tez/blob/36e7f854/tez-ext-service-tests/pom.xml ---------------------------------------------------------------------- diff --git a/tez-ext-service-tests/pom.xml b/tez-ext-service-tests/pom.xml new file mode 100644 index 0000000..37f68b1 --- /dev/null +++ b/tez-ext-service-tests/pom.xml @@ -0,0 +1,161 @@ +<?xml version="1.0" encoding="UTF-8"?> +<!-- + ~ Licensed under the Apache License, Version 2.0 (the "License"); + ~ you may not use this file except in compliance with the License. + ~ You may obtain a copy of the License at + ~ + ~ http://www.apache.org/licenses/LICENSE-2.0 + ~ + ~ Unless required by applicable law or agreed to in writing, software + ~ distributed under the License is distributed on an "AS IS" BASIS, + ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + ~ See the License for the specific language governing permissions and + ~ limitations under the License. 
+ --> + +<project xmlns="http://maven.apache.org/POM/4.0.0" + xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> + <modelVersion>4.0.0</modelVersion> + <parent> + <artifactId>tez</artifactId> + <groupId>org.apache.tez</groupId> + <version>0.7.0-SNAPSHOT</version> + </parent> + + <!-- TODO TEZ-2003 Merge this into the tez-tests module --> + <artifactId>tez-ext-service-tests</artifactId> + + <dependencies> + <dependency> + <groupId>log4j</groupId> + <artifactId>log4j</artifactId> + </dependency> + <dependency> + <groupId>com.google.guava</groupId> + <artifactId>guava</artifactId> + </dependency> + <dependency> + <groupId>org.apache.hadoop</groupId> + <artifactId>hadoop-common</artifactId> + </dependency> + <dependency> + <groupId>junit</groupId> + <artifactId>junit</artifactId> + </dependency> + <dependency> + <groupId>org.mockito</groupId> + <artifactId>mockito-all</artifactId> + <scope>test</scope> + </dependency> + <dependency> + <groupId>org.apache.tez</groupId> + <artifactId>tez-runtime-internals</artifactId> + </dependency> + <dependency> + <!-- Required for the ShuffleHandler --> + <groupId>org.apache.tez</groupId> + <artifactId>tez-runtime-library</artifactId> + </dependency> + <dependency> + <groupId>org.apache.tez</groupId> + <artifactId>tez-dag</artifactId> + </dependency> + <dependency> + <groupId>org.apache.tez</groupId> + <artifactId>tez-tests</artifactId> + <scope>test</scope> + <type>test-jar</type> + </dependency> + <dependency> + <groupId>org.apache.hadoop</groupId> + <artifactId>hadoop-hdfs</artifactId> + <scope>test</scope> + <type>test-jar</type> + </dependency> + <dependency> + <groupId>org.apache.hadoop</groupId> + <artifactId>hadoop-yarn-server-tests</artifactId> + <type>test-jar</type> + <scope>test</scope> + </dependency> + <dependency> + <groupId>org.apache.hadoop</groupId> + <artifactId>hadoop-common</artifactId> + <scope>test</scope> + 
</dependency> + <dependency> + <groupId>org.apache.hadoop</groupId> + <artifactId>hadoop-common</artifactId> + <type>test-jar</type> + <scope>test</scope> + </dependency> + <dependency> + <groupId>org.apache.hadoop</groupId> + <artifactId>hadoop-mapreduce-client-jobclient</artifactId> + <type>test-jar</type> + <scope>test</scope> + </dependency> + + </dependencies> + + <build> + <!-- + Include all files in src/main/resources. By default, do not apply property + substitution (filtering=false), but do apply property substitution to + version-info.properties (filtering=true). This will substitute the + version information correctly, but prevent Maven from altering other files. + --> + <resources> + <resource> + <directory>${basedir}/src/main/resources</directory> + <excludes> + <exclude>tez-api-version-info.properties</exclude> + </excludes> + <filtering>false</filtering> + </resource> + <resource> + <directory>${basedir}/src/main/resources</directory> + <includes> + <include>tez-api-version-info.properties</include> + </includes> + <filtering>true</filtering> + </resource> + </resources> + <plugins> + <plugin> + <groupId>org.apache.rat</groupId> + <artifactId>apache-rat-plugin</artifactId> + </plugin> + <plugin> + <groupId>org.apache.hadoop</groupId> + <artifactId>hadoop-maven-plugins</artifactId> + <executions> + <execution> + <id>compile-protoc</id> + <phase>generate-sources</phase> + <goals> + <goal>protoc</goal> + </goals> + <configuration> + <protocVersion>${protobuf.version}</protocVersion> + <protocCommand>${protoc.path}</protocCommand> + <imports> + <param>${basedir}/src/test/proto</param> + <param>${basedir}/../tez-api/src/main/proto</param> + </imports> + <source> + <directory>${basedir}/src/test/proto</directory> + <includes> + <include>TezDaemonProtocol.proto</include> + </includes> + </source> + <output>${project.build.directory}/generated-test-sources/java</output> + </configuration> + </execution> + </executions> + </plugin> + </plugins> + </build> + 
+</project> http://git-wip-us.apache.org/repos/asf/tez/blob/36e7f854/tez-ext-service-tests/src/test/java/org/apache/tez/dag/app/TezTestServiceCommunicator.java ---------------------------------------------------------------------- diff --git a/tez-ext-service-tests/src/test/java/org/apache/tez/dag/app/TezTestServiceCommunicator.java b/tez-ext-service-tests/src/test/java/org/apache/tez/dag/app/TezTestServiceCommunicator.java new file mode 100644 index 0000000..ac50878 --- /dev/null +++ b/tez-ext-service-tests/src/test/java/org/apache/tez/dag/app/TezTestServiceCommunicator.java @@ -0,0 +1,152 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.tez.dag.app; + + +import java.util.concurrent.Callable; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; + +import com.google.common.util.concurrent.FutureCallback; +import com.google.common.util.concurrent.Futures; +import com.google.common.util.concurrent.ListenableFuture; +import com.google.common.util.concurrent.ListeningExecutorService; +import com.google.common.util.concurrent.MoreExecutors; +import com.google.common.util.concurrent.ThreadFactoryBuilder; +import com.google.protobuf.Message; +import org.apache.hadoop.service.AbstractService; +import org.apache.tez.service.TezTestServiceProtocolBlockingPB; +import org.apache.tez.service.impl.TezTestServiceProtocolClientImpl; +import org.apache.tez.test.service.rpc.TezTestServiceProtocolProtos.RunContainerRequestProto; +import org.apache.tez.test.service.rpc.TezTestServiceProtocolProtos.RunContainerResponseProto; +import org.apache.tez.test.service.rpc.TezTestServiceProtocolProtos.SubmitWorkRequestProto; +import org.apache.tez.test.service.rpc.TezTestServiceProtocolProtos.SubmitWorkResponseProto; + +public class TezTestServiceCommunicator extends AbstractService { + + private final ConcurrentMap<String, TezTestServiceProtocolBlockingPB> hostProxies; + private final ListeningExecutorService executor; + + // TODO Convert this into a singleton + public TezTestServiceCommunicator(int numThreads) { + super(TezTestServiceCommunicator.class.getSimpleName()); + ExecutorService localExecutor = Executors.newFixedThreadPool(numThreads, + new ThreadFactoryBuilder().setNameFormat("TezTestServiceCommunicator #%2d").build()); + this.hostProxies = new ConcurrentHashMap<String, TezTestServiceProtocolBlockingPB>(); + executor = MoreExecutors.listeningDecorator(localExecutor); + } + + @Override + public void serviceStop() { + executor.shutdownNow(); + } + + + public void 
runContainer(RunContainerRequestProto request, String host, int port, + final ExecuteRequestCallback<RunContainerResponseProto> callback) { + ListenableFuture<RunContainerResponseProto> future = executor.submit(new RunContainerCallable(request, host, port)); + Futures.addCallback(future, new FutureCallback<RunContainerResponseProto>() { + @Override + public void onSuccess(RunContainerResponseProto result) { + callback.setResponse(result); + } + + @Override + public void onFailure(Throwable t) { + callback.indicateError(t); + } + }); + + } + + public void submitWork(SubmitWorkRequestProto request, String host, int port, + final ExecuteRequestCallback<SubmitWorkResponseProto> callback) { + ListenableFuture<SubmitWorkResponseProto> future = executor.submit(new SubmitWorkCallable(request, host, port)); + Futures.addCallback(future, new FutureCallback<SubmitWorkResponseProto>() { + @Override + public void onSuccess(SubmitWorkResponseProto result) { + callback.setResponse(result); + } + + @Override + public void onFailure(Throwable t) { + callback.indicateError(t); + } + }); + + } + + + private class RunContainerCallable implements Callable<RunContainerResponseProto> { + + final String hostname; + final int port; + final RunContainerRequestProto request; + + private RunContainerCallable(RunContainerRequestProto request, String hostname, int port) { + this.hostname = hostname; + this.port = port; + this.request = request; + } + + @Override + public RunContainerResponseProto call() throws Exception { + return getProxy(hostname, port).runContainer(null, request); + } + } + + private class SubmitWorkCallable implements Callable<SubmitWorkResponseProto> { + final String hostname; + final int port; + final SubmitWorkRequestProto request; + + private SubmitWorkCallable(SubmitWorkRequestProto request, String hostname, int port) { + this.hostname = hostname; + this.port = port; + this.request = request; + } + + @Override + public SubmitWorkResponseProto call() throws Exception { 
+ return getProxy(hostname, port).submitWork(null, request); + } + } + + public interface ExecuteRequestCallback<T extends Message> { + void setResponse(T response); + void indicateError(Throwable t); + } + + private TezTestServiceProtocolBlockingPB getProxy(String hostname, int port) { + String hostId = getHostIdentifier(hostname, port); + + TezTestServiceProtocolBlockingPB proxy = hostProxies.get(hostId); + if (proxy == null) { + proxy = new TezTestServiceProtocolClientImpl(getConfig(), hostname, port); + TezTestServiceProtocolBlockingPB proxyOld = hostProxies.putIfAbsent(hostId, proxy); + if (proxyOld != null) { + // TODO Shutdown the new proxy. + proxy = proxyOld; + } + } + return proxy; + } + + private String getHostIdentifier(String hostname, int port) { + return hostname + ":" + port; + } +} http://git-wip-us.apache.org/repos/asf/tez/blob/36e7f854/tez-ext-service-tests/src/test/java/org/apache/tez/dag/app/launcher/TezTestServiceContainerLauncher.java ---------------------------------------------------------------------- diff --git a/tez-ext-service-tests/src/test/java/org/apache/tez/dag/app/launcher/TezTestServiceContainerLauncher.java b/tez-ext-service-tests/src/test/java/org/apache/tez/dag/app/launcher/TezTestServiceContainerLauncher.java new file mode 100644 index 0000000..e83165b --- /dev/null +++ b/tez-ext-service-tests/src/test/java/org/apache/tez/dag/app/launcher/TezTestServiceContainerLauncher.java @@ -0,0 +1,144 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.tez.dag.app.launcher; + +import com.google.common.base.Preconditions; +import com.google.protobuf.ByteString; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.service.AbstractService; +import org.apache.hadoop.yarn.api.ApplicationConstants; +import org.apache.hadoop.yarn.api.records.ContainerId; +import org.apache.hadoop.yarn.util.Clock; +import org.apache.tez.dag.app.AppContext; +import org.apache.tez.dag.app.TaskAttemptListener; +import org.apache.tez.dag.app.TezTestServiceCommunicator; +import org.apache.tez.dag.app.rm.NMCommunicatorEvent; +import org.apache.tez.dag.app.rm.NMCommunicatorLaunchRequestEvent; +import org.apache.tez.dag.app.rm.container.AMContainerEvent; +import org.apache.tez.dag.app.rm.container.AMContainerEventLaunchFailed; +import org.apache.tez.dag.app.rm.container.AMContainerEventLaunched; +import org.apache.tez.dag.app.rm.container.AMContainerEventType; +import org.apache.tez.dag.history.DAGHistoryEvent; +import org.apache.tez.dag.history.events.ContainerLaunchedEvent; +import org.apache.tez.service.TezTestServiceConfConstants; +import org.apache.tez.test.service.rpc.TezTestServiceProtocolProtos; +import org.apache.tez.test.service.rpc.TezTestServiceProtocolProtos.RunContainerRequestProto; + +public class TezTestServiceContainerLauncher extends AbstractService implements ContainerLauncher { + + // TODO Support interruptability of tasks which haven't yet been launched. 
+ + // TODO May need multiple connections per target machine, depending upon how synchronization is handled in the RPC layer + + static final Log LOG = LogFactory.getLog(TezTestServiceContainerLauncher.class); + + private final AppContext context; + private final String tokenIdentifier; + private final TaskAttemptListener tal; + private final int servicePort; + private final TezTestServiceCommunicator communicator; + private final Clock clock; + + + // Configuration passed in here to set up final parameters + public TezTestServiceContainerLauncher(AppContext appContext, Configuration conf, + TaskAttemptListener tal) { + super(TezTestServiceContainerLauncher.class.getName()); + this.clock = appContext.getClock(); + int numThreads = conf.getInt(TezTestServiceConfConstants.TEZ_TEST_SERVICE_AM_COMMUNICATOR_NUM_THREADS, + TezTestServiceConfConstants.TEZ_TEST_SERVICE_AM_COMMUNICATOR_NUM_THREADS_DEFAULT); + + this.servicePort = conf.getInt(TezTestServiceConfConstants.TEZ_TEST_SERVICE_RPC_PORT, -1); + Preconditions.checkArgument(servicePort > 0, + TezTestServiceConfConstants.TEZ_TEST_SERVICE_RPC_PORT + " must be set"); + this.communicator = new TezTestServiceCommunicator(numThreads); + this.context = appContext; + this.tokenIdentifier = context.getApplicationID().toString(); + this.tal = tal; + } + + @Override + public void serviceInit(Configuration conf) { + communicator.init(conf); + } + + @Override + public void serviceStart() { + communicator.start(); + } + + @Override + public void serviceStop() { + communicator.stop(); + } + + @Override + public void handle(NMCommunicatorEvent event) { + switch (event.getType()) { + case CONTAINER_LAUNCH_REQUEST: + final NMCommunicatorLaunchRequestEvent launchEvent = (NMCommunicatorLaunchRequestEvent) event; + RunContainerRequestProto runRequest = constructRunContainerRequest(launchEvent); + communicator.runContainer(runRequest, launchEvent.getNodeId().getHost(), + launchEvent.getNodeId().getPort(), + new 
TezTestServiceCommunicator.ExecuteRequestCallback<TezTestServiceProtocolProtos.RunContainerResponseProto>() { + @Override + public void setResponse(TezTestServiceProtocolProtos.RunContainerResponseProto response) { + LOG.info("Container: " + launchEvent.getContainerId() + " launch succeeded on host: " + launchEvent.getNodeId()); + context.getEventHandler().handle(new AMContainerEventLaunched(launchEvent.getContainerId())); + ContainerLaunchedEvent lEvt = new ContainerLaunchedEvent( + launchEvent.getContainerId(), clock.getTime(), context.getApplicationAttemptId()); + context.getHistoryHandler().handle(new DAGHistoryEvent( + null, lEvt)); + } + + @Override + public void indicateError(Throwable t) { + LOG.error("Failed to launch container: " + launchEvent.getContainer() + " on host: " + launchEvent.getNodeId(), t); + sendContainerLaunchFailedMsg(launchEvent.getContainerId(), t); + } + }); + break; + case CONTAINER_STOP_REQUEST: + LOG.info("DEBUG: Ignoring STOP_REQUEST for event: " + event); + // that the container is actually done (normally received from RM) + // TODO Sending this out for an un-launched container is invalid + context.getEventHandler().handle(new AMContainerEvent(event.getContainerId(), + AMContainerEventType.C_NM_STOP_SENT)); + break; + } + } + + private RunContainerRequestProto constructRunContainerRequest(NMCommunicatorLaunchRequestEvent event) { + RunContainerRequestProto.Builder builder = RunContainerRequestProto.newBuilder(); + builder.setAmHost(tal.getAddress().getHostName()).setAmPort(tal.getAddress().getPort()); + builder.setAppAttemptNumber(event.getContainer().getId().getApplicationAttemptId().getAttemptId()); + builder.setApplicationIdString( + event.getContainer().getId().getApplicationAttemptId().getApplicationId().toString()); + builder.setTokenIdentifier(tokenIdentifier); + builder.setContainerIdString(event.getContainer().getId().toString()); + builder.setCredentialsBinary( + 
ByteString.copyFrom(event.getContainerLaunchContext().getTokens())); + // TODO Avoid reading this from the environment + builder.setUser(System.getenv(ApplicationConstants.Environment.USER.name())); + return builder.build(); + } + + @SuppressWarnings("unchecked") + void sendContainerLaunchFailedMsg(ContainerId containerId, Throwable t) { + context.getEventHandler().handle(new AMContainerEventLaunchFailed(containerId, t == null ? "" : t.getMessage())); + } +} http://git-wip-us.apache.org/repos/asf/tez/blob/36e7f854/tez-ext-service-tests/src/test/java/org/apache/tez/dag/app/launcher/TezTestServiceNoOpContainerLauncher.java ---------------------------------------------------------------------- diff --git a/tez-ext-service-tests/src/test/java/org/apache/tez/dag/app/launcher/TezTestServiceNoOpContainerLauncher.java b/tez-ext-service-tests/src/test/java/org/apache/tez/dag/app/launcher/TezTestServiceNoOpContainerLauncher.java new file mode 100644 index 0000000..8c8e486 --- /dev/null +++ b/tez-ext-service-tests/src/test/java/org/apache/tez/dag/app/launcher/TezTestServiceNoOpContainerLauncher.java @@ -0,0 +1,66 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+
+package org.apache.tez.dag.app.launcher;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.service.AbstractService;
+import org.apache.hadoop.yarn.util.Clock;
+import org.apache.tez.dag.app.AppContext;
+import org.apache.tez.dag.app.TaskAttemptListener;
+import org.apache.tez.dag.app.rm.NMCommunicatorEvent;
+import org.apache.tez.dag.app.rm.NMCommunicatorLaunchRequestEvent;
+import org.apache.tez.dag.app.rm.container.AMContainerEvent;
+import org.apache.tez.dag.app.rm.container.AMContainerEventLaunched;
+import org.apache.tez.dag.app.rm.container.AMContainerEventType;
+import org.apache.tez.dag.history.DAGHistoryEvent;
+import org.apache.tez.dag.history.events.ContainerLaunchedEvent;
+
+/**
+ * A {@link ContainerLauncher} that never starts or stops a real process.
+ *
+ * <p>A launch request is acknowledged immediately: the AM receives an
+ * {@link AMContainerEventLaunched} and a {@link ContainerLaunchedEvent} is written to history.
+ * A stop request is acknowledged with {@link AMContainerEventType#C_NM_STOP_SENT} without any
+ * further action. All other event types are ignored.
+ */
+public class TezTestServiceNoOpContainerLauncher extends AbstractService implements ContainerLauncher {
+
+  static final Log LOG = LogFactory.getLog(TezTestServiceNoOpContainerLauncher.class);
+
+  private final AppContext context;
+  private final Clock clock;
+
+  /**
+   * @param appContext supplies the event handler, history handler, clock and attempt id
+   * @param conf       not used by this launcher
+   * @param tal        not used by this launcher
+   */
+  public TezTestServiceNoOpContainerLauncher(AppContext appContext, Configuration conf,
+                                             TaskAttemptListener tal) {
+    super(TezTestServiceNoOpContainerLauncher.class.getName());
+    this.context = appContext;
+    this.clock = appContext.getClock();
+  }
+
+  @Override
+  public void handle(NMCommunicatorEvent event) {
+    switch (event.getType()) {
+      case CONTAINER_LAUNCH_REQUEST:
+        acknowledgeLaunch((NMCommunicatorLaunchRequestEvent) event);
+        break;
+      case CONTAINER_STOP_REQUEST:
+        acknowledgeStop(event);
+        break;
+      default:
+        // Any other event type is deliberately ignored.
+        break;
+    }
+  }
+
+  /** Reports the (fake) launch as successful to the AM and records it in DAG history. */
+  private void acknowledgeLaunch(NMCommunicatorLaunchRequestEvent launchRequest) {
+    LOG.info("No-op launch for container: " + launchRequest.getContainerId()
+        + " succeeded on host: " + launchRequest.getNodeId());
+    context.getEventHandler().handle(new AMContainerEventLaunched(launchRequest.getContainerId()));
+    ContainerLaunchedEvent launchedEvent = new ContainerLaunchedEvent(
+        launchRequest.getContainerId(), clock.getTime(), context.getApplicationAttemptId());
+    context.getHistoryHandler().handle(new DAGHistoryEvent(null, launchedEvent));
+  }
+
+  /** Tells the AM the stop was sent; nothing is actually stopped. */
+  private void acknowledgeStop(NMCommunicatorEvent stopRequest) {
+    LOG.info("DEBUG: Ignoring STOP_REQUEST for event: " + stopRequest);
+    context.getEventHandler().handle(
+        new AMContainerEvent(stopRequest.getContainerId(), AMContainerEventType.C_NM_STOP_SENT));
+  }
+}
http://git-wip-us.apache.org/repos/asf/tez/blob/36e7f854/tez-ext-service-tests/src/test/java/org/apache/tez/dag/app/rm/TezTestServiceTaskSchedulerService.java
----------------------------------------------------------------------
diff --git a/tez-ext-service-tests/src/test/java/org/apache/tez/dag/app/rm/TezTestServiceTaskSchedulerService.java b/tez-ext-service-tests/src/test/java/org/apache/tez/dag/app/rm/TezTestServiceTaskSchedulerService.java
new file mode 100644
index 0000000..e3c18bf
--- /dev/null
+++ b/tez-ext-service-tests/src/test/java/org/apache/tez/dag/app/rm/TezTestServiceTaskSchedulerService.java
@@ -0,0 +1,347 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.dag.app.rm;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Random;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import com.google.common.base.Preconditions;
+import com.google.common.primitives.Ints;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.api.records.Container;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.ContainerStatus;
+import org.apache.hadoop.yarn.api.records.NodeId;
+import org.apache.hadoop.yarn.api.records.NodeReport;
+import org.apache.hadoop.yarn.api.records.Priority;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.client.api.AMRMClient;
+import org.apache.hadoop.yarn.client.api.async.AMRMClientAsync;
+import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.tez.dag.api.TezUncheckedException;
+import org.apache.tez.dag.app.AppContext;
+import org.apache.tez.service.TezTestServiceConfConstants;
+
+
+// TODO Registration with RM - so that the AM is considered dead and restarted in the expiry interval - 10 minutes.
+
+/**
+ * A test {@link TaskSchedulerService} that grants every allocation immediately with a synthetic
+ * {@link Container} placed on one of a fixed set of external service hosts.
+ *
+ * <p>Container capability is derived from configuration: per-instance memory and vcores are
+ * divided evenly among the configured executors per instance. The AM still registers and
+ * unregisters with the RM through an {@link AMRMClientAsync}, but no containers are requested
+ * from it.
+ */
+public class TezTestServiceTaskSchedulerService extends TaskSchedulerService {
+
+  private static final Log LOG = LogFactory.getLog(TezTestServiceTaskSchedulerService.class);
+
+  private final ExecutorService appCallbackExecutor;
+  private final TaskSchedulerAppCallback appClientDelegate;
+  private final AppContext appContext;
+  private final List<String> serviceHosts;
+  private final ContainerFactory containerFactory;
+  private final Random random = new Random();
+  // Currently all services must be running on the same port.
+  private final int containerPort;
+
+  private final String clientHostname;
+  private final int clientPort;
+  private final String trackingUrl;
+  private final AtomicBoolean isStopped = new AtomicBoolean(false);
+  private final ConcurrentMap<Object, ContainerId> runningTasks =
+      new ConcurrentHashMap<Object, ContainerId>();
+
+  private final AMRMClientAsync<AMRMClient.ContainerRequest> amRmClient;
+
+  // Per instance
+  private final int memoryPerInstance;
+  private final int coresPerInstance;
+  private final int executorsPerInstance;
+
+  // Per Executor Thread
+  private final Resource resourcePerContainer;
+
+
+  /**
+   * @throws IllegalArgumentException if memory per instance, executors per instance or the RPC
+   *                                  port is not configured with a positive value
+   */
+  public TezTestServiceTaskSchedulerService(TaskSchedulerAppCallback appClient,
+                                            AppContext appContext,
+                                            String clientHostname, int clientPort,
+                                            String trackingUrl,
+                                            Configuration conf) {
+    // Accepting configuration here to allow setting up fields as final
+    super(TezTestServiceTaskSchedulerService.class.getName());
+    this.appCallbackExecutor = createAppCallbackExecutorService();
+    this.appClientDelegate = createAppCallbackDelegate(appClient);
+    this.appContext = appContext;
+    this.serviceHosts = new LinkedList<String>();
+    this.containerFactory = new ContainerFactory(appContext);
+
+    this.memoryPerInstance = conf
+        .getInt(TezTestServiceConfConstants.TEZ_TEST_SERVICE_MEMORY_PER_INSTANCE_MB, -1);
+    Preconditions.checkArgument(memoryPerInstance > 0,
+        TezTestServiceConfConstants.TEZ_TEST_SERVICE_MEMORY_PER_INSTANCE_MB +
+            " must be configured");
+
+    this.executorsPerInstance = conf.getInt(
+        TezTestServiceConfConstants.TEZ_TEST_SERVICE_NUM_EXECUTORS_PER_INSTANCE,
+        -1);
+    Preconditions.checkArgument(executorsPerInstance > 0,
+        TezTestServiceConfConstants.TEZ_TEST_SERVICE_NUM_EXECUTORS_PER_INSTANCE +
+            " must be configured");
+
+    this.coresPerInstance = conf
+        .getInt(TezTestServiceConfConstants.TEZ_TEST_SERVICE_VCPUS_PER_INSTANCE,
+            executorsPerInstance);
+
+    this.containerPort = conf.getInt(TezTestServiceConfConstants.TEZ_TEST_SERVICE_RPC_PORT, -1);
+    // Validate the port itself; an unset port would otherwise produce containers on port -1.
+    Preconditions.checkArgument(containerPort > 0,
+        TezTestServiceConfConstants.TEZ_TEST_SERVICE_RPC_PORT + " must be configured");
+
+    this.clientHostname = clientHostname;
+    this.clientPort = clientPort;
+    this.trackingUrl = trackingUrl;
+
+    int memoryPerContainer = (int) (memoryPerInstance / (float) executorsPerInstance);
+    int coresPerContainer = (int) (coresPerInstance / (float) executorsPerInstance);
+    this.resourcePerContainer = Resource.newInstance(memoryPerContainer, coresPerContainer);
+    this.amRmClient = TezAMRMClientAsync.createAMRMClientAsync(5000, new FakeAmRmCallbackHandler());
+
+    String[] hosts = conf.getTrimmedStrings(TezTestServiceConfConstants.TEZ_TEST_SERVICE_HOSTS);
+    if (hosts == null || hosts.length == 0) {
+      hosts = new String[]{"localhost"};
+    }
+    for (String host : hosts) {
+      serviceHosts.add(host);
+    }
+
+    LOG.info("Running with configuration: " +
+        "memoryPerInstance=" + memoryPerInstance +
+        ", vcoresPerInstance=" + coresPerInstance +
+        ", executorsPerInstance=" + executorsPerInstance +
+        ", resourcePerContainerInferred=" + resourcePerContainer +
+        ", hosts=" + serviceHosts.toString());
+
+  }
+
+  @Override
+  public void serviceInit(Configuration conf) {
+    amRmClient.init(conf);
+  }
+
+  @Override
+  public void serviceStart() {
+    amRmClient.start();
+    try {
+      // The registration response is not used: no resources are negotiated with the RM.
+      amRmClient.registerApplicationMaster(clientHostname, clientPort, trackingUrl);
+    } catch (YarnException e) {
+      throw new TezUncheckedException(e);
+    } catch (IOException e) {
+      throw new TezUncheckedException(e);
+    }
+  }
+
+  /**
+   * Unregisters from the RM exactly once, then always stops the RM client and the callback
+   * executor, even if unregistration fails.
+   */
+  @Override
+  public void serviceStop() {
+    if (!this.isStopped.getAndSet(true)) {
+      try {
+        TaskSchedulerAppCallback.AppFinalStatus status = appClientDelegate.getFinalAppStatus();
+        amRmClient.unregisterApplicationMaster(status.exitStatus, status.exitMessage,
+            status.postCompletionTrackingUrl);
+      } catch (YarnException e) {
+        throw new TezUncheckedException(e);
+      } catch (IOException e) {
+        throw new TezUncheckedException(e);
+      } finally {
+        // The final status is fetched through the callback executor above, so shut it down last.
+        amRmClient.stop();
+        appCallbackExecutor.shutdownNow();
+      }
+    }
+  }
+
+  @Override
+  public Resource getAvailableResources() {
+    // TODO This needs information about all running executors, and the amount of memory etc available across the cluster.
+    return getClusterCapacity();
+  }
+
+  @Override
+  public int getClusterNodeCount() {
+    return serviceHosts.size();
+  }
+
+  @Override
+  public void resetMatchLocalityForAllHeldContainers() {
+  }
+
+  @Override
+  public Resource getTotalResources() {
+    return getClusterCapacity();
+  }
+
+  @Override
+  public void blacklistNode(NodeId nodeId) {
+    LOG.info("DEBUG: BlacklistNode not supported");
+  }
+
+  @Override
+  public void unblacklistNode(NodeId nodeId) {
+    LOG.info("DEBUG: unBlacklistNode not supported");
+  }
+
+  @Override
+  public void allocateTask(Object task, Resource capability, String[] hosts, String[] racks,
+                           Priority priority, Object containerSignature, Object clientCookie) {
+    allocateOnHost(task, selectHost(hosts), priority, clientCookie);
+  }
+
+
+  @Override
+  public void allocateTask(Object task, Resource capability, ContainerId containerId,
+                           Priority priority, Object containerSignature, Object clientCookie) {
+    allocateOnHost(task, selectHost(null), priority, clientCookie);
+  }
+
+  @Override
+  public boolean deallocateTask(Object task, boolean taskSucceeded) {
+    ContainerId containerId = runningTasks.remove(task);
+    if (containerId == null) {
+      LOG.error("Could not determine ContainerId for task: " + task +
+          " . Could have hit a race condition. Ignoring." +
+          " The query may hang since this \"unknown\" container is now taking up a slot permanently");
+      return false;
+    }
+    appClientDelegate.containerBeingReleased(containerId);
+    return true;
+  }
+
+  @Override
+  public Object deallocateContainer(ContainerId containerId) {
+    LOG.info("DEBUG: Ignoring deallocateContainer for containerId: " + containerId);
+    return null;
+  }
+
+  @Override
+  public void setShouldUnregister() {
+
+  }
+
+  @Override
+  public boolean hasUnregistered() {
+    // Always true: setShouldUnregister() is a no-op here, and serviceStop() unregisters
+    // unconditionally on the first stop.
+    return true;
+  }
+
+  /** Creates a container of {@link #resourcePerContainer} on {@code host} and hands it out. */
+  private void allocateOnHost(Object task, String host, Priority priority, Object clientCookie) {
+    Container container =
+        containerFactory.createContainer(resourcePerContainer, priority, host, containerPort);
+    runningTasks.put(task, container.getId());
+    appClientDelegate.taskAllocated(task, clientCookie, container);
+  }
+
+  /**
+   * Aggregate capacity of all configured hosts. The products are computed in {@code long} so
+   * that {@link Ints#checkedCast(long)} can detect an overflow instead of silently wrapping.
+   */
+  private Resource getClusterCapacity() {
+    long numHosts = serviceHosts.size();
+    return Resource.newInstance(Ints.checkedCast(numHosts * memoryPerInstance),
+        Ints.checkedCast(numHosts * coresPerInstance));
+  }
+
+  private ExecutorService createAppCallbackExecutorService() {
+    return Executors.newSingleThreadExecutor(new ThreadFactoryBuilder()
+        .setNameFormat("TaskSchedulerAppCaller #%d").setDaemon(true).build());
+  }
+
+  private TaskSchedulerAppCallback createAppCallbackDelegate(
+      TaskSchedulerAppCallback realAppClient) {
+    return new TaskSchedulerAppCallbackWrapper(realAppClient,
+        appCallbackExecutor);
+  }
+
+  /**
+   * Picks the lexicographically smallest requested host, or a random configured service host
+   * when no hosts were requested. The caller's array is never modified.
+   */
+  private String selectHost(String[] requestedHosts) {
+    String host;
+    if (requestedHosts != null && requestedHosts.length > 0) {
+      // Sort a copy: the array belongs to the caller and must not be reordered here.
+      String[] sortedHosts = Arrays.copyOf(requestedHosts, requestedHosts.length);
+      Arrays.sort(sortedHosts);
+      host = sortedHosts[0];
+      LOG.info("Selected host: " + host + " from requested hosts: " + Arrays.toString(sortedHosts));
+    } else {
+      host = serviceHosts.get(random.nextInt(serviceHosts.size()));
+      LOG.info("Selected random host: " + host + " since the request contained no host information");
+    }
+    return host;
+  }
+
+  /** Builds synthetic containers with sequentially increasing ids for the current attempt. */
+  static class ContainerFactory {
+    final AppContext appContext;
+    AtomicInteger nextId;
+
+    public ContainerFactory(AppContext appContext) {
+      this.appContext = appContext;
+      // NOTE(review): ids start at 2, presumably because id 1 is the AM's own container - confirm.
+      this.nextId = new AtomicInteger(2);
+    }
+
+    public Container createContainer(Resource capability, Priority priority, String hostname, int port) {
+      ApplicationAttemptId appAttemptId = appContext.getApplicationAttemptId();
+      ContainerId containerId = ContainerId.newInstance(appAttemptId, nextId.getAndIncrement());
+      NodeId nodeId = NodeId.newInstance(hostname, port);
+      // Built from the selected host (not the literal "hostname"); port 0 because these fake
+      // containers presumably have no node web endpoint - confirm.
+      String nodeHttpAddress = hostname + ":0";
+
+      Container container = Container.newInstance(containerId,
+          nodeId,
+          nodeHttpAddress,
+          capability,
+          priority,
+          null);
+
+      return container;
+    }
+  }
+
+  /** No-op RM callbacks: containers are never requested from the RM, so nothing arrives here. */
+  private static class FakeAmRmCallbackHandler implements AMRMClientAsync.CallbackHandler {
+
+    @Override
+    public void onContainersCompleted(List<ContainerStatus> statuses) {
+
+    }
+
+    @Override
+    public void onContainersAllocated(List<Container> containers) {
+
+    }
+
+    @Override
+    public void onShutdownRequest() {
+
+    }
+
+    @Override
+    public void onNodesUpdated(List<NodeReport> updatedNodes) {
+
+    }
+
+    @Override
+    public float getProgress() {
+      return 0;
+    }
+
+    @Override
+    public void onError(Throwable e) {
+
+    }
+  }
+}
http://git-wip-us.apache.org/repos/asf/tez/blob/36e7f854/tez-ext-service-tests/src/test/java/org/apache/tez/dag/app/taskcomm/TezTestServiceTaskCommunicatorImpl.java
----------------------------------------------------------------------
diff --git a/tez-ext-service-tests/src/test/java/org/apache/tez/dag/app/taskcomm/TezTestServiceTaskCommunicatorImpl.java b/tez-ext-service-tests/src/test/java/org/apache/tez/dag/app/taskcomm/TezTestServiceTaskCommunicatorImpl.java
new file mode 100644
index 0000000..78cdcde
--- /dev/null
+++ b/tez-ext-service-tests/src/test/java/org/apache/tez/dag/app/taskcomm/TezTestServiceTaskCommunicatorImpl.java
@@ -0,0 +1,182 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.dag.app.taskcomm;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+
+import com.google.protobuf.ByteString;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.DataOutputBuffer;
+import org.apache.hadoop.security.Credentials;
+import org.apache.hadoop.yarn.api.ApplicationConstants;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.LocalResource;
+import org.apache.tez.dag.api.TaskCommunicatorContext;
+import org.apache.tez.dag.app.TezTaskCommunicatorImpl;
+import org.apache.tez.dag.app.TezTestServiceCommunicator;
+import org.apache.tez.dag.records.TezTaskAttemptID;
+import org.apache.tez.runtime.api.impl.TaskSpec;
+import org.apache.tez.test.service.rpc.TezTestServiceProtocolProtos.SubmitWorkRequestProto;
+import org.apache.tez.test.service.rpc.TezTestServiceProtocolProtos.SubmitWorkResponseProto;
+import org.apache.tez.util.ProtoConverters;
+
+
+/**
+ * A task communicator that pushes each task attempt to the external test service over RPC
+ * ({@link TezTestServiceCommunicator#submitWork}) once the attempt is registered.
+ *
+ * <p>Container bookkeeping is inherited from {@link TezTaskCommunicatorImpl}. Each request
+ * carries this communicator's own address (from {@code getAddress()}) as the AM host/port.
+ */
+public class TezTestServiceTaskCommunicatorImpl extends TezTaskCommunicatorImpl {
+
+  private static final Log LOG = LogFactory.getLog(TezTestServiceTaskCommunicatorImpl.class);
+
+  private final TezTestServiceCommunicator communicator;
+  private final SubmitWorkRequestProto BASE_SUBMIT_WORK_REQUEST;
+  // Serialized credentials, cached per DAG name.
+  private final ConcurrentMap<String, ByteBuffer> credentialMap;
+
+  public TezTestServiceTaskCommunicatorImpl(
+      TaskCommunicatorContext taskCommunicatorContext) {
+    super(taskCommunicatorContext);
+    // TODO Maybe make this configurable
+    this.communicator = new TezTestServiceCommunicator(3);
+
+    SubmitWorkRequestProto.Builder baseBuilder = SubmitWorkRequestProto.newBuilder();
+
+    // TODO Avoid reading this from the environment
+    baseBuilder.setUser(System.getenv(ApplicationConstants.Environment.USER.name()));
+    baseBuilder.setApplicationIdString(
+        taskCommunicatorContext.getApplicationAttemptId().getApplicationId().toString());
+    baseBuilder
+        .setAppAttemptNumber(taskCommunicatorContext.getApplicationAttemptId().getAttemptId());
+    baseBuilder.setTokenIdentifier(getTokenIdentifier());
+
+    BASE_SUBMIT_WORK_REQUEST = baseBuilder.build();
+
+    credentialMap = new ConcurrentHashMap<String, ByteBuffer>();
+  }
+
+  @Override
+  public void serviceInit(Configuration conf) throws Exception {
+    super.serviceInit(conf);
+    this.communicator.init(conf);
+  }
+
+  @Override
+  public void serviceStart() {
+    super.serviceStart();
+    this.communicator.start();
+  }
+
+  /** Stops the RPC communicator started in {@link #serviceStart()}, then the parent. */
+  @Override
+  public void serviceStop() {
+    try {
+      this.communicator.stop();
+    } finally {
+      super.serviceStop();
+    }
+  }
+
+
+  @Override
+  public void registerRunningContainer(ContainerId containerId, String hostname, int port) {
+    super.registerRunningContainer(containerId, hostname, port);
+  }
+
+  @Override
+  public void registerContainerEnd(ContainerId containerId) {
+    super.registerContainerEnd(containerId);
+  }
+
+  /**
+   * Registers the attempt with the parent, then submits it asynchronously to the service that
+   * hosts {@code containerId}.
+   *
+   * @throws RuntimeException if the request cannot be built or the container is unknown
+   */
+  @Override
+  public void registerRunningTaskAttempt(final ContainerId containerId, final TaskSpec taskSpec,
+                                         Map<String, LocalResource> additionalResources,
+                                         Credentials credentials,
+                                         boolean credentialsChanged) {
+    super.registerRunningTaskAttempt(containerId, taskSpec, additionalResources, credentials,
+        credentialsChanged);
+    SubmitWorkRequestProto requestProto;
+    try {
+      requestProto = constructSubmitWorkRequest(containerId, taskSpec);
+    } catch (IOException e) {
+      throw new RuntimeException("Failed to construct request", e);
+    }
+    ContainerInfo containerInfo = getContainerInfo(containerId);
+    String host;
+    int port;
+    if (containerInfo != null) {
+      synchronized (containerInfo) {
+        host = containerInfo.host;
+        port = containerInfo.port;
+      }
+    } else {
+      // TODO Handle this properly
+      throw new RuntimeException("ContainerInfo not found for container: " + containerId +
+          ", while trying to launch task: " + taskSpec.getTaskAttemptID());
+    }
+    communicator.submitWork(requestProto, host, port,
+        new TezTestServiceCommunicator.ExecuteRequestCallback<SubmitWorkResponseProto>() {
+          @Override
+          public void setResponse(SubmitWorkResponseProto response) {
+            LOG.info("Successfully launched task: " + taskSpec.getTaskAttemptID());
+            getTaskCommunicatorContext()
+                .taskStartedRemotely(taskSpec.getTaskAttemptID(), containerId);
+          }
+
+          @Override
+          public void indicateError(Throwable t) {
+            // TODO Handle this error. This is where an API on the context to indicate failure / rejection comes in.
+            // WARN rather than INFO: nothing else in this callback surfaces the failure.
+            LOG.warn("Failed to run task: " + taskSpec.getTaskAttemptID() + " on containerId: " +
+                containerId, t);
+          }
+        });
+  }
+
+  @Override
+  public void unregisterRunningTaskAttempt(TezTaskAttemptID taskAttemptID) {
+    super.unregisterRunningTaskAttempt(taskAttemptID);
+    // Nothing else to do for now. The push API in the test does not support termination of a running task
+  }
+
+  /** Builds the per-attempt request on top of the shared base request. */
+  private SubmitWorkRequestProto constructSubmitWorkRequest(ContainerId containerId,
+                                                            TaskSpec taskSpec) throws
+      IOException {
+    SubmitWorkRequestProto.Builder builder =
+        SubmitWorkRequestProto.newBuilder(BASE_SUBMIT_WORK_REQUEST);
+    builder.setContainerIdString(containerId.toString());
+    builder.setAmHost(getAddress().getHostName());
+    builder.setAmPort(getAddress().getPort());
+
+    // Credentials can change across DAGs, so the serialized form is cached per DAG name.
+    // The cache holds a duplicate so that copying the local buffer below does not move the
+    // cached buffer's position.
+    ByteBuffer credentialsBinary = credentialMap.get(taskSpec.getDAGName());
+    if (credentialsBinary == null) {
+      credentialsBinary = serializeCredentials(getTaskCommunicatorContext().getCredentials());
+      credentialMap.putIfAbsent(taskSpec.getDAGName(), credentialsBinary.duplicate());
+    } else {
+      credentialsBinary = credentialsBinary.duplicate();
+    }
+    builder.setCredentialsBinary(ByteString.copyFrom(credentialsBinary));
+    builder.setTaskSpec(ProtoConverters.convertTaskSpecToProto(taskSpec));
+    return builder.build();
+  }
+
+  /** Writes the token storage of {@code credentials} into a new buffer. */
+  private ByteBuffer serializeCredentials(Credentials credentials) throws IOException {
+    Credentials containerCredentials = new Credentials();
+    containerCredentials.addAll(credentials);
+    DataOutputBuffer containerTokens_dob = new DataOutputBuffer();
+    containerCredentials.writeTokenStorageToStream(containerTokens_dob);
+    ByteBuffer containerCredentialsBuffer = ByteBuffer.wrap(containerTokens_dob.getData(), 0,
+        containerTokens_dob.getLength());
+    return containerCredentialsBuffer;
+  }
+}
http://git-wip-us.apache.org/repos/asf/tez/blob/36e7f854/tez-ext-service-tests/src/test/java/org/apache/tez/service/ContainerRunner.java
----------------------------------------------------------------------
diff --git a/tez-ext-service-tests/src/test/java/org/apache/tez/service/ContainerRunner.java b/tez-ext-service-tests/src/test/java/org/apache/tez/service/ContainerRunner.java
new file mode 100644
index 0000000..2bca4ed
--- /dev/null
+++ b/tez-ext-service-tests/src/test/java/org/apache/tez/service/ContainerRunner.java
@@ -0,0 +1,27 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.service;
+
+import java.io.IOException;
+
+import org.apache.tez.test.service.rpc.TezTestServiceProtocolProtos;
+import org.apache.tez.test.service.rpc.TezTestServiceProtocolProtos.RunContainerRequestProto;
+import org.apache.tez.test.service.rpc.TezTestServiceProtocolProtos.SubmitWorkRequestProto;
+
+/**
+ * Accepts work requests received by the test service.
+ */
+public interface ContainerRunner {
+
+  /**
+   * Accepts a container run request.
+   *
+   * @param request the request as received over RPC
+   * @throws IOException if the request cannot be accepted
+   */
+  void queueContainer(RunContainerRequestProto request) throws IOException;
+
+  /**
+   * Accepts a pushed unit of work.
+   *
+   * @param request the request as received over RPC
+   * @throws IOException if the request cannot be accepted
+   */
+  void submitWork(SubmitWorkRequestProto request) throws IOException;
+}
http://git-wip-us.apache.org/repos/asf/tez/blob/36e7f854/tez-ext-service-tests/src/test/java/org/apache/tez/service/MiniTezTestServiceCluster.java
----------------------------------------------------------------------
diff --git a/tez-ext-service-tests/src/test/java/org/apache/tez/service/MiniTezTestServiceCluster.java b/tez-ext-service-tests/src/test/java/org/apache/tez/service/MiniTezTestServiceCluster.java
new file mode 100644
index 0000000..f47bd67
--- /dev/null
+++ b/tez-ext-service-tests/src/test/java/org/apache/tez/service/MiniTezTestServiceCluster.java
@@ -0,0 +1,163 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.service;
+
+import java.io.File;
+import java.io.IOException;
+import java.net.InetSocketAddress;
+
+import com.google.common.base.Preconditions;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileContext;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.service.AbstractService;
+import org.apache.hadoop.util.Shell;
+import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
+import org.apache.tez.service.impl.TezTestService;
+
+/**
+ * An in-process cluster hosting a single {@link TezTestService} for tests.
+ *
+ * <p>Once started, {@link #getClusterSpecificConfiguration()} returns the host, RPC port,
+ * executor count and memory settings that AM-side plugins need to reach the service.
+ */
+public class MiniTezTestServiceCluster extends AbstractService {
+
+  private static final Log LOG = LogFactory.getLog(MiniTezTestServiceCluster.class);
+
+  private final File testWorkDir;
+  private final long availableMemory;
+  private final int numExecutorsPerService;
+  private final String[] localDirs;
+  private final Configuration clusterSpecificConfiguration = new Configuration(false);
+
+  private TezTestService tezTestService;
+
+  /**
+   * Creates a cluster; callers still need to init and start it.
+   *
+   * @param clusterName            base name; '$' characters are stripped for the work dir name
+   * @param numExecutorsPerService executors offered by the service; must be positive
+   * @param availableMemory        memory offered by the service (published under the
+   *                               ..._MEMORY_PER_INSTANCE_MB key); must be positive
+   * @param numLocalDirs           number of distinct local directories to create; must be positive
+   */
+  public static MiniTezTestServiceCluster create(String clusterName, int numExecutorsPerService, long availableMemory, int numLocalDirs) {
+    return new MiniTezTestServiceCluster(clusterName, numExecutorsPerService, availableMemory, numLocalDirs);
+  }
+
+  // TODO Add support for multiple instances
+  private MiniTezTestServiceCluster(String clusterName, int numExecutorsPerService, long availableMemory, int numLocalDirs) {
+    super(clusterName + "_TezTestServerCluster");
+    Preconditions.checkArgument(numExecutorsPerService > 0);
+    Preconditions.checkArgument(availableMemory > 0);
+    Preconditions.checkArgument(numLocalDirs > 0);
+    String clusterNameTrimmed = clusterName.replace("$", "") + "_TezTestServerCluster";
+    File targetWorkDir = new File("target", clusterNameTrimmed);
+    try {
+      FileContext.getLocalFSFileContext().delete(
+          new Path(targetWorkDir.getAbsolutePath()), true);
+    } catch (Exception e) {
+      LOG.warn("Could not cleanup test workDir: " + targetWorkDir, e);
+      throw new RuntimeException("Could not cleanup test workDir: " + targetWorkDir, e);
+    }
+
+    if (Shell.WINDOWS) {
+      // The test working directory can exceed the maximum path length supported
+      // by some Windows APIs and cmd.exe (260 characters).  To work around this,
+      // create a symlink in temporary storage with a much shorter path,
+      // targeting the full path to the test working directory.  Then, use the
+      // symlink as the test working directory.
+      String targetPath = targetWorkDir.getAbsolutePath();
+      File link = new File(System.getProperty("java.io.tmpdir"),
+          String.valueOf(System.currentTimeMillis()));
+      String linkPath = link.getAbsolutePath();
+
+      try {
+        FileContext.getLocalFSFileContext().delete(new Path(linkPath), true);
+      } catch (IOException e) {
+        throw new YarnRuntimeException("could not cleanup symlink: " + linkPath, e);
+      }
+
+      // Guarantee target exists before creating symlink.
+      targetWorkDir.mkdirs();
+
+      Shell.ShellCommandExecutor shexec = new Shell.ShellCommandExecutor(
+          Shell.getSymlinkCommand(targetPath, linkPath));
+      try {
+        shexec.execute();
+      } catch (IOException e) {
+        throw new YarnRuntimeException(String.format(
+            "failed to create symlink from %s to %s, shell output: %s", linkPath,
+            targetPath, shexec.getOutput()), e);
+      }
+
+      this.testWorkDir = link;
+    } else {
+      this.testWorkDir = targetWorkDir;
+    }
+    this.numExecutorsPerService = numExecutorsPerService;
+    this.availableMemory = availableMemory;
+
+    // Setup Local Dirs: one distinct directory per index. A fixed name would make every entry
+    // in localDirs the same path, so numLocalDirs would have no effect.
+    localDirs = new String[numLocalDirs];
+    for (int i = 0 ; i < numLocalDirs ; i++) {
+      File f = new File(testWorkDir, "localDir" + i);
+      f.mkdirs();
+      LOG.info("Created localDir: " + f.getAbsolutePath());
+      localDirs[i] = f.getAbsolutePath();
+    }
+  }
+
+  @Override
+  public void serviceInit(Configuration conf) {
+    tezTestService = new TezTestService(conf, numExecutorsPerService, availableMemory, localDirs);
+    tezTestService.init(conf);
+
+  }
+
+  /** Starts the service and publishes its address and capacity into the cluster config. */
+  @Override
+  public void serviceStart() {
+    tezTestService.start();
+
+    clusterSpecificConfiguration.set(TezTestServiceConfConstants.TEZ_TEST_SERVICE_HOSTS,
+        getServiceAddress().getHostName());
+    clusterSpecificConfiguration.setInt(TezTestServiceConfConstants.TEZ_TEST_SERVICE_RPC_PORT,
+        getServiceAddress().getPort());
+
+    clusterSpecificConfiguration.setInt(
+        TezTestServiceConfConstants.TEZ_TEST_SERVICE_NUM_EXECUTORS_PER_INSTANCE,
+        numExecutorsPerService);
+    clusterSpecificConfiguration.setLong(
+        TezTestServiceConfConstants.TEZ_TEST_SERVICE_MEMORY_PER_INSTANCE_MB, availableMemory);
+  }
+
+  @Override
+  public void serviceStop() {
+    // tezTestService is only created in serviceInit(); stop() may run without a prior init.
+    if (tezTestService != null) {
+      tezTestService.stop();
+    }
+  }
+
+  /**
+   * return the address at which the service is listening
+   * @return host:port
+   */
+  public InetSocketAddress getServiceAddress() {
+    Preconditions.checkState(getServiceState() == STATE.STARTED);
+    return tezTestService.getListenerAddress();
+  }
+
+  public int getShufflePort() {
+    Preconditions.checkState(getServiceState() == STATE.STARTED);
+    return tezTestService.getShufflePort();
+  }
+
+  public Configuration getClusterSpecificConfiguration() {
+    Preconditions.checkState(getServiceState() == STATE.STARTED);
+    return clusterSpecificConfiguration;
+  }
+
+  // Mainly for verification
+  public int getNumSubmissions() {
+    return tezTestService.getNumSubmissions();
+  }
+
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/tez/blob/36e7f854/tez-ext-service-tests/src/test/java/org/apache/tez/service/TezTestServiceConfConstants.java
----------------------------------------------------------------------
diff --git a/tez-ext-service-tests/src/test/java/org/apache/tez/service/TezTestServiceConfConstants.java b/tez-ext-service-tests/src/test/java/org/apache/tez/service/TezTestServiceConfConstants.java
new file mode 100644
index 0000000..bf4a5bd
--- /dev/null
+++ b/tez-ext-service-tests/src/test/java/org/apache/tez/service/TezTestServiceConfConstants.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.service;
+
+/**
+ * Configuration keys shared by the external test service and the AM-side plugins (scheduler,
+ * communicator) that talk to it. Every key starts with {@code tez.test.service.}.
+ */
+public class TezTestServiceConfConstants {
+
+  private static final String TEZ_TEST_SERVICE_PREFIX = "tez.test.service.";
+
+  // ---- Capacity of a single service instance (read by the scheduler) ----
+
+  /** Number of executors per instance - used by the scheduler */
+  public static final String TEZ_TEST_SERVICE_NUM_EXECUTORS_PER_INSTANCE =
+      TEZ_TEST_SERVICE_PREFIX + "num.executors.per-instance";
+
+  /** Memory available per instance - used by the scheduler */
+  public static final String TEZ_TEST_SERVICE_MEMORY_PER_INSTANCE_MB =
+      TEZ_TEST_SERVICE_PREFIX + "memory.per.instance.mb";
+
+  /** CPUs available per instance - used by the scheduler */
+  public static final String TEZ_TEST_SERVICE_VCPUS_PER_INSTANCE =
+      TEZ_TEST_SERVICE_PREFIX + "vcpus.per.instance";
+
+  // ---- Location of the service instances ----
+
+  /** Hosts on which the service is running. Currently assumes a single port for all instances. */
+  public static final String TEZ_TEST_SERVICE_HOSTS = TEZ_TEST_SERVICE_PREFIX + "hosts";
+
+  /** Port on which the service(s) listen. Currently a single port for all instances. */
+  public static final String TEZ_TEST_SERVICE_RPC_PORT = TEZ_TEST_SERVICE_PREFIX + "rpc.port";
+
+  // ---- AM-side communicator ----
+
+  /** Number of threads to use in the AM to communicate with the external service */
+  public static final String TEZ_TEST_SERVICE_AM_COMMUNICATOR_NUM_THREADS =
+      TEZ_TEST_SERVICE_PREFIX + "communicator.num.threads";
+  /** Default for {@link #TEZ_TEST_SERVICE_AM_COMMUNICATOR_NUM_THREADS}. */
+  public static final int TEZ_TEST_SERVICE_AM_COMMUNICATOR_NUM_THREADS_DEFAULT = 2;
+
+}
http://git-wip-us.apache.org/repos/asf/tez/blob/36e7f854/tez-ext-service-tests/src/test/java/org/apache/tez/service/TezTestServiceProtocolBlockingPB.java
----------------------------------------------------------------------
diff --git a/tez-ext-service-tests/src/test/java/org/apache/tez/service/TezTestServiceProtocolBlockingPB.java b/tez-ext-service-tests/src/test/java/org/apache/tez/service/TezTestServiceProtocolBlockingPB.java
new file mode 100644
index 0000000..1108f72
--- /dev/null
+++
b/tez-ext-service-tests/src/test/java/org/apache/tez/service/TezTestServiceProtocolBlockingPB.java
@@ -0,0 +1,22 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.service;
+
+import org.apache.hadoop.ipc.ProtocolInfo;
+import org.apache.tez.test.service.rpc.TezTestServiceProtocolProtos;
+
+/**
+ * Hadoop IPC protocol interface for the test service. It binds the protobuf-generated blocking
+ * service interface to a protocol name and version through {@link ProtocolInfo}.
+ */
+@ProtocolInfo(
+    protocolName = "org.apache.tez.service.TezTestServiceProtocolBlockingPB",
+    protocolVersion = 1)
+public interface TezTestServiceProtocolBlockingPB
+    extends TezTestServiceProtocolProtos.TezTestServiceProtocol.BlockingInterface {
+}
\ No newline at end of file
