Repository: tez
Updated Branches:
  refs/heads/master 25960cc27 -> a9eb0d721


TEZ-870. Change LocalContainerLauncher to handle multiple threads,
improve error reporting and inform other components about container
completion. (sseth)


Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/a9eb0d72
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/a9eb0d72
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/a9eb0d72

Branch: refs/heads/master
Commit: a9eb0d721dd6b611c330c11f65fd591a43e1c7ff
Parents: 25960cc
Author: Siddharth Seth <[email protected]>
Authored: Fri Aug 1 12:03:27 2014 -0700
Committer: Siddharth Seth <[email protected]>
Committed: Fri Aug 1 12:03:27 2014 -0700

----------------------------------------------------------------------
 .../java/org/apache/tez/client/LocalClient.java |   6 +-
 .../dag/app/TaskAttemptListenerImpTezDag.java   |  55 +--
 .../app/launcher/LocalContainerLauncher.java    | 332 ++++++++++---------
 .../org/apache/tez/runtime/task/TezChild.java   |  75 ++++-
 4 files changed, 278 insertions(+), 190 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tez/blob/a9eb0d72/tez-dag/src/main/java/org/apache/tez/client/LocalClient.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/client/LocalClient.java b/tez-dag/src/main/java/org/apache/tez/client/LocalClient.java
