Repository: tez Updated Branches: refs/heads/master 282917344 -> e49ed3397
TEZ-707. Add a LocalContainerLauncher. Contributed by Chen He. Project: http://git-wip-us.apache.org/repos/asf/tez/repo Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/e49ed339 Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/e49ed339 Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/e49ed339 Branch: refs/heads/master Commit: e49ed3397cefe80700be58c3739d3d8d98d40c04 Parents: 2829173 Author: Siddharth Seth <[email protected]> Authored: Tue Jul 29 22:14:42 2014 -0700 Committer: Siddharth Seth <[email protected]> Committed: Tue Jul 29 22:14:42 2014 -0700 ---------------------------------------------------------------------- .../org/apache/tez/dag/app/DAGAppMaster.java | 17 +- .../app/launcher/LocalContainerLauncher.java | 276 +++++++++++++++++++ .../org/apache/tez/runtime/task/TezChild.java | 83 ++++-- 3 files changed, 345 insertions(+), 31 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/tez/blob/e49ed339/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java b/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java index 4e55376..8f4bbab 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java @@ -45,6 +45,7 @@ import java.util.Map.Entry; import java.util.Set; import java.util.Timer; import java.util.TimerTask; +import java.util.UUID; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.locks.Lock; @@ -60,6 +61,7 @@ import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; +import org.apache.hadoop.io.Text; import 
org.apache.hadoop.metrics2.lib.DefaultMetricsSystem; import org.apache.hadoop.security.Credentials; import org.apache.hadoop.security.UserGroupInformation; @@ -130,6 +132,7 @@ import org.apache.tez.dag.app.dag.event.VertexEventType; import org.apache.tez.dag.app.dag.impl.DAGImpl; import org.apache.tez.dag.app.launcher.ContainerLauncher; import org.apache.tez.dag.app.launcher.ContainerLauncherImpl; +import org.apache.tez.dag.app.launcher.LocalContainerLauncher; import org.apache.tez.dag.app.rm.AMSchedulerEventType; import org.apache.tez.dag.app.rm.NMCommunicatorEventType; import org.apache.tez.dag.app.rm.TaskSchedulerEventHandler; @@ -218,6 +221,8 @@ public class DAGAppMaster extends AbstractService { private final Map<String, LocalResource> sessionResources = new HashMap<String, LocalResource>(); + private boolean isLocal = false; //Local mode flag + private DAGAppMasterShutdownHandler shutdownHandler = new DAGAppMasterShutdownHandler(); @@ -297,6 +302,12 @@ public class DAGAppMaster extends AbstractService { isLastAMRetry = appAttemptID.getAttemptId() >= maxAppAttempts; this.amConf = conf; + this.isLocal = conf.getBoolean(TezConfiguration.TEZ_LOCAL_MODE, + TezConfiguration.TEZ_LOCAL_MODE_DEFAULT); + if (isLocal) { + UserGroupInformation.setConfiguration(conf); + appMasterUgi = UserGroupInformation.getCurrentUser(); + } conf.setBoolean(Dispatcher.DISPATCHER_EXIT_ON_ERROR_KEY, true); dispatcher = createDispatcher(); @@ -797,7 +808,11 @@ public class DAGAppMaster extends AbstractService { protected ContainerLauncher createContainerLauncher(final AppContext context) { - return new ContainerLauncherImpl(context); + if(isLocal){ + return new LocalContainerLauncher(context, taskAttemptListener); + } else { + return new ContainerLauncherImpl(context); + } } public ApplicationId getAppID() { http://git-wip-us.apache.org/repos/asf/tez/blob/e49ed339/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/LocalContainerLauncher.java 
---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/LocalContainerLauncher.java b/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/LocalContainerLauncher.java new file mode 100644 index 0000000..9ff9a07 --- /dev/null +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/LocalContainerLauncher.java @@ -0,0 +1,276 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.tez.dag.app.launcher; + + +import java.io.IOException; +import java.util.Random; + +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.Callable; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; + +import com.google.common.util.concurrent.FutureCallback; +import com.google.common.util.concurrent.Futures; +import com.google.common.util.concurrent.ListenableFuture; +import com.google.common.util.concurrent.ListeningExecutorService; +import com.google.common.util.concurrent.MoreExecutors; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.service.AbstractService; +import org.apache.hadoop.util.StringUtils; +import org.apache.hadoop.yarn.api.ApplicationConstants.Environment; +import org.apache.hadoop.yarn.api.records.ContainerId; +import org.apache.hadoop.yarn.util.Clock; + +import org.apache.tez.common.TezTaskUmbilicalProtocol; +import org.apache.tez.dag.api.TezConfiguration; +import org.apache.tez.dag.api.TezException; +import org.apache.tez.dag.api.TezUncheckedException; +import org.apache.tez.dag.app.AppContext; +import org.apache.tez.dag.app.TaskAttemptListener; +import org.apache.tez.dag.app.rm.NMCommunicatorEventType; +import org.apache.tez.dag.app.rm.NMCommunicatorEvent; +import org.apache.tez.dag.app.rm.NMCommunicatorLaunchRequestEvent; +import org.apache.tez.dag.app.rm.container.AMContainerEvent; +import org.apache.tez.dag.app.rm.container.AMContainerEventLaunchFailed; +import org.apache.tez.dag.app.rm.container.AMContainerEventLaunched; +import org.apache.tez.dag.app.rm.container.AMContainerEventStopFailed; +import org.apache.tez.dag.app.rm.container.AMContainerEventType; +import org.apache.tez.dag.history.DAGHistoryEvent; +import 
org.apache.tez.dag.history.events.ContainerLaunchedEvent; +import org.apache.tez.runtime.task.TezChild; + + +/** + * Runs the container task locally in a thread. + * Since all (sub)tasks share the same local directory, they must be executed + * sequentially in order to avoid creating/deleting the same files/dirs. + */ +public class LocalContainerLauncher extends AbstractService implements + ContainerLauncher { + + private static final Log LOG = LogFactory.getLog(LocalContainerLauncher.class); + private final AppContext context; + private TaskAttemptListener taskAttemptListener; + private BlockingQueue<NMCommunicatorEvent> eventQueue = + new LinkedBlockingQueue<NMCommunicatorEvent>(); + private static AtomicBoolean serviceStopped; + + private Clock clock; + private LinkedBlockingQueue<Runnable> taskQueue = + new LinkedBlockingQueue<Runnable>(); + + private ThreadPoolExecutor taskExecutor; + private ListeningExecutorService listeningExecutorService; + private int poolSize; + private final Random sleepTime = new Random(); + + public LocalContainerLauncher(AppContext context, + TaskAttemptListener taskAttemptListener) { + super(LocalContainerLauncher.class.getName()); + this.context = context; + this.taskAttemptListener = taskAttemptListener; + } + + @Override + public void serviceStart() throws Exception { + Thread eventHandlingThread = new Thread(new TezSubTaskRunner(), + "LocalContainerLauncher-SubTaskRunner"); + eventHandlingThread.start(); + super.serviceStart(); + } + + @Override + public synchronized void serviceInit(Configuration config) { + serviceStopped = new AtomicBoolean(false); + this.clock = context.getClock(); + this.poolSize = config.getInt(TezConfiguration.TEZ_AM_INLINE_TASK_EXECUTION_MAX_TASKS, + TezConfiguration.TEZ_AM_INLINE_TASK_EXECUTION_MAX_TASKS_DEFAULT); + int maxPoolSize = poolSize; + + this.taskExecutor = new ThreadPoolExecutor(poolSize, maxPoolSize, 60*1000, + TimeUnit.SECONDS, taskQueue); + this.listeningExecutorService = 
MoreExecutors.listeningDecorator(taskExecutor); + } + + @SuppressWarnings("unchecked") + void sendContainerLaunchFailedMsg(ContainerId containerId, String message) { + LOG.error(message); + context.getEventHandler().handle(new AMContainerEventLaunchFailed(containerId, message)); + } + + //should mimic container using threads + //need to start all MapProcessor and RedProcessor here + private class TezSubTaskRunner implements Runnable { + + ListenableFuture<Object> runningTask; + TezSubTaskRunner() {} + + //launch tasks + private void launch(NMCommunicatorEvent event) { + NMCommunicatorLaunchRequestEvent launchEv = (NMCommunicatorLaunchRequestEvent)event; + + String containerIdStr = event.getContainerId().toString(); + String host = taskAttemptListener.getAddress().getAddress().getHostAddress(); + int port = taskAttemptListener.getAddress().getPort(); + String tokenIdentifier = context.getApplicationID().toString(); + + String[] localDirs = + StringUtils.getTrimmedStrings(System.getenv(Environment.LOCAL_DIRS.name())); + + try { + runningTask = listeningExecutorService.submit(createSubTask(context.getAMConf(), + host, port, containerIdStr, tokenIdentifier, context.getApplicationAttemptId().getAttemptId(), + localDirs, (TezTaskUmbilicalProtocol) taskAttemptListener)); + Futures.addCallback(runningTask, + new FutureCallback<Object>() { + @Override + public void onSuccess(Object result) { + } + + @Override + public void onFailure(Throwable t) { + LOG.error("Container launching failed", t); + } + } + , taskExecutor); + } catch (Throwable throwable) { + LOG.info("Failed to start runSubTask thread!", throwable); + sendContainerLaunchFailedMsg(event.getContainerId(), "Container Launching Failed!"); + } + + try{ + context.getEventHandler().handle( + new AMContainerEventLaunched(launchEv.getContainerId())); + ContainerLaunchedEvent lEvt = new ContainerLaunchedEvent( + event.getContainerId(), clock.getTime(), context.getApplicationAttemptId()); + 
+ context.getHistoryHandler().handle(new DAGHistoryEvent(context.getCurrentDAGID(),lEvt)); + } catch (Throwable t) { + String message = "Container launch failed for " + event.getContainerId() + " : " + + StringUtils.stringifyException(t); + t.printStackTrace(); + LOG.error(message); + context.getEventHandler().handle(new AMContainerEventLaunchFailed(event.getContainerId(), message)); + } + } + + private void stop(NMCommunicatorEvent event) { + try{ + context.getEventHandler().handle( + new AMContainerEvent(event.getContainerId(), AMContainerEventType.C_NM_STOP_SENT)); + } catch (Throwable t) { + // the cleanup failure is not rethrown: it is reported as an AMContainerEventStopFailed and logged as a warning + String message = "cleanup failed for container " + event.getContainerId() + " : " + + StringUtils.stringifyException(t); + context.getEventHandler().handle( + new AMContainerEventStopFailed(event.getContainerId(), message)); + LOG.warn(message); + } + } + + @Override + public void run() { + NMCommunicatorEvent event; + while (!Thread.currentThread().isInterrupted()) { + while (taskExecutor.getActiveCount() >= poolSize){ + try { + LOG.info("Number of Running Tasks reach the uppper bound, sleep 1 seconds!:" + + taskExecutor.getActiveCount()); + Thread.sleep(1000 + sleepTime.nextInt(10) * 1000); + } catch (InterruptedException e) { + LOG.warn("Thread Sleep has been interrupted!", e); + } + } + + try { + event = eventQueue.take(); + } catch (InterruptedException e) { // mostly via T_KILL? JOB_KILL?
+ LOG.error("Returning, interrupted : ", e); + return; + } + + LOG.info("Processing the event " + event.toString()); + if (event.getType() == NMCommunicatorEventType.CONTAINER_LAUNCH_REQUEST) { + launch(event); + } else if (event.getType() == NMCommunicatorEventType.CONTAINER_STOP_REQUEST) { + stop(event); + } else { + LOG.warn("Ignoring unexpected event " + event.toString()); + } + } + } + } //end SubTaskRunner + + //create a SubTask: a Callable that builds a TezChild via TezChild.newTezChild, sets the supplied umbilical on it and runs it; the caught exceptions are only logged and the Callable always returns null + private synchronized Callable<Object> createSubTask(final Configuration defaultConf, final String host, + final int port, final String containerId, final String tokenIdentifier, final int attemptNumber, + final String[] localDirs, final TezTaskUmbilicalProtocol tezTaskUmbilicalProtocol) { + return new Callable<Object>() { + @Override + public Object call() { + // Pull in configuration specified for the session. + try { + TezChild tezChild; + tezChild = TezChild.newTezChild(defaultConf, host, port, containerId, tokenIdentifier, + attemptNumber, localDirs); + tezChild.setUmbilical(tezTaskUmbilicalProtocol); + tezChild.run(); + } catch (TezException e) { + //need to report the TezException and stop this task + LOG.error("Failed to add User Specified TezConfiguration!", e); + } catch (IOException e) { + //need to report the IOException and stop this task + LOG.error("IOE in launching task!", e); + } catch (InterruptedException e) { + //need to report the InterruptedException and stop this task + LOG.error("Interruption happened during launching task!", e); + } + return null; + } + }; + } + + @Override + public void serviceStop() throws Exception { + if (taskExecutor != null) { + taskExecutor.shutdownNow(); + } + + if (listeningExecutorService != null) { + listeningExecutorService.shutdownNow(); + } + + serviceStopped.set(true); + super.serviceStop(); + } + + @Override + public void handle(NMCommunicatorEvent event) { + try { + eventQueue.put(event); + } catch (InterruptedException e) { + throw new TezUncheckedException(e); // FIXME?
YarnRuntimeException is "for runtime exceptions only" + } + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/tez/blob/e49ed339/tez-dag/src/main/java/org/apache/tez/runtime/task/TezChild.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/runtime/task/TezChild.java b/tez-dag/src/main/java/org/apache/tez/runtime/task/TezChild.java index 131def0..5debca7 100644 --- a/tez-dag/src/main/java/org/apache/tez/runtime/task/TezChild.java +++ b/tez-dag/src/main/java/org/apache/tez/runtime/task/TezChild.java @@ -90,6 +90,7 @@ public class TezChild { private final int amHeartbeatInterval; private final long sendCounterInterval; private final int maxEventsToGet; + private final boolean isLocal; private final ListeningExecutorService executor; private final ObjectRegistryImpl objectRegistry; @@ -104,7 +105,8 @@ public class TezChild { public TezChild(Configuration conf, String host, int port, String containerIdentifier, String tokenIdentifier, int appAttemptNumber, String[] localDirs, - ObjectRegistryImpl objectRegistry) throws IOException, InterruptedException { + ObjectRegistryImpl objectRegistry) + throws IOException, InterruptedException { this.defaultConf = conf; this.containerIdString = containerIdentifier; this.appAttemptNumber = appAttemptNumber; @@ -141,6 +143,8 @@ public class TezChild { } } + this.isLocal = defaultConf.getBoolean(TezConfiguration.TEZ_LOCAL_MODE, + TezConfiguration.TEZ_LOCAL_MODE_DEFAULT); UserGroupInformation taskOwner = UserGroupInformation.createRemoteUser(tokenIdentifier); Token<JobTokenIdentifier> jobToken = TokenCache.getSessionToken(credentials); SecurityUtil.setTokenService(jobToken, address); @@ -149,16 +153,18 @@ public class TezChild { serviceConsumerMetadata.put(ShuffleUtils.SHUFFLE_HANDLER_SERVICE_ID, ShuffleUtils.convertJobTokenToBytes(jobToken)); - umbilical = taskOwner.doAs(new PrivilegedExceptionAction<TezTaskUmbilicalProtocol>() { - 
@Override - public TezTaskUmbilicalProtocol run() throws Exception { - return (TezTaskUmbilicalProtocol) RPC.getProxy(TezTaskUmbilicalProtocol.class, - TezTaskUmbilicalProtocol.versionID, address, defaultConf); - } - }); + if (!isLocal) { + umbilical = taskOwner.doAs(new PrivilegedExceptionAction<TezTaskUmbilicalProtocol>() { + @Override + public TezTaskUmbilicalProtocol run() throws Exception { + return RPC.getProxy(TezTaskUmbilicalProtocol.class, + TezTaskUmbilicalProtocol.versionID, address, defaultConf); + } + }); + } } - void run() throws IOException, InterruptedException, TezException { + public void run() throws IOException, InterruptedException, TezException { ContainerContext containerContext = new ContainerContext(containerIdString); ContainerReporter containerReporter = new ContainerReporter(umbilical, containerContext, @@ -205,7 +211,7 @@ public class TezChild { TezTaskRunner taskRunner = new TezTaskRunner(new TezConfiguration(defaultConf), childUGI, localDirs, containerTask.getTaskSpec(), umbilical, appAttemptNumber, serviceConsumerMetadata, startedInputsMap, taskReporter, executor, objectRegistry); - boolean shouldDie = false; + boolean shouldDie; try { shouldDie = !taskRunner.run(); if (shouldDie) { @@ -233,12 +239,12 @@ public class TezChild { * the new task specification. Must be a valid task * @param childUGI * the old UGI instance being used - * @return + * @return childUGI */ UserGroupInformation handleNewTaskCredentials(ContainerTask containerTask, UserGroupInformation childUGI) { // Re-use the UGI only if the Credentials have not changed. - Preconditions.checkState(containerTask.shouldDie() != true); + Preconditions.checkState(!containerTask.shouldDie()); Preconditions.checkState(containerTask.getTaskSpec() != null); if (containerTask.haveCredentialsChanged()) { LOG.info("Refreshing UGI since Credentials have changed"); @@ -294,7 +300,7 @@ public class TezChild { * the new task specification. 
Must be a valid task */ private void cleanupOnTaskChanged(ContainerTask containerTask) { - Preconditions.checkState(containerTask.shouldDie() != true); + Preconditions.checkState(!containerTask.shouldDie()); Preconditions.checkState(containerTask.getTaskSpec() != null); TezVertexID newVertexID = containerTask.getTaskSpec().getTaskAttemptID().getTaskID() .getVertexID(); @@ -317,30 +323,29 @@ public class TezChild { } RPC.stopProxy(umbilical); DefaultMetricsSystem.shutdown(); - LogManager.shutdown(); + if (!isLocal) { + LogManager.shutdown(); + } } - public static void main(String[] args) throws IOException, InterruptedException, TezException { - Thread.setDefaultUncaughtExceptionHandler(new YarnUncaughtExceptionHandler()); - LOG.info("TezChild starting"); + public void setUmbilical(TezTaskUmbilicalProtocol tezTaskUmbilicalProtocol){ + if(tezTaskUmbilicalProtocol != null){ + this.umbilical = tezTaskUmbilicalProtocol; + } + } + + public static TezChild newTezChild(Configuration conf, String host, int port, String containerIdentifier, + String tokenIdentifier, int attemptNumber, String[] localDirs) + throws IOException, InterruptedException, TezException { - final Configuration defaultConf = new Configuration(); // Pull in configuration specified for the session. // TODO TEZ-1233. This needs to be moved over the wire rather than localizing the file // for each and every task, and reading it back from disk. Also needs to be per vertex. 
- TezUtils.addUserSpecifiedTezConfiguration(defaultConf); - UserGroupInformation.setConfiguration(defaultConf); - Limits.setConfiguration(defaultConf); + TezUtils.addUserSpecifiedTezConfiguration(conf); + UserGroupInformation.setConfiguration(conf); + Limits.setConfiguration(conf); - assert args.length == 5; - String host = args[0]; - int port = Integer.parseInt(args[1]); - final String containerIdentifier = args[2]; - final String tokenIdentifier = args[3]; - final int attemptNumber = Integer.parseInt(args[4]); final String pid = System.getenv().get("JVM_PID"); - final String[] localDirs = StringUtils.getTrimmedStrings(System.getenv(Environment.LOCAL_DIRS - .name())); LOG.info("PID, containerIdentifier: " + pid + ", " + containerIdentifier); if (LOG.isDebugEnabled()) { LOG.debug("Info from cmd line: AM-host: " + host + " AM-port: " + port @@ -355,9 +360,27 @@ public class TezChild { // singleton of ObjectRegistry for this JVM ObjectRegistryImpl objectRegistry = new ObjectRegistryImpl(); - TezChild tezChild = new TezChild(defaultConf, host, port, containerIdentifier, tokenIdentifier, + return new TezChild(conf, host, port, containerIdentifier, tokenIdentifier, attemptNumber, localDirs, objectRegistry); + } + + public static void main(String[] args) throws IOException, InterruptedException, TezException { + + final Configuration defaultConf = new Configuration(); + + Thread.setDefaultUncaughtExceptionHandler(new YarnUncaughtExceptionHandler()); + LOG.info("TezChild starting"); + assert args.length == 5; + String host = args[0]; + int port = Integer.parseInt(args[1]); + final String containerIdentifier = args[2]; + final String tokenIdentifier = args[3]; + final int attemptNumber = Integer.parseInt(args[4]); + final String[] localDirs = StringUtils.getTrimmedStrings(System.getenv(Environment.LOCAL_DIRS + .name())); + TezChild tezChild = newTezChild(defaultConf, host, port, containerIdentifier, + tokenIdentifier, attemptNumber, localDirs); tezChild.run(); }
