Repository: tez
Updated Branches:
  refs/heads/master 25960cc27 -> a9eb0d721
TEZ-870. Change LocalContainerLauncher to handle multiple threads, improve error reporting and inform other components about container completion. (sseth) Project: http://git-wip-us.apache.org/repos/asf/tez/repo Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/a9eb0d72 Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/a9eb0d72 Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/a9eb0d72 Branch: refs/heads/master Commit: a9eb0d721dd6b611c330c11f65fd591a43e1c7ff Parents: 25960cc Author: Siddharth Seth <[email protected]> Authored: Fri Aug 1 12:03:27 2014 -0700 Committer: Siddharth Seth <[email protected]> Committed: Fri Aug 1 12:03:27 2014 -0700 ---------------------------------------------------------------------- .../java/org/apache/tez/client/LocalClient.java | 6 +- .../dag/app/TaskAttemptListenerImpTezDag.java | 55 +-- .../app/launcher/LocalContainerLauncher.java | 332 ++++++++++--------- .../org/apache/tez/runtime/task/TezChild.java | 75 ++++- 4 files changed, 278 insertions(+), 190 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/tez/blob/a9eb0d72/tez-dag/src/main/java/org/apache/tez/client/LocalClient.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/client/LocalClient.java b/tez-dag/src/main/java/org/apache/tez/client/LocalClient.java index a2a2ab1..5a379c3 100644 --- a/tez-dag/src/main/java/org/apache/tez/client/LocalClient.java +++ b/tez-dag/src/main/java/org/apache/tez/client/LocalClient.java @@ -224,8 +224,10 @@ public class LocalClient extends FrameworkClient { fs.setWorkingDirectory(userDir); // Prepare Environment - Path logDir = new Path(userDir, "logs"); - Path localDir = new Path(userDir, "local"); + Path logDir = new Path(userDir, "localmode-log-dir"); + Path localDir = new Path(userDir, "localmode-local-dir"); + fs.mkdirs(logDir); + fs.mkdirs(localDir); 
EnvironmentUpdateUtils.put(Environment.LOG_DIRS.name(), logDir.toUri().getPath()); EnvironmentUpdateUtils.put(Environment.LOCAL_DIRS.name(), localDir.toUri().getPath()); http://git-wip-us.apache.org/repos/asf/tez/blob/a9eb0d72/tez-dag/src/main/java/org/apache/tez/dag/app/TaskAttemptListenerImpTezDag.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/TaskAttemptListenerImpTezDag.java b/tez-dag/src/main/java/org/apache/tez/dag/app/TaskAttemptListenerImpTezDag.java index 2e5ca02..cf34e0e 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/app/TaskAttemptListenerImpTezDag.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/TaskAttemptListenerImpTezDag.java @@ -18,8 +18,10 @@ package org.apache.tez.dag.app; import java.io.IOException; +import java.net.InetAddress; import java.net.InetSocketAddress; import java.net.URISyntaxException; +import java.net.UnknownHostException; import java.util.List; import java.util.Map; import java.util.Map.Entry; @@ -117,28 +119,39 @@ public class TaskAttemptListenerImpTezDag extends AbstractService implements protected void startRpcServer() { Configuration conf = getConfig(); - try { - server = new RPC.Builder(conf) - .setProtocol(TezTaskUmbilicalProtocol.class) - .setBindAddress("0.0.0.0") - .setPort(0) - .setInstance(this) - .setNumHandlers( - conf.getInt(TezConfiguration.TEZ_AM_TASK_LISTENER_THREAD_COUNT, - TezConfiguration.TEZ_AM_TASK_LISTENER_THREAD_COUNT_DEFAULT)) - .setSecretManager(jobTokenSecretManager).build(); - - // Enable service authorization? 
- if (conf.getBoolean( - CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHORIZATION, - false)) { - refreshServiceAcls(conf, new TezAMPolicyProvider()); - } + if (!conf.getBoolean(TezConfiguration.TEZ_LOCAL_MODE, TezConfiguration.TEZ_LOCAL_MODE_DEFAULT)) { + try { + server = new RPC.Builder(conf) + .setProtocol(TezTaskUmbilicalProtocol.class) + .setBindAddress("0.0.0.0") + .setPort(0) + .setInstance(this) + .setNumHandlers( + conf.getInt(TezConfiguration.TEZ_AM_TASK_LISTENER_THREAD_COUNT, + TezConfiguration.TEZ_AM_TASK_LISTENER_THREAD_COUNT_DEFAULT)) + .setSecretManager(jobTokenSecretManager).build(); + + // Enable service authorization? + if (conf.getBoolean( + CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHORIZATION, + false)) { + refreshServiceAcls(conf, new TezAMPolicyProvider()); + } - server.start(); - this.address = NetUtils.getConnectAddress(server); - } catch (IOException e) { - throw new TezUncheckedException(e); + server.start(); + this.address = NetUtils.getConnectAddress(server); + } catch (IOException e) { + throw new TezUncheckedException(e); + } + } else { + try { + this.address = new InetSocketAddress(InetAddress.getLocalHost(), 0); + } catch (UnknownHostException e) { + throw new TezUncheckedException(e); + } + if (LOG.isDebugEnabled()) { + LOG.debug("Not starting TaskAttemptListener RPC in LocalMode"); + } } } http://git-wip-us.apache.org/repos/asf/tez/blob/a9eb0d72/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/LocalContainerLauncher.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/LocalContainerLauncher.java b/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/LocalContainerLauncher.java index 9ff9a07..5a33a88 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/LocalContainerLauncher.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/LocalContainerLauncher.java @@ -20,20 +20,24 @@ package 
org.apache.tez.dag.app.launcher; import java.io.IOException; -import java.util.Random; import java.util.concurrent.BlockingQueue; import java.util.concurrent.Callable; +import java.util.concurrent.CancellationException; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; import java.util.concurrent.LinkedBlockingQueue; -import java.util.concurrent.ThreadPoolExecutor; -import java.util.concurrent.TimeUnit; +import java.util.concurrent.RejectedExecutionException; import java.util.concurrent.atomic.AtomicBoolean; +import com.google.common.base.Preconditions; import com.google.common.util.concurrent.FutureCallback; import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.ListenableFuture; import com.google.common.util.concurrent.ListeningExecutorService; import com.google.common.util.concurrent.MoreExecutors; +import com.google.common.util.concurrent.ThreadFactoryBuilder; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; @@ -41,7 +45,6 @@ import org.apache.hadoop.service.AbstractService; import org.apache.hadoop.util.StringUtils; import org.apache.hadoop.yarn.api.ApplicationConstants.Environment; import org.apache.hadoop.yarn.api.records.ContainerId; -import org.apache.hadoop.yarn.util.Clock; import org.apache.tez.common.TezTaskUmbilicalProtocol; import org.apache.tez.dag.api.TezConfiguration; @@ -49,13 +52,13 @@ import org.apache.tez.dag.api.TezException; import org.apache.tez.dag.api.TezUncheckedException; import org.apache.tez.dag.app.AppContext; import org.apache.tez.dag.app.TaskAttemptListener; -import org.apache.tez.dag.app.rm.NMCommunicatorEventType; import org.apache.tez.dag.app.rm.NMCommunicatorEvent; import org.apache.tez.dag.app.rm.NMCommunicatorLaunchRequestEvent; +import org.apache.tez.dag.app.rm.NMCommunicatorStopRequestEvent; import 
org.apache.tez.dag.app.rm.container.AMContainerEvent; +import org.apache.tez.dag.app.rm.container.AMContainerEventCompleted; import org.apache.tez.dag.app.rm.container.AMContainerEventLaunchFailed; import org.apache.tez.dag.app.rm.container.AMContainerEventLaunched; -import org.apache.tez.dag.app.rm.container.AMContainerEventStopFailed; import org.apache.tez.dag.app.rm.container.AMContainerEventType; import org.apache.tez.dag.history.DAGHistoryEvent; import org.apache.tez.dag.history.events.ContainerLaunchedEvent; @@ -72,19 +75,24 @@ public class LocalContainerLauncher extends AbstractService implements private static final Log LOG = LogFactory.getLog(LocalContainerLauncher.class); private final AppContext context; - private TaskAttemptListener taskAttemptListener; + private final TaskAttemptListener taskAttemptListener; + private final AtomicBoolean serviceStopped = new AtomicBoolean(false); + + private final ConcurrentHashMap<ContainerId, ListenableFuture<TezChild.ContainerExecutionResult>> + runningContainers = + new ConcurrentHashMap<ContainerId, ListenableFuture<TezChild.ContainerExecutionResult>>(); + + private final ExecutorService callbackExecutor = Executors.newFixedThreadPool(1, + new ThreadFactoryBuilder().setDaemon(true).setNameFormat("CallbackExecutor").build()); + private BlockingQueue<NMCommunicatorEvent> eventQueue = - new LinkedBlockingQueue<NMCommunicatorEvent>(); - private static AtomicBoolean serviceStopped; + new LinkedBlockingQueue<NMCommunicatorEvent>(); + private Thread eventHandlingThread; + + + private ListeningExecutorService taskExecutorService; - private Clock clock; - private LinkedBlockingQueue<Runnable> taskQueue = - new LinkedBlockingQueue<Runnable>(); - private ThreadPoolExecutor taskExecutor; - private ListeningExecutorService listeningExecutorService; - private int poolSize; - private final Random sleepTime = new Random(); public LocalContainerLauncher(AppContext context, TaskAttemptListener taskAttemptListener) { @@ -94,183 
+102,199 @@ public class LocalContainerLauncher extends AbstractService implements } @Override + public synchronized void serviceInit(Configuration conf) { + int numExecutors = conf.getInt(TezConfiguration.TEZ_AM_INLINE_TASK_EXECUTION_MAX_TASKS, + TezConfiguration.TEZ_AM_INLINE_TASK_EXECUTION_MAX_TASKS_DEFAULT); + Preconditions.checkState(numExecutors >=1, "Must have at least 1 executor"); + ExecutorService rawExecutor = Executors.newFixedThreadPool(numExecutors, + new ThreadFactoryBuilder().setDaemon(true).setNameFormat("LocalTaskExecutionThread [%d]") + .build()); + this.taskExecutorService = MoreExecutors.listeningDecorator(rawExecutor); + } + + @Override public void serviceStart() throws Exception { - Thread eventHandlingThread = new Thread(new TezSubTaskRunner(), - "LocalContainerLauncher-SubTaskRunner"); + eventHandlingThread = + new Thread(new TezSubTaskRunner(), "LocalContainerLauncher-SubTaskRunner"); eventHandlingThread.start(); - super.serviceStart(); } @Override - public synchronized void serviceInit(Configuration config) { - serviceStopped = new AtomicBoolean(false); - this.clock = context.getClock(); - this.poolSize = config.getInt(TezConfiguration.TEZ_AM_INLINE_TASK_EXECUTION_MAX_TASKS, - TezConfiguration.TEZ_AM_INLINE_TASK_EXECUTION_MAX_TASKS_DEFAULT); - int maxPoolSize = poolSize; - - this.taskExecutor = new ThreadPoolExecutor(poolSize, maxPoolSize, 60*1000, - TimeUnit.SECONDS, taskQueue); - this.listeningExecutorService = MoreExecutors.listeningDecorator(taskExecutor); + public void serviceStop() throws Exception { + if (!serviceStopped.compareAndSet(false, true)) { + LOG.info("Service Already stopped. 
Ignoring additional stop"); + } + if (eventHandlingThread != null) { + eventHandlingThread.interrupt(); + eventHandlingThread.join(2000l); + } + if (taskExecutorService != null) { + taskExecutorService.shutdownNow(); + } + callbackExecutor.shutdownNow(); + } + + // Thread to monitor the queue of incoming NMCommunicator events + private class TezSubTaskRunner implements Runnable { + @Override + public void run() { + while (!Thread.currentThread().isInterrupted() && !serviceStopped.get()) { + NMCommunicatorEvent event; + try { + event = eventQueue.take(); + switch (event.getType()) { + case CONTAINER_LAUNCH_REQUEST: + launch((NMCommunicatorLaunchRequestEvent) event); + break; + case CONTAINER_STOP_REQUEST: + stop((NMCommunicatorStopRequestEvent)event); + break; + } + } catch (InterruptedException e) { + if (!serviceStopped.get()) { + LOG.error("TezSubTaskRunner interrupted ", e); + } + return; + } + } + } } @SuppressWarnings("unchecked") void sendContainerLaunchFailedMsg(ContainerId containerId, String message) { - LOG.error(message); context.getEventHandler().handle(new AMContainerEventLaunchFailed(containerId, message)); } - //should mimic container using threads - //need to start all MapProcessor and RedProcessor here - private class TezSubTaskRunner implements Runnable { + //launch tasks + private void launch(NMCommunicatorLaunchRequestEvent event) { - ListenableFuture<Object> runningTask; - TezSubTaskRunner() {} + String tokenIdentifier = context.getApplicationID().toString(); - //launch tasks - private void launch(NMCommunicatorEvent event) { - NMCommunicatorLaunchRequestEvent launchEv = (NMCommunicatorLaunchRequestEvent)event; + String[] localDirs = + StringUtils.getTrimmedStrings(System.getenv(Environment.LOCAL_DIRS.name())); - String containerIdStr = event.getContainerId().toString(); - String host = taskAttemptListener.getAddress().getAddress().getHostAddress(); - int port = taskAttemptListener.getAddress().getPort(); - String tokenIdentifier = 
context.getApplicationID().toString(); + try { + ListenableFuture<TezChild.ContainerExecutionResult> runningTaskFuture = + taskExecutorService.submit(createSubTask(context.getAMConf(), + event.getContainerId(), tokenIdentifier, + context.getApplicationAttemptId().getAttemptId(), + localDirs, (TezTaskUmbilicalProtocol) taskAttemptListener)); + runningContainers.put(event.getContainerId(), runningTaskFuture); + Futures + .addCallback(runningTaskFuture, new RunningTaskCallback(context, event.getContainerId()), + callbackExecutor); + } catch (RejectedExecutionException e) { + String message = "Failed to queue container launch for container Id: " + event.getContainerId(); + LOG.error(message, e); + sendContainerLaunchFailedMsg(event.getContainerId(), message); + } + } - String[] localDirs = - StringUtils.getTrimmedStrings(System.getenv(Environment.LOCAL_DIRS.name())); + private void stop(NMCommunicatorStopRequestEvent event) { + ListenableFuture<TezChild.ContainerExecutionResult> future = + runningContainers.get(event.getContainerId()); + if (future == null) { + LOG.info("Ignoring stop request for containerId: " + event.getContainerId()); + } else { + LOG.info("Interrupting running/queued container with id: " + event.getContainerId()); + future.cancel(true); + // This will work only if the running task respects Interrupts - which at the moment is + // not the case for parts of the Runtime. + } + // Send this event to maintain regular control flow. This isn't of much use though. 
+ context.getEventHandler().handle( + new AMContainerEvent(event.getContainerId(), AMContainerEventType.C_NM_STOP_SENT)); + } - try { - runningTask = listeningExecutorService.submit(createSubTask(context.getAMConf(), - host, port, containerIdStr, tokenIdentifier, context.getApplicationAttemptId().getAttemptId(), - localDirs, (TezTaskUmbilicalProtocol) taskAttemptListener)); - Futures.addCallback(runningTask, - new FutureCallback<Object>() { - @Override - public void onSuccess(Object result) { - } - - @Override - public void onFailure(Throwable t) { - LOG.error("Container launching failed", t); - } - } - , taskExecutor); - } catch (Throwable throwable) { - LOG.info("Failed to start runSubTask thread!", throwable); - sendContainerLaunchFailedMsg(event.getContainerId(), "Container Launching Failed!"); - } + private class RunningTaskCallback + implements FutureCallback<TezChild.ContainerExecutionResult> { - try{ - context.getEventHandler().handle( - new AMContainerEventLaunched(launchEv.getContainerId())); - ContainerLaunchedEvent lEvt = new ContainerLaunchedEvent( - event.getContainerId(), clock.getTime(), context.getApplicationAttemptId()); - context.getHistoryHandler().handle(new DAGHistoryEvent(context.getCurrentDAGID(),lEvt)); - } catch (Throwable t) { - String message = "Container launch failed for " + event.getContainerId() + " : " + - StringUtils.stringifyException(t); - t.printStackTrace(); - LOG.error(message); - context.getEventHandler().handle(new AMContainerEventLaunchFailed(event.getContainerId(), message)); - } + private final AppContext appContext; + private final ContainerId containerId; + + RunningTaskCallback(AppContext appContext, ContainerId containerId) { + this.appContext = appContext; + this.containerId = containerId; } - private void stop(NMCommunicatorEvent event) { - try{ - context.getEventHandler().handle( - new AMContainerEvent(event.getContainerId(), AMContainerEventType.C_NM_STOP_SENT)); - } catch (Throwable t) { - // ignore the cleanup 
failure - String message = "cleanup failed for container " + event.getContainerId() + " : " + - StringUtils.stringifyException(t); - context.getEventHandler().handle( - new AMContainerEventStopFailed(event.getContainerId(), message)); - LOG.warn(message); + @Override + public void onSuccess(TezChild.ContainerExecutionResult result) { + runningContainers.remove(containerId); + if (result.getExitStatus() == TezChild.ContainerExecutionResult.ExitStatus.SUCCESS || + result.getExitStatus() == + TezChild.ContainerExecutionResult.ExitStatus.ASKED_TO_DIE) { + LOG.info("Container: " + containerId + " completed successfully"); + appContext.getEventHandler().handle( + new AMContainerEventCompleted(containerId, result.getExitStatus().getExitCode(), + null)); + } else { + LOG.info("Container: " + containerId + " completed but with errors"); + appContext.getEventHandler().handle( + new AMContainerEventCompleted(containerId, result.getExitStatus().getExitCode(), + result.getErrorMessage() == null ? + (result.getThrowable() == null ? null : result.getThrowable().getMessage()) : + result.getErrorMessage())); } } @Override - public void run() { - NMCommunicatorEvent event; - while (!Thread.currentThread().isInterrupted()) { - while (taskExecutor.getActiveCount() >= poolSize){ - try { - LOG.info("Number of Running Tasks reach the uppper bound, sleep 1 seconds!:" + - taskExecutor.getActiveCount()); - Thread.sleep(1000 + sleepTime.nextInt(10) * 1000); - } catch (InterruptedException e) { - LOG.warn("Thread Sleep has been interrupted!", e); - } - } - - try { - event = eventQueue.take(); - } catch (InterruptedException e) { // mostly via T_KILL? JOB_KILL? 
- LOG.error("Returning, interrupted : ", e); - return; - } - - LOG.info("Processing the event " + event.toString()); - if (event.getType() == NMCommunicatorEventType.CONTAINER_LAUNCH_REQUEST) { - launch(event); - } else if (event.getType() == NMCommunicatorEventType.CONTAINER_STOP_REQUEST) { - stop(event); - } else { - LOG.warn("Ignoring unexpected event " + event.toString()); - } + public void onFailure(Throwable t) { + runningContainers.remove(containerId); + // Ignore CancellationException since that is triggered by the LocalContainerLauncher itself + if (!(t instanceof CancellationException)) { + LOG.info("Container: " + containerId + ": Execution Failed: ", t); + // Inform of failure with exit code 1. + appContext.getEventHandler() + .handle(new AMContainerEventCompleted(containerId, + TezChild.ContainerExecutionResult.ExitStatus.EXECUTION_FAILURE.getExitCode(), + t.getMessage())); + } else { + LOG.info("Ignoring CancellationException - triggered by LocalContainerLauncher"); + appContext.getEventHandler() + .handle(new AMContainerEventCompleted(containerId, + TezChild.ContainerExecutionResult.ExitStatus.SUCCESS.getExitCode(), + "CancellationException")); } } - } //end SubTaskRunner + } + //create a SubTask - private synchronized Callable<Object> createSubTask(final Configuration defaultConf, final String host, - final int port, final String containerId, final String tokenIdentifier, final int attemptNumber, - final String[] localDirs, final TezTaskUmbilicalProtocol tezTaskUmbilicalProtocol) { - return new Callable<Object>() { + private synchronized Callable<TezChild.ContainerExecutionResult> createSubTask( + final Configuration defaultConf, + final ContainerId containerId, + final String tokenIdentifier, + final int attemptNumber, + final String[] localDirs, + final TezTaskUmbilicalProtocol tezTaskUmbilicalProtocol) { + + return new Callable<TezChild.ContainerExecutionResult>() { @Override - public Object call() { + public TezChild.ContainerExecutionResult 
call() throws InterruptedException, TezException, + IOException { + // Inform about the launch request now that the container has been allocated a thread to execute in. + context.getEventHandler().handle(new AMContainerEventLaunched(containerId)); + ContainerLaunchedEvent lEvt = + new ContainerLaunchedEvent(containerId, context.getClock().getTime(), + context.getApplicationAttemptId()); + context.getHistoryHandler().handle(new DAGHistoryEvent(context.getCurrentDAGID(), lEvt)); + // Pull in configuration specified for the session. - try { - TezChild tezChild; - tezChild = TezChild.newTezChild(defaultConf, host, port, containerId, tokenIdentifier, - attemptNumber, localDirs); - tezChild.setUmbilical(tezTaskUmbilicalProtocol); - tezChild.run(); - } catch (TezException e) { - //need to report the TezException and stop this task - LOG.error("Failed to add User Specified TezConfiguration!", e); - } catch (IOException e) { - //need to report the IOException and stop this task - LOG.error("IOE in launching task!", e); - } catch (InterruptedException e) { - //need to report the IOException and stop this task - LOG.error("Interruption happened during launching task!", e); - } - return null; + TezChild tezChild = + TezChild.newTezChild(defaultConf, null, 0, containerId.toString(), tokenIdentifier, + attemptNumber, localDirs); + tezChild.setUmbilical(tezTaskUmbilicalProtocol); + return tezChild.run(); } }; } @Override - public void serviceStop() throws Exception { - if (taskExecutor != null) { - taskExecutor.shutdownNow(); - } - - if (listeningExecutorService != null) { - listeningExecutorService.shutdownNow(); - } - - serviceStopped.set(true); - super.serviceStop(); - } - - @Override public void handle(NMCommunicatorEvent event) { try { eventQueue.put(event); } catch (InterruptedException e) { - throw new TezUncheckedException(e); // FIXME? 
YarnRuntimeException is "for runtime exceptions only" + throw new TezUncheckedException(e); } } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/tez/blob/a9eb0d72/tez-dag/src/main/java/org/apache/tez/runtime/task/TezChild.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/runtime/task/TezChild.java b/tez-dag/src/main/java/org/apache/tez/runtime/task/TezChild.java index 5debca7..8203451 100644 --- a/tez-dag/src/main/java/org/apache/tez/runtime/task/TezChild.java +++ b/tez-dag/src/main/java/org/apache/tez/runtime/task/TezChild.java @@ -18,6 +18,7 @@ package org.apache.tez.runtime.task; +import javax.annotation.Nullable; import java.io.IOException; import java.net.InetSocketAddress; import java.net.URI; @@ -81,7 +82,6 @@ public class TezChild { private final Configuration defaultConf; private final String containerIdString; private final int appAttemptNumber; - private final InetSocketAddress address; private final String[] localDirs; private final AtomicLong heartbeatCounter = new AtomicLong(0); @@ -126,8 +126,6 @@ public class TezChild { maxEventsToGet = defaultConf.getInt(TezConfiguration.TEZ_TASK_MAX_EVENTS_PER_HEARTBEAT, TezConfiguration.TEZ_TASK_MAX_EVENTS_PER_HEARTBEAT_DEFAULT); - address = NetUtils.createSocketAddrForHost(host, port); - ExecutorService executor = Executors.newFixedThreadPool(1, new ThreadFactoryBuilder() .setDaemon(true).setNameFormat("TezChild").build()); this.executor = MoreExecutors.listeningDecorator(executor); @@ -147,13 +145,14 @@ public class TezChild { TezConfiguration.TEZ_LOCAL_MODE_DEFAULT); UserGroupInformation taskOwner = UserGroupInformation.createRemoteUser(tokenIdentifier); Token<JobTokenIdentifier> jobToken = TokenCache.getSessionToken(credentials); - SecurityUtil.setTokenService(jobToken, address); - taskOwner.addToken(jobToken); serviceConsumerMetadata.put(ShuffleUtils.SHUFFLE_HANDLER_SERVICE_ID, 
ShuffleUtils.convertJobTokenToBytes(jobToken)); if (!isLocal) { + final InetSocketAddress address = NetUtils.createSocketAddrForHost(host, port); + SecurityUtil.setTokenService(jobToken, address); + taskOwner.addToken(jobToken); umbilical = taskOwner.doAs(new PrivilegedExceptionAction<TezTaskUmbilicalProtocol>() { @Override public TezTaskUmbilicalProtocol run() throws Exception { @@ -164,7 +163,7 @@ public class TezChild { } } - public void run() throws IOException, InterruptedException, TezException { + public ContainerExecutionResult run() throws IOException, InterruptedException, TezException { ContainerContext containerContext = new ContainerContext(containerIdString); ContainerReporter containerReporter = new ContainerReporter(umbilical, containerContext, @@ -186,17 +185,20 @@ public class TezChild { } catch (ExecutionException e) { Throwable cause = e.getCause(); handleError(cause); - return; + return new ContainerExecutionResult(ContainerExecutionResult.ExitStatus.EXECUTION_FAILURE, + cause, "Execution Exception while fetching new work: " + e.getMessage()); } catch (InterruptedException e) { - LOG.info("Interrupted while waiting for task to complete:" + LOG.info("Interrupted while waiting for new work:" + containerTask.getTaskSpec().getTaskAttemptID()); handleError(e); - return; + return new ContainerExecutionResult(ContainerExecutionResult.ExitStatus.INTERRUPTED, e, + "Interrupted while waiting for new work"); } if (containerTask.shouldDie()) { LOG.info("ContainerTask returned shouldDie=true, Exiting"); shutdown(); - return; + return new ContainerExecutionResult(ContainerExecutionResult.ExitStatus.SUCCESS, null, + "Asked to die by the AM"); } else { String loggerAddend = containerTask.getTaskSpec().getTaskAttemptID().toString(); taskCount++; @@ -217,19 +219,24 @@ public class TezChild { if (shouldDie) { LOG.info("Got a shouldDie notification via hearbeats. 
Shutting down"); shutdown(); - return; + return new ContainerExecutionResult(ContainerExecutionResult.ExitStatus.SUCCESS, null, + "Asked to die by the AM"); } } catch (IOException e) { handleError(e); - return; + return new ContainerExecutionResult(ContainerExecutionResult.ExitStatus.EXECUTION_FAILURE, + e, "TaskExecutionFailure: " + e.getMessage()); } catch (TezException e) { handleError(e); - return; + return new ContainerExecutionResult(ContainerExecutionResult.ExitStatus.EXECUTION_FAILURE, + e, "TaskExecutionFailure: " + e.getMessage()); } finally { FileSystem.closeAllForUGI(childUGI); } } } + return new ContainerExecutionResult(ContainerExecutionResult.ExitStatus.SUCCESS, null, + null); } /** @@ -334,6 +341,48 @@ public class TezChild { } } + public static class ContainerExecutionResult { + public static enum ExitStatus { + SUCCESS(0), + EXECUTION_FAILURE(1), + INTERRUPTED(2), + ASKED_TO_DIE(3); + + private final int exitCode; + + ExitStatus(int code) { + this.exitCode = code; + } + + public int getExitCode() { + return this.exitCode; + } + } + + private final ExitStatus exitStatus; + private final Throwable throwable; + private final String errorMessage; + + ContainerExecutionResult(ExitStatus exitStatus, @Nullable Throwable throwable, + @Nullable String errorMessage) { + this.exitStatus = exitStatus; + this.throwable = throwable; + this.errorMessage = errorMessage; + } + + public ExitStatus getExitStatus() { + return this.exitStatus; + } + + public Throwable getThrowable() { + return this.throwable; + } + + public String getErrorMessage() { + return this.errorMessage; + } + } + public static TezChild newTezChild(Configuration conf, String host, int port, String containerIdentifier, String tokenIdentifier, int attemptNumber, String[] localDirs) throws IOException, InterruptedException, TezException {