index a2a2ab1..5a379c3 100644
--- a/tez-dag/src/main/java/org/apache/tez/client/LocalClient.java
+++ b/tez-dag/src/main/java/org/apache/tez/client/LocalClient.java
@@ -224,8 +224,10 @@ public class LocalClient extends FrameworkClient {
           fs.setWorkingDirectory(userDir);
 
           // Prepare Environment
-          Path logDir = new Path(userDir, "logs");
-          Path localDir = new Path(userDir, "local");
+          Path logDir = new Path(userDir, "localmode-log-dir");
+          Path localDir = new Path(userDir, "localmode-local-dir");
+          fs.mkdirs(logDir);
+          fs.mkdirs(localDir);
 
           EnvironmentUpdateUtils.put(Environment.LOG_DIRS.name(), 
logDir.toUri().getPath());
           EnvironmentUpdateUtils.put(Environment.LOCAL_DIRS.name(), 
localDir.toUri().getPath());

http://git-wip-us.apache.org/repos/asf/tez/blob/a9eb0d72/tez-dag/src/main/java/org/apache/tez/dag/app/TaskAttemptListenerImpTezDag.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/TaskAttemptListenerImpTezDag.java b/tez-dag/src/main/java/org/apache/tez/dag/app/TaskAttemptListenerImpTezDag.java
index 2e5ca02..cf34e0e 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/TaskAttemptListenerImpTezDag.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/TaskAttemptListenerImpTezDag.java
@@ -18,8 +18,10 @@
 package org.apache.tez.dag.app;
 
 import java.io.IOException;
+import java.net.InetAddress;
 import java.net.InetSocketAddress;
 import java.net.URISyntaxException;
+import java.net.UnknownHostException;
 import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
@@ -117,28 +119,39 @@ public class TaskAttemptListenerImpTezDag extends AbstractService implements
 
   protected void startRpcServer() {
     Configuration conf = getConfig();
-    try {
-      server = new RPC.Builder(conf)
-          .setProtocol(TezTaskUmbilicalProtocol.class)
-          .setBindAddress("0.0.0.0")
-          .setPort(0)
-          .setInstance(this)
-          .setNumHandlers(
-              conf.getInt(TezConfiguration.TEZ_AM_TASK_LISTENER_THREAD_COUNT,
-                  TezConfiguration.TEZ_AM_TASK_LISTENER_THREAD_COUNT_DEFAULT))
-          .setSecretManager(jobTokenSecretManager).build();
-
-      // Enable service authorization?
-      if (conf.getBoolean(
-          CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHORIZATION,
-          false)) {
-        refreshServiceAcls(conf, new TezAMPolicyProvider());
-      }
+    if (!conf.getBoolean(TezConfiguration.TEZ_LOCAL_MODE, 
TezConfiguration.TEZ_LOCAL_MODE_DEFAULT)) {
+      try {
+        server = new RPC.Builder(conf)
+            .setProtocol(TezTaskUmbilicalProtocol.class)
+            .setBindAddress("0.0.0.0")
+            .setPort(0)
+            .setInstance(this)
+            .setNumHandlers(
+                conf.getInt(TezConfiguration.TEZ_AM_TASK_LISTENER_THREAD_COUNT,
+                    
TezConfiguration.TEZ_AM_TASK_LISTENER_THREAD_COUNT_DEFAULT))
+            .setSecretManager(jobTokenSecretManager).build();
+
+        // Enable service authorization?
+        if (conf.getBoolean(
+            CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHORIZATION,
+            false)) {
+          refreshServiceAcls(conf, new TezAMPolicyProvider());
+        }
 
-      server.start();
-      this.address = NetUtils.getConnectAddress(server);
-    } catch (IOException e) {
-      throw new TezUncheckedException(e);
+        server.start();
+        this.address = NetUtils.getConnectAddress(server);
+      } catch (IOException e) {
+        throw new TezUncheckedException(e);
+      }
+    } else {
+      try {
+        this.address = new InetSocketAddress(InetAddress.getLocalHost(), 0);
+      } catch (UnknownHostException e) {
+        throw new TezUncheckedException(e);
+      }
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Not starting TaskAttemptListener RPC in LocalMode");
+      }
     }
   }
 

http://git-wip-us.apache.org/repos/asf/tez/blob/a9eb0d72/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/LocalContainerLauncher.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/LocalContainerLauncher.java b/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/LocalContainerLauncher.java
index 9ff9a07..5a33a88 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/LocalContainerLauncher.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/LocalContainerLauncher.java
@@ -20,20 +20,24 @@ package org.apache.tez.dag.app.launcher;
 
 
 import java.io.IOException;
-import java.util.Random;
 
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.Callable;
+import java.util.concurrent.CancellationException;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
 import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.ThreadPoolExecutor;
-import java.util.concurrent.TimeUnit;
+import java.util.concurrent.RejectedExecutionException;
 import java.util.concurrent.atomic.AtomicBoolean;
 
+import com.google.common.base.Preconditions;
 import com.google.common.util.concurrent.FutureCallback;
 import com.google.common.util.concurrent.Futures;
 import com.google.common.util.concurrent.ListenableFuture;
 import com.google.common.util.concurrent.ListeningExecutorService;
 import com.google.common.util.concurrent.MoreExecutors;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
@@ -41,7 +45,6 @@ import org.apache.hadoop.service.AbstractService;
 import org.apache.hadoop.util.StringUtils;
 import org.apache.hadoop.yarn.api.ApplicationConstants.Environment;
 import org.apache.hadoop.yarn.api.records.ContainerId;
-import org.apache.hadoop.yarn.util.Clock;
 
 import org.apache.tez.common.TezTaskUmbilicalProtocol;
 import org.apache.tez.dag.api.TezConfiguration;
@@ -49,13 +52,13 @@ import org.apache.tez.dag.api.TezException;
 import org.apache.tez.dag.api.TezUncheckedException;
 import org.apache.tez.dag.app.AppContext;
 import org.apache.tez.dag.app.TaskAttemptListener;
-import org.apache.tez.dag.app.rm.NMCommunicatorEventType;
 import org.apache.tez.dag.app.rm.NMCommunicatorEvent;
 import org.apache.tez.dag.app.rm.NMCommunicatorLaunchRequestEvent;
+import org.apache.tez.dag.app.rm.NMCommunicatorStopRequestEvent;
 import org.apache.tez.dag.app.rm.container.AMContainerEvent;
+import org.apache.tez.dag.app.rm.container.AMContainerEventCompleted;
 import org.apache.tez.dag.app.rm.container.AMContainerEventLaunchFailed;
 import org.apache.tez.dag.app.rm.container.AMContainerEventLaunched;
-import org.apache.tez.dag.app.rm.container.AMContainerEventStopFailed;
 import org.apache.tez.dag.app.rm.container.AMContainerEventType;
 import org.apache.tez.dag.history.DAGHistoryEvent;
 import org.apache.tez.dag.history.events.ContainerLaunchedEvent;
@@ -72,19 +75,24 @@ public class LocalContainerLauncher extends AbstractService implements
 
   private static final Log LOG = 
LogFactory.getLog(LocalContainerLauncher.class);
   private final AppContext context;
-  private TaskAttemptListener taskAttemptListener;
+  private final TaskAttemptListener taskAttemptListener;
+  private final AtomicBoolean serviceStopped = new AtomicBoolean(false);
+
+  private final ConcurrentHashMap<ContainerId, 
ListenableFuture<TezChild.ContainerExecutionResult>>
+      runningContainers =
+      new ConcurrentHashMap<ContainerId, 
ListenableFuture<TezChild.ContainerExecutionResult>>();
+
+  private final ExecutorService callbackExecutor = 
Executors.newFixedThreadPool(1,
+      new 
ThreadFactoryBuilder().setDaemon(true).setNameFormat("CallbackExecutor").build());
+
   private BlockingQueue<NMCommunicatorEvent> eventQueue =
-    new LinkedBlockingQueue<NMCommunicatorEvent>();
-  private static AtomicBoolean serviceStopped;
+      new LinkedBlockingQueue<NMCommunicatorEvent>();
+  private Thread eventHandlingThread;
+
+
+  private ListeningExecutorService taskExecutorService;
 
-  private Clock clock;
-  private LinkedBlockingQueue<Runnable> taskQueue =
-    new LinkedBlockingQueue<Runnable>();
 
-  private ThreadPoolExecutor taskExecutor;
-  private ListeningExecutorService listeningExecutorService;
-  private int poolSize;
-  private final Random sleepTime = new Random();
 
   public LocalContainerLauncher(AppContext context,
                                 TaskAttemptListener taskAttemptListener) {
@@ -94,183 +102,199 @@ public class LocalContainerLauncher extends AbstractService implements
   }
 
   @Override
+  public synchronized void serviceInit(Configuration conf) {
+    int numExecutors = 
conf.getInt(TezConfiguration.TEZ_AM_INLINE_TASK_EXECUTION_MAX_TASKS,
+        TezConfiguration.TEZ_AM_INLINE_TASK_EXECUTION_MAX_TASKS_DEFAULT);
+    Preconditions.checkState(numExecutors >=1, "Must have at least 1 
executor");
+    ExecutorService rawExecutor = Executors.newFixedThreadPool(numExecutors,
+        new 
ThreadFactoryBuilder().setDaemon(true).setNameFormat("LocalTaskExecutionThread 
[%d]")
+            .build());
+    this.taskExecutorService = MoreExecutors.listeningDecorator(rawExecutor);
+  }
+
+  @Override
   public void serviceStart() throws Exception {
-    Thread eventHandlingThread = new Thread(new TezSubTaskRunner(),
-      "LocalContainerLauncher-SubTaskRunner");
+    eventHandlingThread =
+        new Thread(new TezSubTaskRunner(), 
"LocalContainerLauncher-SubTaskRunner");
     eventHandlingThread.start();
-    super.serviceStart();
   }
 
   @Override
-  public synchronized void serviceInit(Configuration config) {
-    serviceStopped = new AtomicBoolean(false);
-    this.clock = context.getClock();
-    this.poolSize = 
config.getInt(TezConfiguration.TEZ_AM_INLINE_TASK_EXECUTION_MAX_TASKS,
-      TezConfiguration.TEZ_AM_INLINE_TASK_EXECUTION_MAX_TASKS_DEFAULT);
-    int maxPoolSize = poolSize;
-
-    this.taskExecutor = new ThreadPoolExecutor(poolSize, maxPoolSize, 60*1000,
-      TimeUnit.SECONDS, taskQueue);
-    this.listeningExecutorService = 
MoreExecutors.listeningDecorator(taskExecutor);
+  public void serviceStop() throws Exception {
+    if (!serviceStopped.compareAndSet(false, true)) {
+      LOG.info("Service Already stopped. Ignoring additional stop");
+    }
+    if (eventHandlingThread != null) {
+      eventHandlingThread.interrupt();
+      eventHandlingThread.join(2000l);
+    }
+    if (taskExecutorService != null) {
+      taskExecutorService.shutdownNow();
+    }
+    callbackExecutor.shutdownNow();
+  }
+
+  // Thread to monitor the queue of incoming NMCommunicator events
+  private class TezSubTaskRunner implements Runnable {
+    @Override
+    public void run() {
+      while (!Thread.currentThread().isInterrupted() && !serviceStopped.get()) 
{
+        NMCommunicatorEvent event;
+        try {
+          event = eventQueue.take();
+          switch (event.getType()) {
+            case CONTAINER_LAUNCH_REQUEST:
+              launch((NMCommunicatorLaunchRequestEvent) event);
+              break;
+            case CONTAINER_STOP_REQUEST:
+              stop((NMCommunicatorStopRequestEvent)event);
+              break;
+          }
+        } catch (InterruptedException e) {
+          if (!serviceStopped.get()) {
+            LOG.error("TezSubTaskRunner interrupted ", e);
+          }
+          return;
+        }
+      }
+    }
   }
 
   @SuppressWarnings("unchecked")
   void sendContainerLaunchFailedMsg(ContainerId containerId, String message) {
-    LOG.error(message);
     context.getEventHandler().handle(new 
AMContainerEventLaunchFailed(containerId, message));
   }
 
-  //should mimic container using threads
-  //need to start all MapProcessor and RedProcessor here
-  private class TezSubTaskRunner implements Runnable {
+  //launch tasks
+  private void launch(NMCommunicatorLaunchRequestEvent event) {
 
-    ListenableFuture<Object> runningTask;
-    TezSubTaskRunner() {}
+    String tokenIdentifier = context.getApplicationID().toString();
 
-    //launch tasks
-    private void launch(NMCommunicatorEvent event) {
-      NMCommunicatorLaunchRequestEvent launchEv = 
(NMCommunicatorLaunchRequestEvent)event;
+    String[] localDirs =
+        
StringUtils.getTrimmedStrings(System.getenv(Environment.LOCAL_DIRS.name()));
 
-      String containerIdStr = event.getContainerId().toString();
-      String host = 
taskAttemptListener.getAddress().getAddress().getHostAddress();
-      int port = taskAttemptListener.getAddress().getPort();
-      String tokenIdentifier = context.getApplicationID().toString();
+    try {
+      ListenableFuture<TezChild.ContainerExecutionResult> runningTaskFuture =
+          taskExecutorService.submit(createSubTask(context.getAMConf(),
+              event.getContainerId(), tokenIdentifier,
+              context.getApplicationAttemptId().getAttemptId(),
+              localDirs, (TezTaskUmbilicalProtocol) taskAttemptListener));
+      runningContainers.put(event.getContainerId(), runningTaskFuture);
+      Futures
+          .addCallback(runningTaskFuture, new RunningTaskCallback(context, 
event.getContainerId()),
+              callbackExecutor);
+    } catch (RejectedExecutionException e) {
+      String message = "Failed to queue container launch for container Id: " + 
event.getContainerId();
+      LOG.error(message, e);
+      sendContainerLaunchFailedMsg(event.getContainerId(), message);
+    }
+  }
 
-      String[] localDirs =
-        
StringUtils.getTrimmedStrings(System.getenv(Environment.LOCAL_DIRS.name()));
+  private void stop(NMCommunicatorStopRequestEvent event) {
+    ListenableFuture<TezChild.ContainerExecutionResult> future =
+        runningContainers.get(event.getContainerId());
+    if (future == null) {
+      LOG.info("Ignoring stop request for containerId: " + 
event.getContainerId());
+    } else {
+      LOG.info("Interrupting running/queued container with id: " + 
event.getContainerId());
+      future.cancel(true);
+      // This will work only if the running task respects Interrupts - which 
at the moment is
+      // not the case for parts of the Runtime.
+    }
+    // Send this event to maintain regular control flow. This isn't of much 
use though.
+    context.getEventHandler().handle(
+        new AMContainerEvent(event.getContainerId(), 
AMContainerEventType.C_NM_STOP_SENT));
+  }
 
-      try {
-        runningTask = 
listeningExecutorService.submit(createSubTask(context.getAMConf(),
-          host, port, containerIdStr, tokenIdentifier, 
context.getApplicationAttemptId().getAttemptId(),
-          localDirs, (TezTaskUmbilicalProtocol) taskAttemptListener));
-        Futures.addCallback(runningTask,
-          new FutureCallback<Object>() {
-            @Override
-            public void onSuccess(Object result) {
-            }
-
-            @Override
-            public void onFailure(Throwable t) {
-              LOG.error("Container launching failed", t);
-            }
-          }
-          , taskExecutor);
-      } catch (Throwable throwable) {
-        LOG.info("Failed to start runSubTask thread!", throwable);
-        sendContainerLaunchFailedMsg(event.getContainerId(), "Container 
Launching Failed!");
-      }
+  private class RunningTaskCallback
+      implements FutureCallback<TezChild.ContainerExecutionResult> {
 
-      try{
-        context.getEventHandler().handle(
-          new AMContainerEventLaunched(launchEv.getContainerId()));
-        ContainerLaunchedEvent lEvt = new ContainerLaunchedEvent(
-          event.getContainerId(), clock.getTime(), 
context.getApplicationAttemptId());
-        context.getHistoryHandler().handle(new 
DAGHistoryEvent(context.getCurrentDAGID(),lEvt));
-      } catch (Throwable t) {
-        String message = "Container launch failed for " +  
event.getContainerId() + " : " +
-          StringUtils.stringifyException(t);
-        t.printStackTrace();
-        LOG.error(message);
-        context.getEventHandler().handle(new 
AMContainerEventLaunchFailed(event.getContainerId(), message));
-      }
+    private final AppContext appContext;
+    private final ContainerId containerId;
+
+    RunningTaskCallback(AppContext appContext, ContainerId containerId) {
+      this.appContext = appContext;
+      this.containerId = containerId;
     }
 
-    private void stop(NMCommunicatorEvent event) {
-      try{
-        context.getEventHandler().handle(
-          new AMContainerEvent(event.getContainerId(), 
AMContainerEventType.C_NM_STOP_SENT));
-      } catch (Throwable t) {
-        // ignore the cleanup failure
-        String message = "cleanup failed for container " +  
event.getContainerId() + " : " +
-          StringUtils.stringifyException(t);
-        context.getEventHandler().handle(
-          new AMContainerEventStopFailed(event.getContainerId(), message));
-        LOG.warn(message);
+    @Override
+    public void onSuccess(TezChild.ContainerExecutionResult result) {
+      runningContainers.remove(containerId);
+      if (result.getExitStatus() == 
TezChild.ContainerExecutionResult.ExitStatus.SUCCESS ||
+          result.getExitStatus() ==
+              TezChild.ContainerExecutionResult.ExitStatus.ASKED_TO_DIE) {
+        LOG.info("Container: " + containerId + " completed successfully");
+        appContext.getEventHandler().handle(
+            new AMContainerEventCompleted(containerId, 
result.getExitStatus().getExitCode(),
+                null));
+      } else {
+        LOG.info("Container: " + containerId + " completed but with errors");
+        appContext.getEventHandler().handle(
+            new AMContainerEventCompleted(containerId, 
result.getExitStatus().getExitCode(),
+                result.getErrorMessage() == null ?
+                    (result.getThrowable() == null ? null : 
result.getThrowable().getMessage()) :
+                    result.getErrorMessage()));
       }
     }
 
     @Override
-    public void run() {
-      NMCommunicatorEvent event;
-      while (!Thread.currentThread().isInterrupted()) {
-        while (taskExecutor.getActiveCount() >= poolSize){
-          try {
-            LOG.info("Number of Running Tasks reach the uppper bound, sleep 1 
seconds!:" +
-              taskExecutor.getActiveCount());
-            Thread.sleep(1000 + sleepTime.nextInt(10) * 1000);
-          } catch (InterruptedException e) {
-            LOG.warn("Thread Sleep has been interrupted!", e);
-          }
-        }
-
-        try {
-          event = eventQueue.take();
-        } catch (InterruptedException e) {  // mostly via T_KILL? JOB_KILL?
-          LOG.error("Returning, interrupted : ", e);
-          return;
-        }
-
-        LOG.info("Processing the event " + event.toString());
-        if (event.getType() == 
NMCommunicatorEventType.CONTAINER_LAUNCH_REQUEST) {
-          launch(event);
-        } else if (event.getType() == 
NMCommunicatorEventType.CONTAINER_STOP_REQUEST) {
-          stop(event);
-        } else {
-          LOG.warn("Ignoring unexpected event " + event.toString());
-        }
+    public void onFailure(Throwable t) {
+      runningContainers.remove(containerId);
+      // Ignore CancellationException since that is triggered by the 
LocalContainerLauncher itself
+      if (!(t instanceof CancellationException)) {
+        LOG.info("Container: " + containerId + ": Execution Failed: ", t);
+        // Inform of failure with exit code 1.
+        appContext.getEventHandler()
+            .handle(new AMContainerEventCompleted(containerId,
+                
TezChild.ContainerExecutionResult.ExitStatus.EXECUTION_FAILURE.getExitCode(),
+                t.getMessage()));
+      } else {
+        LOG.info("Ignoring CancellationException - triggered by 
LocalContainerLauncher");
+        appContext.getEventHandler()
+            .handle(new AMContainerEventCompleted(containerId,
+                
TezChild.ContainerExecutionResult.ExitStatus.SUCCESS.getExitCode(),
+                "CancellationException"));
       }
     }
-  } //end SubTaskRunner
+  }
+
 
   //create a SubTask
-  private synchronized Callable<Object> createSubTask(final Configuration 
defaultConf, final String host,
-      final int port, final String containerId, final String tokenIdentifier, 
final int attemptNumber,
-      final String[] localDirs, final TezTaskUmbilicalProtocol 
tezTaskUmbilicalProtocol) {
-    return new Callable<Object>() {
+  private synchronized Callable<TezChild.ContainerExecutionResult> 
createSubTask(
+      final Configuration defaultConf,
+      final ContainerId containerId,
+      final String tokenIdentifier,
+      final int attemptNumber,
+      final String[] localDirs,
+      final TezTaskUmbilicalProtocol tezTaskUmbilicalProtocol) {
+
+    return new Callable<TezChild.ContainerExecutionResult>() {
       @Override
-      public Object call() {
+      public TezChild.ContainerExecutionResult call() throws 
InterruptedException, TezException,
+          IOException {
+        // Inform about the launch request now that the container has been 
allocated a thread to execute in.
+        context.getEventHandler().handle(new 
AMContainerEventLaunched(containerId));
+        ContainerLaunchedEvent lEvt =
+            new ContainerLaunchedEvent(containerId, 
context.getClock().getTime(),
+                context.getApplicationAttemptId());
+        context.getHistoryHandler().handle(new 
DAGHistoryEvent(context.getCurrentDAGID(), lEvt));
+
         // Pull in configuration specified for the session.
-        try {
-          TezChild tezChild;
-          tezChild = TezChild.newTezChild(defaultConf, host, port, 
containerId, tokenIdentifier,
-            attemptNumber, localDirs);
-          tezChild.setUmbilical(tezTaskUmbilicalProtocol);
-          tezChild.run();
-        } catch (TezException e) {
-          //need to report the TezException and stop this task
-          LOG.error("Failed to add User Specified TezConfiguration!", e);
-        } catch (IOException e) {
-          //need to report the IOException and stop this task
-          LOG.error("IOE in launching task!", e);
-        } catch (InterruptedException e) {
-          //need to report the IOException and stop this task
-          LOG.error("Interruption happened during launching task!", e);
-        }
-        return null;
+        TezChild tezChild =
+            TezChild.newTezChild(defaultConf, null, 0, containerId.toString(), 
tokenIdentifier,
+                attemptNumber, localDirs);
+        tezChild.setUmbilical(tezTaskUmbilicalProtocol);
+        return tezChild.run();
       }
     };
   }
 
   @Override
-  public void serviceStop() throws Exception {
-    if (taskExecutor != null) {
-      taskExecutor.shutdownNow();
-    }
-
-    if (listeningExecutorService != null) {
-      listeningExecutorService.shutdownNow();
-    }
-
-    serviceStopped.set(true);
-    super.serviceStop();
-  }
-
-  @Override
   public void handle(NMCommunicatorEvent event) {
     try {
       eventQueue.put(event);
     } catch (InterruptedException e) {
-      throw new TezUncheckedException(e);  // FIXME? YarnRuntimeException is 
"for runtime exceptions only"
+      throw new TezUncheckedException(e);
     }
   }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/tez/blob/a9eb0d72/tez-dag/src/main/java/org/apache/tez/runtime/task/TezChild.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/runtime/task/TezChild.java b/tez-dag/src/main/java/org/apache/tez/runtime/task/TezChild.java
index 5debca7..8203451 100644
--- a/tez-dag/src/main/java/org/apache/tez/runtime/task/TezChild.java
+++ b/tez-dag/src/main/java/org/apache/tez/runtime/task/TezChild.java
@@ -18,6 +18,7 @@
 
 package org.apache.tez.runtime.task;
 
+import javax.annotation.Nullable;
 import java.io.IOException;
 import java.net.InetSocketAddress;
 import java.net.URI;
@@ -81,7 +82,6 @@ public class TezChild {
   private final Configuration defaultConf;
   private final String containerIdString;
   private final int appAttemptNumber;
-  private final InetSocketAddress address;
   private final String[] localDirs;
 
   private final AtomicLong heartbeatCounter = new AtomicLong(0);
@@ -126,8 +126,6 @@ public class TezChild {
     maxEventsToGet = 
defaultConf.getInt(TezConfiguration.TEZ_TASK_MAX_EVENTS_PER_HEARTBEAT,
         TezConfiguration.TEZ_TASK_MAX_EVENTS_PER_HEARTBEAT_DEFAULT);
 
-    address = NetUtils.createSocketAddrForHost(host, port);
-
     ExecutorService executor = Executors.newFixedThreadPool(1, new 
ThreadFactoryBuilder()
         .setDaemon(true).setNameFormat("TezChild").build());
     this.executor = MoreExecutors.listeningDecorator(executor);
@@ -147,13 +145,14 @@ public class TezChild {
         TezConfiguration.TEZ_LOCAL_MODE_DEFAULT);
     UserGroupInformation taskOwner = 
UserGroupInformation.createRemoteUser(tokenIdentifier);
     Token<JobTokenIdentifier> jobToken = 
TokenCache.getSessionToken(credentials);
-    SecurityUtil.setTokenService(jobToken, address);
-    taskOwner.addToken(jobToken);
 
     serviceConsumerMetadata.put(ShuffleUtils.SHUFFLE_HANDLER_SERVICE_ID,
         ShuffleUtils.convertJobTokenToBytes(jobToken));
 
     if (!isLocal) {
+      final InetSocketAddress address = NetUtils.createSocketAddrForHost(host, 
port);
+      SecurityUtil.setTokenService(jobToken, address);
+      taskOwner.addToken(jobToken);
       umbilical = taskOwner.doAs(new 
PrivilegedExceptionAction<TezTaskUmbilicalProtocol>() {
         @Override
         public TezTaskUmbilicalProtocol run() throws Exception {
@@ -164,7 +163,7 @@ public class TezChild {
     }
   }
   
-  public void run() throws IOException, InterruptedException, TezException {
+  public ContainerExecutionResult run() throws IOException, 
InterruptedException, TezException {
 
     ContainerContext containerContext = new 
ContainerContext(containerIdString);
     ContainerReporter containerReporter = new ContainerReporter(umbilical, 
containerContext,
@@ -186,17 +185,20 @@ public class TezChild {
       } catch (ExecutionException e) {
         Throwable cause = e.getCause();
         handleError(cause);
-        return;
+        return new 
ContainerExecutionResult(ContainerExecutionResult.ExitStatus.EXECUTION_FAILURE,
+            cause, "Execution Exception while fetching new work: " + 
e.getMessage());
       } catch (InterruptedException e) {
-        LOG.info("Interrupted while waiting for task to complete:"
+        LOG.info("Interrupted while waiting for new work:"
             + containerTask.getTaskSpec().getTaskAttemptID());
         handleError(e);
-        return;
+        return new 
ContainerExecutionResult(ContainerExecutionResult.ExitStatus.INTERRUPTED, e,
+            "Interrupted while waiting for new work");
       }
       if (containerTask.shouldDie()) {
         LOG.info("ContainerTask returned shouldDie=true, Exiting");
         shutdown();
-        return;
+        return new 
ContainerExecutionResult(ContainerExecutionResult.ExitStatus.SUCCESS, null,
+            "Asked to die by the AM");
       } else {
         String loggerAddend = 
containerTask.getTaskSpec().getTaskAttemptID().toString();
         taskCount++;
@@ -217,19 +219,24 @@ public class TezChild {
           if (shouldDie) {
             LOG.info("Got a shouldDie notification via hearbeats. Shutting 
down");
             shutdown();
-            return;
+            return new 
ContainerExecutionResult(ContainerExecutionResult.ExitStatus.SUCCESS, null,
+                "Asked to die by the AM");
           }
         } catch (IOException e) {
           handleError(e);
-          return;
+          return new 
ContainerExecutionResult(ContainerExecutionResult.ExitStatus.EXECUTION_FAILURE,
+              e, "TaskExecutionFailure: " + e.getMessage());
         } catch (TezException e) {
           handleError(e);
-          return;
+          return new 
ContainerExecutionResult(ContainerExecutionResult.ExitStatus.EXECUTION_FAILURE,
+              e, "TaskExecutionFailure: " + e.getMessage());
         } finally {
           FileSystem.closeAllForUGI(childUGI);
         }
       }
     }
+    return new 
ContainerExecutionResult(ContainerExecutionResult.ExitStatus.SUCCESS, null,
+        null);
   }
 
   /**
@@ -334,6 +341,48 @@ public class TezChild {
     }
   }
 
+  public static class ContainerExecutionResult {
+    public static enum ExitStatus {
+      SUCCESS(0),
+      EXECUTION_FAILURE(1),
+      INTERRUPTED(2),
+      ASKED_TO_DIE(3);
+
+      private final int exitCode;
+
+      ExitStatus(int code) {
+        this.exitCode = code;
+      }
+
+      public int getExitCode() {
+        return this.exitCode;
+      }
+    }
+
+    private final ExitStatus exitStatus;
+    private final Throwable throwable;
+    private final String errorMessage;
+
+    ContainerExecutionResult(ExitStatus exitStatus, @Nullable Throwable 
throwable,
+                             @Nullable String errorMessage) {
+      this.exitStatus = exitStatus;
+      this.throwable = throwable;
+      this.errorMessage = errorMessage;
+    }
+
+    public ExitStatus getExitStatus() {
+      return this.exitStatus;
+    }
+
+    public Throwable getThrowable() {
+      return this.throwable;
+    }
+
+    public String getErrorMessage() {
+      return this.errorMessage;
+    }
+  }
+
   public static TezChild newTezChild(Configuration conf, String host, int 
port, String containerIdentifier,
       String tokenIdentifier, int attemptNumber, String[] localDirs)
       throws IOException, InterruptedException, TezException {

Reply via email to