Repository: hive
Updated Branches:
  refs/heads/llap e6b1556e3 -> 8b442b266


HIVE-10682. LLAP: Make use of the task runner which allows killing tasks. (Siddharth Seth)


Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/8b442b26
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/8b442b26
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/8b442b26

Branch: refs/heads/llap
Commit: 8b442b266f1d5e95b2746109ac188b6ec6545cef
Parents: e6b1556
Author: Siddharth Seth <ss...@apache.org>
Authored: Mon May 11 23:38:49 2015 -0700
Committer: Siddharth Seth <ss...@apache.org>
Committed: Mon May 11 23:38:49 2015 -0700

----------------------------------------------------------------------
 .../llap/daemon/impl/TaskExecutorService.java   | 16 ++--
 .../llap/daemon/impl/TaskRunnerCallable.java    | 84 +++++++++-----------
 .../daemon/impl/TestTaskExecutorService.java    |  6 +-
 3 files changed, 49 insertions(+), 57 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hive/blob/8b442b26/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskExecutorService.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskExecutorService.java b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskExecutorService.java
index ed8df95..11ba793 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskExecutorService.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskExecutorService.java
@@ -31,7 +31,7 @@ import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
 
 import org.apache.log4j.Logger;
-import org.apache.tez.runtime.task.TezChild.ContainerExecutionResult;
+import org.apache.tez.runtime.task.TaskRunner2Result;
 
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.util.concurrent.FutureCallback;
@@ -187,8 +187,8 @@ public class TaskExecutorService implements Scheduler<TaskRunnerCallable> {
 
     boolean scheduled = false;
     try {
-      ListenableFuture<ContainerExecutionResult> future = executorService.submit(task);
-      FutureCallback<ContainerExecutionResult> wrappedCallback =
+      ListenableFuture<TaskRunner2Result> future = executorService.submit(task);
+      FutureCallback<TaskRunner2Result> wrappedCallback =
           new InternalCompletionListener(task.getCallback());
       Futures.addCallback(future, wrappedCallback);
 
@@ -252,8 +252,8 @@ public class TaskExecutorService implements Scheduler<TaskRunnerCallable> {
           // try to submit the task from wait queue to executor service. If it gets rejected the
           // task from wait queue will hold on to its position for next try.
           try {
-            ListenableFuture<ContainerExecutionResult> future = executorService.submit(task);
-            FutureCallback<ContainerExecutionResult> wrappedCallback =
+            ListenableFuture<TaskRunner2Result> future = executorService.submit(task);
+            FutureCallback<TaskRunner2Result> wrappedCallback =
                 new InternalCompletionListener(task.getCallback());
             Futures.addCallback(future, wrappedCallback);
             numSlotsAvailable.decrementAndGet();
@@ -285,14 +285,14 @@ public class TaskExecutorService implements Scheduler<TaskRunnerCallable> {
   }
 
   private synchronized void addTaskToPreemptionList(TaskRunnerCallable task,
-      ListenableFuture<ContainerExecutionResult> future) {
+      ListenableFuture<TaskRunner2Result> future) {
     idToTaskMap.put(task.getRequestId(), task);
     preemptionMap.put(task, future);
     preemptionQueue.add(task);
   }
 
   private final class InternalCompletionListener implements
-      FutureCallback<ContainerExecutionResult> {
+      FutureCallback<TaskRunner2Result> {
     private TaskRunnerCallable.TaskRunnerCallback wrappedCallback;
 
     public InternalCompletionListener(TaskRunnerCallable.TaskRunnerCallback wrappedCallback) {
@@ -300,7 +300,7 @@ public class TaskExecutorService implements Scheduler<TaskRunnerCallable> {
     }
 
     @Override
-    public void onSuccess(ContainerExecutionResult result) {
+    public void onSuccess(TaskRunner2Result result) {
       wrappedCallback.onSuccess(result);
       updatePreemptionListAndNotify(true);
     }

http://git-wip-us.apache.org/repos/asf/hive/blob/8b442b26/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskRunnerCallable.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskRunnerCallable.java b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskRunnerCallable.java
index 97a8b78..7e7c133 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskRunnerCallable.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskRunnerCallable.java
@@ -17,7 +17,6 @@
  */
 package org.apache.hadoop.hive.llap.daemon.impl;
 
-import java.io.IOException;
 import java.net.InetSocketAddress;
 import java.nio.ByteBuffer;
 import java.security.PrivilegedExceptionAction;
@@ -27,6 +26,7 @@ import java.util.Map;
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicLong;
 
 import org.apache.hadoop.conf.Configuration;
@@ -45,12 +45,10 @@ import org.apache.hadoop.security.Credentials;
 import org.apache.hadoop.security.SecurityUtil;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.security.token.Token;
-import org.apache.log4j.Logger;
 import org.apache.tez.common.TezCommonUtils;
 import org.apache.tez.common.security.JobTokenIdentifier;
 import org.apache.tez.common.security.TokenCache;
 import org.apache.tez.dag.api.TezConstants;
-import org.apache.tez.dag.api.TezException;
 import org.apache.tez.mapreduce.input.MRInputLegacy;
 import org.apache.tez.runtime.api.ExecutionContext;
 import org.apache.tez.runtime.api.impl.InputSpec;
@@ -58,8 +56,7 @@ import org.apache.tez.runtime.api.impl.TaskSpec;
 import org.apache.tez.runtime.common.objectregistry.ObjectRegistryImpl;
 import org.apache.tez.runtime.internals.api.TaskReporterInterface;
 import org.apache.tez.runtime.library.input.UnorderedKVInput;
-import org.apache.tez.runtime.task.TezChild;
-import org.apache.tez.runtime.task.TezTaskRunner;
+import org.apache.tez.runtime.task.TaskRunner2Result;
 
 import com.google.common.base.Stopwatch;
 import com.google.common.collect.HashMultimap;
@@ -68,12 +65,15 @@ import com.google.common.util.concurrent.FutureCallback;
 import com.google.common.util.concurrent.ListeningExecutorService;
 import com.google.common.util.concurrent.MoreExecutors;
 import com.google.common.util.concurrent.ThreadFactoryBuilder;
+import org.apache.tez.runtime.task.TezTaskRunner2;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /**
  *
  */
-public class TaskRunnerCallable extends CallableWithNdc<TezChild.ContainerExecutionResult> {
-  private static final Logger LOG = Logger.getLogger(TaskRunnerCallable.class);
+public class TaskRunnerCallable extends CallableWithNdc<TaskRunner2Result> {
+  private static final Logger LOG = LoggerFactory.getLogger(TaskRunnerCallable.class);
   private final LlapDaemonProtocolProtos.SubmitWorkRequestProto request;
   private final Configuration conf;
   private final String[] localDirs;
@@ -88,7 +88,7 @@ public class TaskRunnerCallable extends CallableWithNdc<TezChild.ContainerExecut
   private final AMReporter amReporter;
   private final ConcurrentMap<String, LlapDaemonProtocolProtos.SourceStateProto> sourceCompletionMap;
   private final TaskSpec taskSpec;
-  private volatile TezTaskRunner taskRunner;
+  private volatile TezTaskRunner2 taskRunner;
   private volatile TaskReporterInterface taskReporter;
   private volatile ListeningExecutorService executor;
   private LlapTaskUmbilicalProtocol umbilical;
@@ -126,7 +126,7 @@ public class TaskRunnerCallable extends CallableWithNdc<TezChild.ContainerExecut
   }
 
   @Override
-  protected TezChild.ContainerExecutionResult callInternal() throws Exception {
+  protected TaskRunner2Result callInternal() throws Exception {
     this.startTime = System.currentTimeMillis();
     this.threadName = Thread.currentThread().getName();
     if (LOG.isDebugEnabled()) {
@@ -177,43 +177,29 @@ public class TaskRunnerCallable extends CallableWithNdc<TezChild.ContainerExecut
         new AtomicLong(0),
         request.getContainerIdString());
 
-    taskRunner = new TezTaskRunner(conf, taskUgi, localDirs,
+    taskRunner = new TezTaskRunner2(conf, taskUgi, localDirs,
         taskSpec,
         request.getAppAttemptNumber(),
         serviceConsumerMetadata, envMap, startedInputsMap, taskReporter, executor, objectRegistry,
         pid,
         executionContext, memoryAvailable);
 
-    boolean shouldDie;
     try {
-      shouldDie = !taskRunner.run();
-      if (shouldDie) {
-        LOG.info("Got a shouldDie notification via heartbeats. Shutting down");
-        return new TezChild.ContainerExecutionResult(
-            TezChild.ContainerExecutionResult.ExitStatus.ASKED_TO_DIE, null,
-            "Asked to die by the AM");
+      TaskRunner2Result result = taskRunner.run();
+      if (result.isContainerShutdownRequested()) {
+        LOG.warn("Unexpected container shutdown requested while running task. Ignoring");
       }
-    } catch (IOException e) {
-      return new TezChild.ContainerExecutionResult(
-          TezChild.ContainerExecutionResult.ExitStatus.EXECUTION_FAILURE,
-          e, "TaskExecutionFailure: " + e.getMessage());
-    } catch (TezException e) {
-      return new TezChild.ContainerExecutionResult(
-          TezChild.ContainerExecutionResult.ExitStatus.EXECUTION_FAILURE,
-          e, "TaskExecutionFailure: " + e.getMessage());
+      return result;
+
     } finally {
       // TODO Fix UGI and FS Handling. Closing UGI here causes some errors right now.
       //        FileSystem.closeAllForUGI(taskUgi);
+      LOG.info("ExecutionTime for Container: " + request.getContainerIdString() + "=" +
+          sw.stop().elapsed(TimeUnit.MILLISECONDS));
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("canFinish post completion: " + taskSpec.getTaskAttemptID() + ": " + canFinish());
+      }
     }
-    LOG.info("ExecutionTime for Container: " + request.getContainerIdString() + "=" +
-        sw.stop().elapsedMillis());
-    if (LOG.isDebugEnabled()) {
-      LOG.debug("canFinish post completion: " + taskSpec.getTaskAttemptID() + ": " + canFinish());
-    }
-
-    return new TezChild.ContainerExecutionResult(
-        TezChild.ContainerExecutionResult.ExitStatus.SUCCESS, null,
-        null);
   }
 
   /**
@@ -311,7 +297,7 @@ public class TaskRunnerCallable extends CallableWithNdc<TezChild.ContainerExecut
     return new TaskRunnerCallback(request, this);
   }
 
-  final class TaskRunnerCallback implements FutureCallback<TezChild.ContainerExecutionResult> {
+  final class TaskRunnerCallback implements FutureCallback<TaskRunner2Result> {
 
     private final LlapDaemonProtocolProtos.SubmitWorkRequestProto request;
     private final TaskRunnerCallable taskRunnerCallable;
@@ -330,25 +316,29 @@ public class TaskRunnerCallable extends CallableWithNdc<TezChild.ContainerExecut
 
     // TODO Slightly more useful error handling
     @Override
-    public void onSuccess(TezChild.ContainerExecutionResult result) {
-      switch (result.getExitStatus()) {
+    public void onSuccess(TaskRunner2Result result) {
+      switch(result.getEndReason()) {
+        // Only the KILLED case requires a message to be sent out to the AM.
         case SUCCESS:
-          LOG.info("Successfully finished: " + requestId);
+          LOG.info("Successfully finished {}", requestId);
           metrics.incrExecutorTotalSuccess();
           break;
-        case EXECUTION_FAILURE:
-          LOG.info("Failed to run: " + requestId);
-          metrics.incrExecutorTotalExecutionFailed();
+        case CONTAINER_STOP_REQUESTED:
+          LOG.warn("Unexpected CONTAINER_STOP_REQUEST for {}", requestId);
           break;
-        case INTERRUPTED:
-          LOG.info("Interrupted while running: " + requestId);
-          metrics.incrExecutorTotalInterrupted();
+        case KILL_REQUESTED:
+          // TODO Send a kill out to the AM.
           break;
-        case ASKED_TO_DIE:
-          LOG.info("Asked to die while running: " + requestId);
-          metrics.incrExecutorTotalAskedToDie();
+        case COMMUNICATION_FAILURE:
+          LOG.info("Failed to run {} due to communication failure", requestId);
+          metrics.incrExecutorTotalExecutionFailed();
+          break;
+        case TASK_ERROR:
+          LOG.info("Failed to run {} due to task error", requestId);
+          metrics.incrExecutorTotalExecutionFailed();
           break;
       }
+
       taskRunnerCallable.shutdown();
       HistoryLogger
           .logFragmentEnd(request.getApplicationIdString(), request.getContainerIdString(),

http://git-wip-us.apache.org/repos/asf/hive/blob/8b442b26/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/TestTaskExecutorService.java
----------------------------------------------------------------------
diff --git a/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/TestTaskExecutorService.java b/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/TestTaskExecutorService.java
index 44a4633..95750c4 100644
--- a/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/TestTaskExecutorService.java
+++ b/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/TestTaskExecutorService.java
@@ -35,6 +35,8 @@ import org.apache.tez.dag.records.TezTaskAttemptID;
 import org.apache.tez.dag.records.TezTaskID;
 import org.apache.tez.dag.records.TezVertexID;
 import org.apache.tez.runtime.api.impl.ExecutionContextImpl;
+import org.apache.tez.runtime.task.EndReason;
+import org.apache.tez.runtime.task.TaskRunner2Result;
 import org.apache.tez.runtime.task.TezChild;
 import org.apache.tez.runtime.task.TezChild.ContainerExecutionResult;
 import org.apache.tez.runtime.task.TezChild.ContainerExecutionResult.ExitStatus;
@@ -58,10 +60,10 @@ public class TestTaskExecutorService {
     }
 
     @Override
-    protected TezChild.ContainerExecutionResult callInternal() throws Exception {
+    protected TaskRunner2Result callInternal() throws Exception {
       System.out.println(requestId + " is executing..");
       Thread.sleep(workTime);
-      return new ContainerExecutionResult(ExitStatus.SUCCESS, null, null);
+      return new TaskRunner2Result(EndReason.SUCCESS, null, false);
     }
 
     @Override

Reply via email to