Modified: 
hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/ContainerRunner.java
URL: 
http://svn.apache.org/viewvc/hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/ContainerRunner.java?rev=1665307&r1=1665306&r2=1665307&view=diff
==============================================================================
--- 
hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/ContainerRunner.java
 (original)
+++ 
hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/ContainerRunner.java
 Mon Mar  9 17:18:07 2015
@@ -16,9 +16,9 @@ package org.apache.hadoop.hive.llap.daem
 
 import java.io.IOException;
 
-import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos;
+import 
org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.SubmitWorkRequestProto;
 
 public interface ContainerRunner {
 
-  void queueContainer(LlapDaemonProtocolProtos.RunContainerRequestProto 
request) throws IOException;
+  void submitWork(SubmitWorkRequestProto request) throws IOException;
 }

Modified: 
hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/HistoryLogger.java
URL: 
http://svn.apache.org/viewvc/hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/HistoryLogger.java?rev=1665307&r1=1665306&r2=1665307&view=diff
==============================================================================
--- 
hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/HistoryLogger.java
 (original)
+++ 
hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/HistoryLogger.java
 Mon Mar  9 17:18:07 2015
@@ -29,6 +29,7 @@ public class HistoryLogger {
   private static final String HISTORY_VERTEX_NAME = "VertexName";
   private static final String HISTORY_TASK_ID = "TaskId";
   private static final String HISTORY_ATTEMPT_ID = "TaskAttemptId";
+  private static final String HISTORY_THREAD_NAME = "ThreadName";
   private static final String HISTORY_HOSTNAME = "HostName";
   private static final String HISTORY_SUCCEEDED = "Succeeded";
 
@@ -48,9 +49,9 @@ public class HistoryLogger {
 
   public static void logFragmentEnd(String applicationIdStr, String 
containerIdStr, String hostname,
                                     String dagName, String vertexName, int 
taskId, int attemptId,
-                                    long startTime, boolean failed) {
+                                    String threadName, long startTime, boolean 
succeeded) {
     HISTORY_LOGGER.info(constructFragmentEndString(applicationIdStr, 
containerIdStr, hostname,
-        dagName, vertexName, taskId, attemptId, startTime, failed));
+        dagName, vertexName, taskId, attemptId, threadName, startTime, 
succeeded));
   }
 
 
@@ -72,7 +73,7 @@ public class HistoryLogger {
   private static String constructFragmentEndString(String applicationIdStr, 
String containerIdStr,
                                                    String hostname, String 
dagName,
                                                    String vertexName, int 
taskId, int attemptId,
-                                                   long startTime, boolean 
succeeded) {
+                                                   String threadName, long 
startTime, boolean succeeded) {
     HistoryLineBuilder lb = new HistoryLineBuilder(EVENT_TYPE_FRAGMENT_END);
     lb.addHostName(hostname);
     lb.addAppid(applicationIdStr);
@@ -81,6 +82,7 @@ public class HistoryLogger {
     lb.addVertexName(vertexName);
     lb.addTaskId(taskId);
     lb.addTaskAttemptId(attemptId);
+    lb.addThreadName(threadName);
     lb.addSuccessStatus(succeeded);
     lb.addTime(HISTORY_START_TIME, startTime);
     lb.addTime(HISTORY_END_TIME);
@@ -122,6 +124,10 @@ public class HistoryLogger {
       return setKeyValue(HISTORY_ATTEMPT_ID, String.valueOf(attemptId));
     }
 
+    HistoryLineBuilder addThreadName(String threadName) {
+      return setKeyValue(HISTORY_THREAD_NAME, threadName);
+    }
+
     HistoryLineBuilder addTime(String timeParam, long millis) {
       return setKeyValue(timeParam, String.valueOf(millis));
     }

Modified: 
hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/LlapDaemonConfiguration.java
URL: 
http://svn.apache.org/viewvc/hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/LlapDaemonConfiguration.java?rev=1665307&r1=1665306&r2=1665307&view=diff
==============================================================================
--- 
hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/LlapDaemonConfiguration.java
 (original)
+++ 
hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/LlapDaemonConfiguration.java
 Mon Mar  9 17:18:07 2015
@@ -57,8 +57,9 @@ public class LlapDaemonConfiguration ext
 
 
   // Section for configs used in the AM //
-  public static final String LLAP_DAEMON_AM_SERVICE_HOSTS = LLAP_DAEMON_PREFIX 
+ "service.hosts";
-  public static final String LLAP_DAEMON_AM_COMMUNICATOR_NUM_THREADS = 
LLAP_DAEMON_PREFIX + "communicator.num.threads";
-  public static final int LLAP_DAEMON_AM_COMMUNICATOR_NUM_THREADS_DEFAULT = 5;
+  public static final String LLAP_DAEMON_SERVICE_HOSTS = LLAP_DAEMON_PREFIX + 
"service.hosts";
+
+  public static final String LLAP_DAEMON_COMMUNICATOR_NUM_THREADS = 
LLAP_DAEMON_PREFIX + "communicator.num.threads";
+  public static final int LLAP_DAEMON_COMMUNICATOR_NUM_THREADS_DEFAULT = 5;
 
 }

Modified: 
hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/ContainerRunnerImpl.java
URL: 
http://svn.apache.org/viewvc/hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/ContainerRunnerImpl.java?rev=1665307&r1=1665306&r2=1665307&view=diff
==============================================================================
--- 
hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/ContainerRunnerImpl.java
 (original)
+++ 
hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/ContainerRunnerImpl.java
 Mon Mar  9 17:18:07 2015
@@ -18,37 +18,50 @@ import java.io.File;
 import java.io.IOException;
 import java.net.InetSocketAddress;
 import java.nio.ByteBuffer;
+import java.security.PrivilegedExceptionAction;
 import java.util.Arrays;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
+import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.atomic.AtomicReference;
 
+import com.google.common.collect.HashMultimap;
+import com.google.common.collect.Multimap;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.common.CallableWithNdc;
 import org.apache.hadoop.hive.llap.daemon.ContainerRunner;
 import org.apache.hadoop.hive.llap.daemon.HistoryLogger;
-import 
org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.RunContainerRequestProto;
+import 
org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.SubmitWorkRequestProto;
 import org.apache.hadoop.hive.llap.metrics.LlapDaemonExecutorMetrics;
 import org.apache.hadoop.hive.llap.shufflehandler.ShuffleHandler;
+import org.apache.hadoop.hive.llap.tezplugins.Converters;
 import org.apache.hadoop.io.DataInputBuffer;
+import org.apache.hadoop.ipc.RPC;
+import org.apache.hadoop.net.NetUtils;
 import org.apache.hadoop.security.Credentials;
+import org.apache.hadoop.security.SecurityUtil;
+import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.security.token.Token;
 import org.apache.hadoop.service.AbstractService;
 import org.apache.hadoop.yarn.api.ApplicationConstants;
 import org.apache.hadoop.yarn.util.AuxiliaryServiceHelper;
 import org.apache.log4j.Logger;
 import org.apache.log4j.NDC;
+import org.apache.tez.common.TezCommonUtils;
+import org.apache.tez.common.TezTaskUmbilicalProtocol;
 import org.apache.tez.common.security.JobTokenIdentifier;
 import org.apache.tez.common.security.TokenCache;
+import org.apache.tez.dag.api.TezConfiguration;
 import org.apache.tez.dag.api.TezConstants;
+import org.apache.tez.dag.api.TezException;
 import org.apache.tez.runtime.api.ExecutionContext;
 import org.apache.tez.runtime.api.impl.ExecutionContextImpl;
 import org.apache.tez.runtime.common.objectregistry.ObjectRegistryImpl;
-import org.apache.tez.runtime.task.TezChild;
+import org.apache.tez.runtime.task.TaskReporter;
 import org.apache.tez.runtime.task.TezChild.ContainerExecutionResult;
 
 import com.google.common.base.Preconditions;
@@ -59,6 +72,7 @@ import com.google.common.util.concurrent
 import com.google.common.util.concurrent.ListeningExecutorService;
 import com.google.common.util.concurrent.MoreExecutors;
 import com.google.common.util.concurrent.ThreadFactoryBuilder;
+import org.apache.tez.runtime.task.TezTaskRunner;
 
 public class ContainerRunnerImpl extends AbstractService implements 
ContainerRunner {
 
@@ -73,6 +87,7 @@ public class ContainerRunnerImpl extends
   private volatile FileSystem localFs;
   private final long memoryPerExecutor;
   private final LlapDaemonExecutorMetrics metrics;
+  private final ConfParams confParams;
   // TODO Support for removing queued containers, interrupting / killing 
specific containers
 
   public ContainerRunnerImpl(int numExecutors, String[] localDirsBase, int 
localShufflePort,
@@ -83,6 +98,13 @@ public class ContainerRunnerImpl extends
         "Invalid number of executors: " + numExecutors + ". Must be > 0");
     this.localDirsBase = localDirsBase;
     this.localAddress = localAddress;
+    this.confParams = new ConfParams();
+    // Start from the Tez defaults; serviceInit() overrides these from the Configuration.
+    confParams.amMaxEventsPerHeartbeat = 
TezConfiguration.TEZ_TASK_MAX_EVENTS_PER_HEARTBEAT_DEFAULT;
+    confParams.amHeartbeatIntervalMsMax =
+        TezConfiguration.TEZ_TASK_AM_HEARTBEAT_INTERVAL_MS_DEFAULT;
+    confParams.amCounterHeartbeatInterval =
+        TezConfiguration.TEZ_TASK_AM_HEARTBEAT_COUNTER_INTERVAL_MS_DEFAULT;
 
     ExecutorService raw = Executors.newFixedThreadPool(numExecutors,
         new ThreadFactoryBuilder().setNameFormat(THREAD_NAME_FORMAT).build());
@@ -104,6 +126,16 @@ public class ContainerRunnerImpl extends
   public void serviceInit(Configuration conf) {
     try {
       localFs = FileSystem.getLocal(conf);
+      // TODO Fix visibility of these Tez task heartbeat parameters (read via TezConfiguration keys).
+      confParams.amCounterHeartbeatInterval = conf.getLong(
+          TezConfiguration.TEZ_TASK_AM_HEARTBEAT_COUNTER_INTERVAL_MS,
+          TezConfiguration.TEZ_TASK_AM_HEARTBEAT_COUNTER_INTERVAL_MS_DEFAULT);
+      confParams.amHeartbeatIntervalMsMax =
+          conf.getInt(TezConfiguration.TEZ_TASK_AM_HEARTBEAT_INTERVAL_MS,
+              TezConfiguration.TEZ_TASK_AM_HEARTBEAT_INTERVAL_MS_DEFAULT);
+      confParams.amMaxEventsPerHeartbeat =
+          conf.getInt(TezConfiguration.TEZ_TASK_MAX_EVENTS_PER_HEARTBEAT,
+              TezConfiguration.TEZ_TASK_MAX_EVENTS_PER_HEARTBEAT_DEFAULT);
     } catch (IOException e) {
       throw new RuntimeException("Failed to setup local filesystem instance", 
e);
     }
@@ -129,12 +161,21 @@ public class ContainerRunnerImpl extends
   }
 
   @Override
-  public void queueContainer(RunContainerRequestProto request) throws 
IOException {
+  public void submitWork(SubmitWorkRequestProto request) throws IOException {
     HistoryLogger.logFragmentStart(request.getApplicationIdString(), 
request.getContainerIdString(),
-        localAddress.get().getHostName(), null, null, -1, -1);
+        localAddress.get().getHostName(), 
request.getFragmentSpec().getDagName(),
+        request.getFragmentSpec().getVertexName(), 
request.getFragmentSpec().getFragmentNumber(),
+        request.getFragmentSpec().getAttemptNumber());
     LOG.info("Queuing container for execution: " + request);
     // This is the start of container-annotated logging.
-    NDC.push(request.getContainerIdString());
+    // TODO Reduce the length of this string. Way too verbose at the moment.
+    String ndcContextString =
+        request.getContainerIdString() + "_" +
+            request.getFragmentSpec().getDagName() + "_" +
+            request.getFragmentSpec().getVertexName() +
+            "_" + request.getFragmentSpec().getFragmentNumber() + "_" +
+            request.getFragmentSpec().getAttemptNumber();
+    NDC.push(ndcContextString);
     try {
       Map<String, String> env = new HashMap<String, String>();
       // TODO What else is required in this environment map.
@@ -149,13 +190,14 @@ public class ContainerRunnerImpl extends
             request.getUser());
         localFs.mkdirs(new Path(localDirs[i]));
       }
-      LOG.info("DEBUG: Dirs are: " + Arrays.toString(localDirs));
+      // TODO Avoid this directory creation on each work-unit submission.
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Dirs are: " + Arrays.toString(localDirs));
+      }
 
 
       // Setup workingDir. This is otherwise setup as Environment.PWD
       // Used for re-localization, to add the user specified configuration 
(conf_pb_binary_stream)
-      // TODO Set this up to read user configuration if required. Ideally, 
Inputs / Outputs should be self configured.
-      // Setting this up correctly is more from framework components to setup 
security, ping intervals, etc.
       String workingDir = localDirs[0];
 
       Credentials credentials = new Credentials();
@@ -170,12 +212,11 @@ public class ContainerRunnerImpl extends
       LOG.info("DEBUG: Registering request with the ShuffleHandler");
       
ShuffleHandler.get().registerApplication(request.getApplicationIdString(), 
jobToken, request.getUser());
 
-      ContainerRunnerCallable callable = new ContainerRunnerCallable(request, 
new Configuration(getConfig()),
+      TaskRunnerCallable callable = new TaskRunnerCallable(request, new 
Configuration(getConfig()),
           new ExecutionContextImpl(localAddress.get().getHostName()), env, 
localDirs,
-          workingDir, credentials, memoryPerExecutor, 
localAddress.get().getHostName());
-      ListenableFuture<ContainerExecutionResult> future = executorService
-          .submit(callable);
-      Futures.addCallback(future, new ContainerRunnerCallback(request, 
callable));
+          workingDir, credentials, memoryPerExecutor, confParams);
+      ListenableFuture<ContainerExecutionResult> future = 
executorService.submit(callable);
+      Futures.addCallback(future, new TaskRunnerCallback(request, callable));
       metrics.incrExecutorTotalRequestsHandled();
       metrics.incrExecutorNumQueuedRequests();
     } finally {
@@ -183,28 +224,31 @@ public class ContainerRunnerImpl extends
     }
   }
 
-  static class ContainerRunnerCallable extends 
CallableWithNdc<ContainerExecutionResult> {
+  static class TaskRunnerCallable extends 
CallableWithNdc<ContainerExecutionResult> {
 
-    private final RunContainerRequestProto request;
+    private final SubmitWorkRequestProto request;
     private final Configuration conf;
     private final String workingDir;
     private final String[] localDirs;
     private final Map<String, String> envMap;
-    // TODO Is a null pid valid - will this work with multiple different 
ResourceMonitors ?
     private final String pid = null;
     private final ObjectRegistryImpl objectRegistry;
     private final ExecutionContext executionContext;
     private final Credentials credentials;
     private final long memoryAvailable;
-    private volatile TezChild tezChild;
-    private final String localHostname;
+    private final ListeningExecutorService executor;
+    private final ConfParams confParams;
+    private volatile TezTaskRunner taskRunner;
+    private volatile TaskReporter taskReporter;
+    private TezTaskUmbilicalProtocol umbilical;
     private volatile long startTime;
+    private volatile String threadName;
 
 
-    ContainerRunnerCallable(RunContainerRequestProto request, Configuration 
conf,
+    TaskRunnerCallable(SubmitWorkRequestProto request, Configuration conf,
                             ExecutionContext executionContext, Map<String, 
String> envMap,
                             String[] localDirs, String workingDir, Credentials 
credentials,
-                            long memoryAvailable, String localHostName) {
+                            long memoryAvailable, ConfParams confParams) {
       this.request = request;
       this.conf = conf;
       this.executionContext = executionContext;
@@ -214,40 +258,105 @@ public class ContainerRunnerImpl extends
       this.objectRegistry = new ObjectRegistryImpl();
       this.credentials = credentials;
       this.memoryAvailable = memoryAvailable;
-      this.localHostname = localHostName;
-
+      this.confParams = confParams;
+      // TODO This executor seems unnecessary. Here and TezChild
+      ExecutorService executorReal = Executors.newFixedThreadPool(1,
+          new ThreadFactoryBuilder()
+              .setDaemon(true)
+              .setNameFormat(
+                  "TezTaskRunner_" + 
request.getFragmentSpec().getTaskAttemptIdString())
+              .build());
+      executor = MoreExecutors.listeningDecorator(executorReal);
     }
 
     @Override
     protected ContainerExecutionResult callInternal() throws Exception {
       this.startTime = System.currentTimeMillis();
+      this.threadName = Thread.currentThread().getName();
+      // TODO Consolidate this code with TezChild.
       Stopwatch sw = new Stopwatch().start();
-      tezChild =
-          new TezChild(conf, request.getAmHost(), request.getAmPort(),
-              request.getContainerIdString(),
-              request.getTokenIdentifier(), request.getAppAttemptNumber(), 
workingDir, localDirs,
-              envMap, objectRegistry, pid,
-              executionContext, credentials, memoryAvailable, 
request.getUser(), null);
-      ContainerExecutionResult result = tezChild.run();
+       UserGroupInformation taskUgi = 
UserGroupInformation.createRemoteUser(request.getUser());
+       taskUgi.addCredentials(credentials);
+
+       Token<JobTokenIdentifier> jobToken = 
TokenCache.getSessionToken(credentials);
+       Map<String, ByteBuffer> serviceConsumerMetadata = new HashMap<String, 
ByteBuffer>();
+       serviceConsumerMetadata.put(TezConstants.TEZ_SHUFFLE_HANDLER_SERVICE_ID,
+           TezCommonUtils.convertJobTokenToBytes(jobToken));
+       Multimap<String, String> startedInputsMap = HashMultimap.create();
+
+       UserGroupInformation taskOwner =
+           UserGroupInformation.createRemoteUser(request.getTokenIdentifier());
+       final InetSocketAddress address =
+           NetUtils.createSocketAddrForHost(request.getAmHost(), 
request.getAmPort());
+       SecurityUtil.setTokenService(jobToken, address);
+       taskOwner.addToken(jobToken);
+       umbilical = taskOwner.doAs(new 
PrivilegedExceptionAction<TezTaskUmbilicalProtocol>() {
+         @Override
+         public TezTaskUmbilicalProtocol run() throws Exception {
+           return RPC.getProxy(TezTaskUmbilicalProtocol.class,
+               TezTaskUmbilicalProtocol.versionID, address, conf);
+         }
+       });
+
+       taskReporter = new TaskReporter(
+           umbilical,
+           confParams.amHeartbeatIntervalMsMax,
+           confParams.amCounterHeartbeatInterval,
+           confParams.amMaxEventsPerHeartbeat,
+           new AtomicLong(0),
+           request.getContainerIdString());
+
+       taskRunner = new TezTaskRunner(conf, taskUgi, localDirs,
+           Converters.getTaskSpecfromProto(request.getFragmentSpec()), 
umbilical,
+           request.getAppAttemptNumber(),
+           serviceConsumerMetadata, envMap, startedInputsMap, taskReporter, 
executor, objectRegistry,
+           pid,
+           executionContext, memoryAvailable);
+
+       boolean shouldDie;
+       try {
+         shouldDie = !taskRunner.run();
+         if (shouldDie) {
+           LOG.info("Got a shouldDie notification via heartbeats. Shutting 
down");
+           return new 
ContainerExecutionResult(ContainerExecutionResult.ExitStatus.SUCCESS, null,
+               "Asked to die by the AM");
+         }
+       } catch (IOException e) {
+         return new 
ContainerExecutionResult(ContainerExecutionResult.ExitStatus.EXECUTION_FAILURE,
+             e, "TaskExecutionFailure: " + e.getMessage());
+       } catch (TezException e) {
+         return new 
ContainerExecutionResult(ContainerExecutionResult.ExitStatus.EXECUTION_FAILURE,
+             e, "TaskExecutionFailure: " + e.getMessage());
+       } finally {
+        // TODO Fix UGI and FS Handling. Closing UGI here causes some errors 
right now.
+//        FileSystem.closeAllForUGI(taskUgi);
+       }
       LOG.info("ExecutionTime for Container: " + 
request.getContainerIdString() + "=" +
           sw.stop().elapsedMillis());
-      return result;
+      return new 
ContainerExecutionResult(ContainerExecutionResult.ExitStatus.SUCCESS, null,
+          null);
     }
 
-    public TezChild getTezChild() {
-      return this.tezChild;
+    public void shutdown() {
+      executor.shutdownNow();
+      if (taskReporter != null) {
+        taskReporter.shutdown();
+      }
+      if (umbilical != null) {
+        RPC.stopProxy(umbilical);
+      }
     }
   }
 
-  final class ContainerRunnerCallback implements 
FutureCallback<ContainerExecutionResult> {
+  final class TaskRunnerCallback implements 
FutureCallback<ContainerExecutionResult> {
 
-    private final RunContainerRequestProto request;
-    private final ContainerRunnerCallable containerRunnerCallable;
+    private final SubmitWorkRequestProto request;
+    private final TaskRunnerCallable taskRunnerCallable;
 
-    ContainerRunnerCallback(RunContainerRequestProto request,
-                            ContainerRunnerCallable containerRunnerCallable) {
+    TaskRunnerCallback(SubmitWorkRequestProto request,
+                       TaskRunnerCallable taskRunnerCallable) {
       this.request = request;
-      this.containerRunnerCallable = containerRunnerCallable;
+      this.taskRunnerCallable = taskRunnerCallable;
     }
 
     // TODO Slightly more useful error handling
@@ -255,51 +364,62 @@ public class ContainerRunnerImpl extends
     public void onSuccess(ContainerExecutionResult result) {
       switch (result.getExitStatus()) {
         case SUCCESS:
-          LOG.info("Successfully finished: " + 
request.getApplicationIdString() + ", containerId=" +
-              request.getContainerIdString());
+          LOG.info("Successfully finished: " + 
getTaskIdentifierString(request));
           metrics.incrExecutorTotalSuccess();
           break;
         case EXECUTION_FAILURE:
-          LOG.info("Failed to run: " + request.getApplicationIdString() + ", 
containerId=" +
-              request.getContainerIdString(), result.getThrowable());
+          LOG.info("Failed to run: " + getTaskIdentifierString(request), result.getThrowable());
           metrics.incrExecutorTotalExecutionFailed();
           break;
         case INTERRUPTED:
-          LOG.info(
-              "Interrupted while running: " + request.getApplicationIdString() 
+ ", containerId=" +
-                  request.getContainerIdString(), result.getThrowable());
+          LOG.info("Interrupted while running: " + 
getTaskIdentifierString(request), result.getThrowable());
           metrics.incrExecutorTotalInterrupted();
           break;
         case ASKED_TO_DIE:
-          LOG.info(
-              "Asked to die while running: " + 
request.getApplicationIdString() + ", containerId=" +
-                  request.getContainerIdString());
+          LOG.info("Asked to die while running: " + 
getTaskIdentifierString(request));
           metrics.incrExecutorTotalAskedToDie();
           break;
       }
+      taskRunnerCallable.shutdown();
       HistoryLogger
-          .logFragmentEnd(request.getApplicationIdString(),
-              request.getContainerIdString(),
-              localAddress.get().getHostName(), null, null, -1, -1,
-              containerRunnerCallable.startTime, true);
+          .logFragmentEnd(request.getApplicationIdString(), 
request.getContainerIdString(),
+              localAddress.get().getHostName(), 
request.getFragmentSpec().getDagName(),
+              request.getFragmentSpec().getVertexName(),
+              request.getFragmentSpec().getFragmentNumber(),
+              request.getFragmentSpec().getAttemptNumber(), 
taskRunnerCallable.threadName,
+              taskRunnerCallable.startTime, result.getExitStatus() == ContainerExecutionResult.ExitStatus.SUCCESS);
       metrics.decrExecutorNumQueuedRequests();
     }
 
     @Override
     public void onFailure(Throwable t) {
-      LOG.error(
-          "TezChild execution failed for : " + 
request.getApplicationIdString() + ", containerId=" +
-              request.getContainerIdString());
-      TezChild tezChild = containerRunnerCallable.getTezChild();
-      if (tezChild != null) {
-        tezChild.shutdown();
-      }
+      LOG.error("TezTaskRunner execution failed for : " + 
getTaskIdentifierString(request), t);
+      taskRunnerCallable.shutdown();
       HistoryLogger
-          .logFragmentEnd(request.getApplicationIdString(),
-              request.getContainerIdString(),
-              localAddress.get().getHostName(), null, null, -1, -1,
-              containerRunnerCallable.startTime, false);
+          .logFragmentEnd(request.getApplicationIdString(), 
request.getContainerIdString(),
+              localAddress.get().getHostName(), 
request.getFragmentSpec().getDagName(),
+              request.getFragmentSpec().getVertexName(),
+              request.getFragmentSpec().getFragmentNumber(),
+              request.getFragmentSpec().getAttemptNumber(), 
taskRunnerCallable.threadName,
+              taskRunnerCallable.startTime, false);
       metrics.decrExecutorNumQueuedRequests();
     }
+
+    private String getTaskIdentifierString(SubmitWorkRequestProto request) {
+      StringBuilder sb = new StringBuilder();
+      sb.append("AppId=").append(request.getApplicationIdString())
+          .append(", containerId=").append(request.getContainerIdString())
+          .append(", Dag=").append(request.getFragmentSpec().getDagName())
+          .append(", 
Vertex=").append(request.getFragmentSpec().getVertexName())
+          .append(", 
FragmentNum=").append(request.getFragmentSpec().getFragmentNumber())
+          .append(", 
Attempt=").append(request.getFragmentSpec().getAttemptNumber());
+      return sb.toString();
+    }
+  }
+
+  private static class ConfParams {
+    int amHeartbeatIntervalMsMax;
+    long amCounterHeartbeatInterval;
+    int amMaxEventsPerHeartbeat;
   }
 }

Modified: 
hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapDaemon.java
URL: 
http://svn.apache.org/viewvc/hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapDaemon.java?rev=1665307&r1=1665306&r2=1665307&view=diff
==============================================================================
--- 
hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapDaemon.java
 (original)
+++ 
hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapDaemon.java
 Mon Mar  9 17:18:07 2015
@@ -24,7 +24,7 @@ import javax.management.ObjectName;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hive.llap.daemon.ContainerRunner;
 import org.apache.hadoop.hive.llap.daemon.LlapDaemonConfiguration;
-import 
org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.RunContainerRequestProto;
+import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos;
 import org.apache.hadoop.hive.llap.io.api.LlapIoProxy;
 import org.apache.hadoop.hive.llap.metrics.LlapDaemonExecutorMetrics;
 import org.apache.hadoop.hive.llap.metrics.LlapMetricsSystem;
@@ -190,8 +190,9 @@ public class LlapDaemon extends Abstract
   }
 
   @Override
-  public void queueContainer(RunContainerRequestProto request) throws 
IOException {
-    containerRunner.queueContainer(request);
+  public void submitWork(LlapDaemonProtocolProtos.SubmitWorkRequestProto 
request) throws
+      IOException {
+    containerRunner.submitWork(request);
   }
 
   // LlapDaemonMXBean methods. Will be exposed via JMX
@@ -224,4 +225,6 @@ public class LlapDaemon extends Abstract
   public long getMaxJvmMemory() {
     return maxJvmMemory;
   }
+
+
 }

Modified: 
hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapDaemonProtocolClientImpl.java
URL: 
http://svn.apache.org/viewvc/hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapDaemonProtocolClientImpl.java?rev=1665307&r1=1665306&r2=1665307&view=diff
==============================================================================
--- 
hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapDaemonProtocolClientImpl.java
 (original)
+++ 
hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapDaemonProtocolClientImpl.java
 Mon Mar  9 17:18:07 2015
@@ -20,12 +20,11 @@ import java.net.InetSocketAddress;
 import com.google.protobuf.RpcController;
 import com.google.protobuf.ServiceException;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos;
 import org.apache.hadoop.ipc.ProtobufRpcEngine;
 import org.apache.hadoop.ipc.RPC;
 import org.apache.hadoop.net.NetUtils;
 import org.apache.hadoop.hive.llap.daemon.LlapDaemonProtocolBlockingPB;
-import 
org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.RunContainerRequestProto;
-import 
org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.RunContainerResponseProto;
 
 // TODO Change all this to be based on a regular interface instead of relying 
on the Proto service - Exception signatures cannot be controlled without this 
for the moment.
 
@@ -43,17 +42,16 @@ public class LlapDaemonProtocolClientImp
   }
 
   @Override
-  public RunContainerResponseProto runContainer(RpcController controller,
-                                                RunContainerRequestProto 
request) throws
+  public LlapDaemonProtocolProtos.SubmitWorkResponseProto 
submitWork(RpcController controller,
+                                                                     
LlapDaemonProtocolProtos.SubmitWorkRequestProto request) throws
       ServiceException {
     try {
-      return getProxy().runContainer(null, request);
+      return getProxy().submitWork(null, request);
     } catch (IOException e) {
       throw new ServiceException(e);
     }
   }
 
-
   public LlapDaemonProtocolBlockingPB getProxy() throws IOException {
     if (proxy == null) {
       proxy = createProxy();
@@ -65,8 +63,7 @@ public class LlapDaemonProtocolClientImp
     LlapDaemonProtocolBlockingPB p;
     // TODO Fix security
     RPC.setProtocolEngine(conf, LlapDaemonProtocolBlockingPB.class, 
ProtobufRpcEngine.class);
-    p = (LlapDaemonProtocolBlockingPB) RPC
-        .getProxy(LlapDaemonProtocolBlockingPB.class, 0, serverAddr, conf);
+    p =  RPC.getProxy(LlapDaemonProtocolBlockingPB.class, 0, serverAddr, conf);
     return p;
   }
 }

Modified: 
hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapDaemonProtocolServerImpl.java
URL: 
http://svn.apache.org/viewvc/hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapDaemonProtocolServerImpl.java?rev=1665307&r1=1665306&r2=1665307&view=diff
==============================================================================
--- 
hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapDaemonProtocolServerImpl.java
 (original)
+++ 
hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapDaemonProtocolServerImpl.java
 Mon Mar  9 17:18:07 2015
@@ -27,6 +27,7 @@ import org.apache.commons.logging.LogFac
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos;
+import 
org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.SubmitWorkResponseProto;
 import org.apache.hadoop.ipc.ProtobufRpcEngine;
 import org.apache.hadoop.ipc.RPC;
 import org.apache.hadoop.net.NetUtils;
@@ -34,8 +35,6 @@ import org.apache.hadoop.service.Abstrac
 import org.apache.hadoop.hive.llap.daemon.ContainerRunner;
 import org.apache.hadoop.hive.llap.daemon.LlapDaemonConfiguration;
 import org.apache.hadoop.hive.llap.daemon.LlapDaemonProtocolBlockingPB;
-import 
org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.RunContainerRequestProto;
-import 
org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.RunContainerResponseProto;
 
 public class LlapDaemonProtocolServerImpl extends AbstractService
     implements LlapDaemonProtocolBlockingPB {
@@ -58,19 +57,21 @@ public class LlapDaemonProtocolServerImp
   }
 
+  /** Passes the request to the ContainerRunner, wrapping IOException in ServiceException. */
   @Override
-  public RunContainerResponseProto runContainer(RpcController controller,
-                                                RunContainerRequestProto request) throws
+  public SubmitWorkResponseProto submitWork(RpcController controller,
+                                            LlapDaemonProtocolProtos.SubmitWorkRequestProto request) throws
       ServiceException {
-    LOG.info("Received request: " + request);
+    // Log identifiers only: the full request carries the serialized credentials.
+    LOG.info("Received submitWork request for attempt: " + request.getFragmentSpec()
+        .getTaskAttemptIdString() + ", container: " + request.getContainerIdString());
     try {
-      containerRunner.queueContainer(request);
+      containerRunner.submitWork(request);
     } catch (IOException e) {
       throw new ServiceException(e);
     }
-    return RunContainerResponseProto.getDefaultInstance();
+    return SubmitWorkResponseProto.getDefaultInstance();
   }
 
-
   @Override
   public void serviceStart() {
     Configuration conf = getConfig();

Added: 
hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/tezplugins/Converters.java
URL: 
http://svn.apache.org/viewvc/hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/tezplugins/Converters.java?rev=1665307&view=auto
==============================================================================
--- 
hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/tezplugins/Converters.java
 (added)
+++ 
hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/tezplugins/Converters.java
 Mon Mar  9 17:18:07 2015
@@ -0,0 +1,294 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.llap.tezplugins;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+import com.google.protobuf.ByteString;
+import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.EntityDescriptorProto;
+import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.GroupInputSpecProto;
+import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.IOSpecProto;
+import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.FragmentSpecProto;
+import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.UserPayloadProto;
+import org.apache.tez.common.TezCommonUtils;
+import org.apache.tez.dag.api.EntityDescriptor;
+import org.apache.tez.dag.api.InputDescriptor;
+import org.apache.tez.dag.api.OutputDescriptor;
+import org.apache.tez.dag.api.ProcessorDescriptor;
+import org.apache.tez.dag.api.TezUncheckedException;
+import org.apache.tez.dag.api.UserPayload;
+import org.apache.tez.dag.records.TezTaskAttemptID;
+import org.apache.tez.runtime.api.impl.GroupInputSpec;
+import org.apache.tez.runtime.api.impl.InputSpec;
+import org.apache.tez.runtime.api.impl.OutputSpec;
+import org.apache.tez.runtime.api.impl.TaskSpec;
+
+/**
+ * Static helpers that convert between Tez task specifications ({@link TaskSpec} and the
+ * descriptor / spec types it contains) and their LLAP protobuf representations.
+ */
+public class Converters {
+
+  private Converters() {
+    // Utility class; not meant to be instantiated.
+  }
+
+  /**
+   * Rebuilds a Tez {@link TaskSpec} from a {@link FragmentSpecProto}.
+   *
+   * @param fragmentSpecProto the fragment description received over RPC
+   * @return a TaskSpec carrying the attempt id, DAG / vertex names, vertex parallelism,
+   *     processor descriptor (null when the proto has none) and the input, output and
+   *     grouped-input specs listed in the proto
+   */
+  public static TaskSpec getTaskSpecfromProto(FragmentSpecProto fragmentSpecProto) {
+    TezTaskAttemptID taskAttemptID =
+        TezTaskAttemptID.fromString(fragmentSpecProto.getTaskAttemptIdString());
+
+    ProcessorDescriptor processorDescriptor = null;
+    if (fragmentSpecProto.hasProcessorDescriptor()) {
+      processorDescriptor = convertProcessorDescriptorFromProto(
+          fragmentSpecProto.getProcessorDescriptor());
+    }
+
+    List<InputSpec> inputSpecList =
+        new ArrayList<InputSpec>(fragmentSpecProto.getInputSpecsCount());
+    for (IOSpecProto inputSpecProto : fragmentSpecProto.getInputSpecsList()) {
+      inputSpecList.add(getInputSpecFromProto(inputSpecProto));
+    }
+
+    List<OutputSpec> outputSpecList =
+        new ArrayList<OutputSpec>(fragmentSpecProto.getOutputSpecsCount());
+    for (IOSpecProto outputSpecProto : fragmentSpecProto.getOutputSpecsList()) {
+      outputSpecList.add(getOutputSpecFromProto(outputSpecProto));
+    }
+
+    List<GroupInputSpec> groupInputSpecs =
+        new ArrayList<GroupInputSpec>(fragmentSpecProto.getGroupedInputSpecsCount());
+    for (GroupInputSpecProto groupInputSpecProto : fragmentSpecProto.getGroupedInputSpecsList()) {
+      groupInputSpecs.add(getGroupInputSpecFromProto(groupInputSpecProto));
+    }
+
+    return new TaskSpec(taskAttemptID, fragmentSpecProto.getDagName(),
+        fragmentSpecProto.getVertexName(), fragmentSpecProto.getVertexParallelism(),
+        processorDescriptor, inputSpecList, outputSpecList, groupInputSpecs);
+  }
+
+  /**
+   * Serializes a Tez {@link TaskSpec} into a {@link FragmentSpecProto}.
+   *
+   * <p>In addition to the fields read back by {@link #getTaskSpecfromProto}, this records the
+   * fragment (task) number and attempt number taken from the task attempt id. A null
+   * processor descriptor and null input / output / group-input lists are skipped.
+   *
+   * @param taskSpec the task to serialize
+   * @return the proto form of {@code taskSpec}
+   */
+  public static FragmentSpecProto convertTaskSpecToProto(TaskSpec taskSpec) {
+    FragmentSpecProto.Builder builder = FragmentSpecProto.newBuilder();
+    builder.setTaskAttemptIdString(taskSpec.getTaskAttemptID().toString());
+    builder.setDagName(taskSpec.getDAGName());
+    builder.setVertexName(taskSpec.getVertexName());
+    builder.setVertexParallelism(taskSpec.getVertexParallelism());
+    builder.setFragmentNumber(taskSpec.getTaskAttemptID().getTaskID().getId());
+    builder.setAttemptNumber(taskSpec.getTaskAttemptID().getId());
+
+    if (taskSpec.getProcessorDescriptor() != null) {
+      builder.setProcessorDescriptor(convertToProto(taskSpec.getProcessorDescriptor()));
+    }
+
+    if (taskSpec.getInputs() != null) {
+      for (InputSpec inputSpec : taskSpec.getInputs()) {
+        builder.addInputSpecs(convertInputSpecToProto(inputSpec));
+      }
+    }
+
+    if (taskSpec.getOutputs() != null) {
+      for (OutputSpec outputSpec : taskSpec.getOutputs()) {
+        builder.addOutputSpecs(convertOutputSpecToProto(outputSpec));
+      }
+    }
+
+    if (taskSpec.getGroupInputs() != null) {
+      for (GroupInputSpec groupInputSpec : taskSpec.getGroupInputs()) {
+        builder.addGroupedInputSpecs(convertGroupInputSpecToProto(groupInputSpec));
+      }
+    }
+    return builder.build();
+  }
+
+  /** Builds a ProcessorDescriptor with the proto's class name and (optional) user payload. */
+  private static ProcessorDescriptor convertProcessorDescriptorFromProto(
+      EntityDescriptorProto proto) {
+    String className = proto.getClassName();
+    UserPayload payload = convertPayloadFromProto(proto);
+    ProcessorDescriptor pd = ProcessorDescriptor.create(className);
+    setUserPayload(pd, payload);
+    return pd;
+  }
+
+  /**
+   * Converts a Tez entity descriptor to an EntityDescriptorProto: the class name, the user
+   * payload (bytes and version are only copied when the payload has data) and the history
+   * text, UTF-8 encoded and compressed via {@link TezCommonUtils#compressByteArrayToByteString}.
+   *
+   * @throws TezUncheckedException if encoding or compressing the history text fails
+   */
+  private static EntityDescriptorProto convertToProto(
+      EntityDescriptor<?> descriptor) {
+    EntityDescriptorProto.Builder builder = EntityDescriptorProto
+        .newBuilder();
+    builder.setClassName(descriptor.getClassName());
+
+    UserPayload userPayload = descriptor.getUserPayload();
+    if (userPayload != null) {
+      UserPayloadProto.Builder payloadBuilder = UserPayloadProto.newBuilder();
+      if (userPayload.hasPayload()) {
+        payloadBuilder.setUserPayload(ByteString.copyFrom(userPayload.getPayload()));
+        payloadBuilder.setVersion(userPayload.getVersion());
+      }
+      builder.setUserPayload(payloadBuilder.build());
+    }
+    if (descriptor.getHistoryText() != null) {
+      try {
+        builder.setHistoryText(TezCommonUtils.compressByteArrayToByteString(
+            descriptor.getHistoryText().getBytes("UTF-8")));
+      } catch (IOException e) {
+        throw new TezUncheckedException(e);
+      }
+    }
+    return builder.build();
+  }
+
+  /** Builds an InputSpec; the input descriptor is null when the proto carries none. */
+  private static InputSpec getInputSpecFromProto(IOSpecProto inputSpecProto) {
+    InputDescriptor inputDescriptor = null;
+    if (inputSpecProto.hasIoDescriptor()) {
+      inputDescriptor =
+          convertInputDescriptorFromProto(inputSpecProto.getIoDescriptor());
+    }
+    InputSpec inputSpec = new InputSpec(inputSpecProto.getConnectedVertexName(), inputDescriptor,
+        inputSpecProto.getPhysicalEdgeCount());
+    return inputSpec;
+  }
+
+  /** Builds an InputDescriptor with the proto's class name and (optional) user payload. */
+  private static InputDescriptor convertInputDescriptorFromProto(
+      EntityDescriptorProto proto) {
+    String className = proto.getClassName();
+    UserPayload payload = convertPayloadFromProto(proto);
+    InputDescriptor id = InputDescriptor.create(className);
+    setUserPayload(id, payload);
+    return id;
+  }
+
+  /** Builds an OutputDescriptor with the proto's class name and (optional) user payload. */
+  private static OutputDescriptor convertOutputDescriptorFromProto(
+      EntityDescriptorProto proto) {
+    String className = proto.getClassName();
+    UserPayload payload = convertPayloadFromProto(proto);
+    OutputDescriptor od = OutputDescriptor.create(className);
+    setUserPayload(od, payload);
+    return od;
+  }
+
+  /** Converts an InputSpec; a null source vertex name or input descriptor is left unset. */
+  private static IOSpecProto convertInputSpecToProto(InputSpec inputSpec) {
+    IOSpecProto.Builder builder = IOSpecProto.newBuilder();
+    if (inputSpec.getSourceVertexName() != null) {
+      builder.setConnectedVertexName(inputSpec.getSourceVertexName());
+    }
+    if (inputSpec.getInputDescriptor() != null) {
+      builder.setIoDescriptor(convertToProto(inputSpec.getInputDescriptor()));
+    }
+    builder.setPhysicalEdgeCount(inputSpec.getPhysicalEdgeCount());
+    return builder.build();
+  }
+
+  /** Builds an OutputSpec; the output descriptor is null when the proto carries none. */
+  private static OutputSpec getOutputSpecFromProto(IOSpecProto outputSpecProto) {
+    OutputDescriptor outputDescriptor = null;
+    if (outputSpecProto.hasIoDescriptor()) {
+      outputDescriptor =
+          convertOutputDescriptorFromProto(outputSpecProto.getIoDescriptor());
+    }
+    OutputSpec outputSpec =
+        new OutputSpec(outputSpecProto.getConnectedVertexName(), outputDescriptor,
+            outputSpecProto.getPhysicalEdgeCount());
+    return outputSpec;
+  }
+
+  /**
+   * Converts an OutputSpec to an IOSpecProto; a null destination vertex name or output
+   * descriptor is left unset.
+   */
+  public static IOSpecProto convertOutputSpecToProto(OutputSpec outputSpec) {
+    IOSpecProto.Builder builder = IOSpecProto.newBuilder();
+    if (outputSpec.getDestinationVertexName() != null) {
+      builder.setConnectedVertexName(outputSpec.getDestinationVertexName());
+    }
+    if (outputSpec.getOutputDescriptor() != null) {
+      builder.setIoDescriptor(convertToProto(outputSpec.getOutputDescriptor()));
+    }
+    builder.setPhysicalEdgeCount(outputSpec.getPhysicalEdgeCount());
+    return builder.build();
+  }
+
+  /** Builds a GroupInputSpec from the group name, member vertices and merged input descriptor. */
+  private static GroupInputSpec getGroupInputSpecFromProto(GroupInputSpecProto groupInputSpecProto) {
+    GroupInputSpec groupSpec = new GroupInputSpec(groupInputSpecProto.getGroupName(),
+        groupInputSpecProto.getGroupVerticesList(),
+        convertInputDescriptorFromProto(groupInputSpecProto.getMergedInputDescriptor()));
+    return groupSpec;
+  }
+
+  /** Converts a GroupInputSpec (group name, member vertices, merged input descriptor). */
+  private static GroupInputSpecProto convertGroupInputSpecToProto(GroupInputSpec groupInputSpec) {
+    GroupInputSpecProto.Builder builder = GroupInputSpecProto.newBuilder();
+    builder.setGroupName(groupInputSpec.getGroupName());
+    builder.addAllGroupVertices(groupInputSpec.getGroupVertices());
+    builder.setMergedInputDescriptor(convertToProto(groupInputSpec.getMergedInputDescriptor()));
+    return builder.build();
+  }
+
+  /** Attaches {@code payload} to {@code entity} unless it is null. */
+  private static void setUserPayload(EntityDescriptor<?> entity, UserPayload payload) {
+    if (payload != null) {
+      entity.setUserPayload(payload);
+    }
+  }
+
+  /**
+   * Extracts the user payload from a descriptor proto. Returns null when the proto has no
+   * payload message, and {@code UserPayload.create(null)} when the message carries no bytes.
+   */
+  private static UserPayload convertPayloadFromProto(
+      EntityDescriptorProto proto) {
+    UserPayload userPayload = null;
+    if (proto.hasUserPayload()) {
+      if (proto.getUserPayload().hasUserPayload()) {
+        userPayload =
+            UserPayload.create(proto.getUserPayload().getUserPayload().asReadOnlyByteBuffer(),
+                proto.getUserPayload().getVersion());
+      } else {
+        userPayload = UserPayload.create(null);
+      }
+    }
+    return userPayload;
+  }
+
+}

Added: 
hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapContainerLauncher.java
URL: 
http://svn.apache.org/viewvc/hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapContainerLauncher.java?rev=1665307&view=auto
==============================================================================
--- 
hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapContainerLauncher.java
 (added)
+++ 
hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapContainerLauncher.java
 Mon Mar  9 17:18:07 2015
@@ -0,0 +1,83 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.llap.tezplugins;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.service.AbstractService;
+import org.apache.hadoop.yarn.util.Clock;
+import org.apache.tez.dag.app.AppContext;
+import org.apache.tez.dag.app.TaskAttemptListener;
+import org.apache.tez.dag.app.launcher.ContainerLauncher;
+import org.apache.tez.dag.app.rm.NMCommunicatorEvent;
+import org.apache.tez.dag.app.rm.NMCommunicatorLaunchRequestEvent;
+import org.apache.tez.dag.app.rm.container.AMContainerEvent;
+import org.apache.tez.dag.app.rm.container.AMContainerEventLaunched;
+import org.apache.tez.dag.app.rm.container.AMContainerEventType;
+import org.apache.tez.dag.history.DAGHistoryEvent;
+import org.apache.tez.dag.history.events.ContainerLaunchedEvent;
+
+/**
+ * A ContainerLauncher that performs no actual launch. A launch request is acknowledged
+ * immediately with an AMContainerEventLaunched event and a ContainerLaunchedEvent history
+ * event; a stop request is acknowledged with C_NM_STOP_SENT.
+ */
+public class LlapContainerLauncher extends AbstractService implements ContainerLauncher {
+  static final Log LOG = LogFactory.getLog(LlapContainerLauncher.class);
+
+  private final AppContext context;
+  private final Clock clock;
+
+  /**
+   * @param appContext source of the event handler, history handler, clock and app attempt id
+   * @param conf not used by this launcher
+   * @param tal not used by this launcher
+   */
+  public LlapContainerLauncher(AppContext appContext, Configuration conf,
+                               TaskAttemptListener tal) {
+    super(LlapContainerLauncher.class.getName());
+    this.context = appContext;
+    this.clock = appContext.getClock();
+  }
+
+  /**
+   * Acknowledges launch and stop requests without launching or stopping anything; any other
+   * event type is logged and ignored.
+   */
+  @Override
+  public void handle(NMCommunicatorEvent event) {
+    switch(event.getType()) {
+      case CONTAINER_LAUNCH_REQUEST:
+        final NMCommunicatorLaunchRequestEvent launchEvent = (NMCommunicatorLaunchRequestEvent) event;
+        LOG.info("No-op launch for container: " + launchEvent.getContainerId() + " succeeded on host: " + launchEvent.getNodeId());
+        context.getEventHandler().handle(new AMContainerEventLaunched(launchEvent.getContainerId()));
+        ContainerLaunchedEvent lEvt = new ContainerLaunchedEvent(
+            launchEvent.getContainerId(), clock.getTime(), context.getApplicationAttemptId());
+        context.getHistoryHandler().handle(new DAGHistoryEvent(
+            null, lEvt));
+        break;
+      case CONTAINER_STOP_REQUEST:
+        LOG.info("DEBUG: Ignoring STOP_REQUEST for event: " + event);
+        context.getEventHandler().handle(new AMContainerEvent(event.getContainerId(),
+            AMContainerEventType.C_NM_STOP_SENT));
+        break;
+      default:
+        // Without this branch any other event type would be dropped without a trace.
+        LOG.warn("Unexpected event type: " + event.getType() + " for event: " + event);
+        break;
+    }
+  }
+}

Added: 
hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskCommunicator.java
URL: 
http://svn.apache.org/viewvc/hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskCommunicator.java?rev=1665307&view=auto
==============================================================================
--- 
hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskCommunicator.java
 (added)
+++ 
hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskCommunicator.java
 Mon Mar  9 17:18:07 2015
@@ -0,0 +1,209 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ *  you may not use this file except in compliance with the License.
+ *  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.hadoop.hive.llap.tezplugins;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+
+import com.google.protobuf.ByteString;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.llap.daemon.LlapDaemonConfiguration;
+import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.SubmitWorkRequestProto;
+import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.SubmitWorkResponseProto;
+import org.apache.hadoop.io.DataOutputBuffer;
+import org.apache.hadoop.security.Credentials;
+import org.apache.hadoop.yarn.api.ApplicationConstants;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.LocalResource;
+import org.apache.tez.dag.api.TaskCommunicatorContext;
+import org.apache.tez.dag.app.TezTaskCommunicatorImpl;
+import org.apache.tez.dag.records.TezTaskAttemptID;
+import org.apache.tez.runtime.api.impl.TaskSpec;
+
+/**
+ * Task communicator for LLAP. In addition to the behaviour inherited from
+ * TezTaskCommunicatorImpl, every registered task attempt is pushed as a
+ * SubmitWorkRequestProto to the LLAP daemon recorded for its container, sent asynchronously
+ * through a {@link TaskCommunicator}.
+ */
+public class LlapTaskCommunicator extends TezTaskCommunicatorImpl {
+
+  private static final Log LOG = LogFactory.getLog(LlapTaskCommunicator.class);
+
+  // Request fields shared by every submission (user, application id, attempt number, token
+  // identifier); per-task fields are added to a copy in constructSubmitWorkRequest.
+  private final SubmitWorkRequestProto baseSubmitWorkRequest;
+  // DAG name -> serialized credentials. NOTE(review): entries are never removed, so this grows
+  // with the number of distinct DAG names seen by this communicator.
+  private final ConcurrentMap<String, ByteBuffer> credentialMap;
+
+  private TaskCommunicator communicator;
+
+  public LlapTaskCommunicator(
+      TaskCommunicatorContext taskCommunicatorContext) {
+    super(taskCommunicatorContext);
+
+    SubmitWorkRequestProto.Builder baseBuilder = SubmitWorkRequestProto.newBuilder();
+
+    // TODO Avoid reading this from the environment
+    baseBuilder.setUser(System.getenv(ApplicationConstants.Environment.USER.name()));
+    baseBuilder.setApplicationIdString(
+        taskCommunicatorContext.getApplicationAttemptId().getApplicationId().toString());
+    baseBuilder
+        .setAppAttemptNumber(taskCommunicatorContext.getApplicationAttemptId().getAttemptId());
+    baseBuilder.setTokenIdentifier(getTokenIdentifier());
+
+    baseSubmitWorkRequest = baseBuilder.build();
+
+    credentialMap = new ConcurrentHashMap<String, ByteBuffer>();
+  }
+
+  /** Creates the TaskCommunicator, sized by LLAP_DAEMON_COMMUNICATOR_NUM_THREADS. */
+  @Override
+  public void serviceInit(Configuration conf) throws Exception {
+    super.serviceInit(conf);
+    int numThreads = conf.getInt(LlapDaemonConfiguration.LLAP_DAEMON_COMMUNICATOR_NUM_THREADS,
+        LlapDaemonConfiguration.LLAP_DAEMON_COMMUNICATOR_NUM_THREADS_DEFAULT);
+    this.communicator = new TaskCommunicator(numThreads);
+    this.communicator.init(conf);
+  }
+
+  @Override
+  public void serviceStart() {
+    super.serviceStart();
+    this.communicator.start();
+  }
+
+  /** Stops the communicator before the parent service, the reverse of the start order. */
+  @Override
+  public void serviceStop() {
+    if (this.communicator != null) {
+      this.communicator.stop();
+    }
+    super.serviceStop();
+  }
+
+  @Override
+  public void registerRunningContainer(ContainerId containerId, String hostname, int port) {
+    super.registerRunningContainer(containerId, hostname, port);
+  }
+
+  @Override
+  public void registerContainerEnd(ContainerId containerId) {
+    super.registerContainerEnd(containerId);
+  }
+
+  /**
+   * Registers the attempt with the parent class, then submits it to the LLAP daemon at the
+   * host / port recorded for {@code containerId}. On a successful response the context is
+   * told the task started remotely; a failure is currently only logged.
+   *
+   * @throws RuntimeException if the request cannot be built or no ContainerInfo is known
+   *     for {@code containerId}
+   */
+  @Override
+  public void registerRunningTaskAttempt(final ContainerId containerId, final TaskSpec taskSpec,
+                                         Map<String, LocalResource> additionalResources,
+                                         Credentials credentials,
+                                         boolean credentialsChanged)  {
+    super.registerRunningTaskAttempt(containerId, taskSpec, additionalResources, credentials,
+        credentialsChanged);
+    SubmitWorkRequestProto requestProto = null;
+    try {
+      requestProto = constructSubmitWorkRequest(containerId, taskSpec);
+    } catch (IOException e) {
+      throw new RuntimeException("Failed to construct request", e);
+    }
+    ContainerInfo containerInfo = getContainerInfo(containerId);
+    String host;
+    int port;
+    if (containerInfo != null) {
+      synchronized (containerInfo) {
+        host = containerInfo.host;
+        port = containerInfo.port;
+      }
+    } else {
+      // TODO Handle this properly
+      throw new RuntimeException("ContainerInfo not found for container: " + containerId +
+          ", while trying to launch task: " + taskSpec.getTaskAttemptID());
+    }
+    communicator.submitWork(requestProto, host, port,
+        new TaskCommunicator.ExecuteRequestCallback<SubmitWorkResponseProto>() {
+          @Override
+          public void setResponse(SubmitWorkResponseProto response) {
+            LOG.info("Successfully launched task: " + taskSpec.getTaskAttemptID());
+            getTaskCommunicatorContext()
+                .taskStartedRemotely(taskSpec.getTaskAttemptID(), containerId);
+          }
+
+          @Override
+          public void indicateError(Throwable t) {
+            // TODO Handle this error. This is where an API on the context to indicate failure / rejection comes in.
+            LOG.warn("Failed to run task: " + taskSpec.getTaskAttemptID() + " on containerId: " +
+                containerId, t);
+          }
+        });
+  }
+
+  @Override
+  public void unregisterRunningTaskAttempt(TezTaskAttemptID taskAttemptID) {
+    super.unregisterRunningTaskAttempt(taskAttemptID);
+    // Nothing else to do for now. The push API in the test does not support termination of a running task
+  }
+
+  /**
+   * Builds the request for one attempt: the shared base fields plus container id, AM
+   * address, the DAG's serialized credentials and the converted fragment spec.
+   */
+  private SubmitWorkRequestProto constructSubmitWorkRequest(ContainerId containerId,
+                                                            TaskSpec taskSpec) throws
+      IOException {
+    SubmitWorkRequestProto.Builder builder =
+        SubmitWorkRequestProto.newBuilder(baseSubmitWorkRequest);
+    builder.setContainerIdString(containerId.toString());
+    builder.setAmHost(getAddress().getHostName());
+    builder.setAmPort(getAddress().getPort());
+
+    // Credentials can change across DAGs, so the serialized form is cached per DAG name.
+    // The cached and the consumed buffers are separate duplicates, so reading one with
+    // ByteString.copyFrom does not move the other's position.
+    ByteBuffer credentialsBinary = credentialMap.get(taskSpec.getDAGName());
+    if (credentialsBinary == null) {
+      credentialsBinary = serializeCredentials(getTaskCommunicatorContext().getCredentials());
+      credentialMap.putIfAbsent(taskSpec.getDAGName(), credentialsBinary.duplicate());
+    } else {
+      credentialsBinary = credentialsBinary.duplicate();
+    }
+    builder.setCredentialsBinary(ByteString.copyFrom(credentialsBinary));
+    builder.setFragmentSpec(Converters.convertTaskSpecToProto(taskSpec));
+    return builder.build();
+  }
+
+  /** Serializes the token storage of {@code credentials} into a ByteBuffer. */
+  private ByteBuffer serializeCredentials(Credentials credentials) throws IOException {
+    Credentials containerCredentials = new Credentials();
+    containerCredentials.addAll(credentials);
+    DataOutputBuffer containerTokens_dob = new DataOutputBuffer();
+    containerCredentials.writeTokenStorageToStream(containerTokens_dob);
+    ByteBuffer containerCredentialsBuffer = ByteBuffer.wrap(containerTokens_dob.getData(), 0,
+        containerTokens_dob.getLength());
+    return containerCredentialsBuffer;
+  }
+}

Added: 
hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/tezplugins/TaskCommunicator.java
URL: 
http://svn.apache.org/viewvc/hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/tezplugins/TaskCommunicator.java?rev=1665307&view=auto
==============================================================================
--- 
hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/tezplugins/TaskCommunicator.java
 (added)
+++ 
hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/tezplugins/TaskCommunicator.java
 Mon Mar  9 17:18:07 2015
@@ -0,0 +1,129 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ *  you may not use this file except in compliance with the License.
+ *  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.hadoop.hive.llap.tezplugins;
+
+import java.util.concurrent.Callable;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+
+import com.google.common.util.concurrent.FutureCallback;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.ListeningExecutorService;
+import com.google.common.util.concurrent.MoreExecutors;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+import com.google.protobuf.Message;
+import org.apache.hadoop.hive.llap.daemon.LlapDaemonProtocolBlockingPB;
+import org.apache.hadoop.hive.llap.daemon.impl.LlapDaemonProtocolClientImpl;
+import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.SubmitWorkRequestProto;
+import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.SubmitWorkResponseProto;
+import org.apache.hadoop.service.AbstractService;
+
+/**
+ * Sends SubmitWorkRequestProtos to LLAP daemons on a fixed-size thread pool and delivers each
+ * outcome to an {@link ExecuteRequestCallback}. One RPC proxy is cached per host:port.
+ */
+public class TaskCommunicator extends AbstractService {
+
+  // "host:port" -> RPC proxy for the daemon at that address.
+  private final ConcurrentMap<String, LlapDaemonProtocolBlockingPB> hostProxies;
+  private final ListeningExecutorService executor;
+
+  /** @param numThreads number of threads used to issue submitWork calls */
+  public TaskCommunicator(int numThreads) {
+    super(TaskCommunicator.class.getSimpleName());
+    ExecutorService localExecutor = Executors.newFixedThreadPool(numThreads,
+        new ThreadFactoryBuilder().setNameFormat("TaskCommunicator #%d").build());
+    this.hostProxies = new ConcurrentHashMap<>();
+    executor = MoreExecutors.listeningDecorator(localExecutor);
+  }
+
+  /**
+   * Shuts the executor down with shutdownNow. NOTE(review): requests still queued are never
+   * run, so their callbacks are never invoked.
+   */
+  @Override
+  public void serviceStop() {
+    executor.shutdownNow();
+  }
+
+  /**
+   * Asynchronously sends {@code request} to the daemon at {@code host}:{@code port}.
+   *
+   * @param callback receives the response, or the Throwable if the call fails
+   */
+  public void submitWork(SubmitWorkRequestProto request, String host, int port,
+                         final ExecuteRequestCallback<SubmitWorkResponseProto> callback) {
+    ListenableFuture<SubmitWorkResponseProto> future = executor.submit(new SubmitWorkCallable(request, host, port));
+    Futures.addCallback(future, new FutureCallback<SubmitWorkResponseProto>() {
+      @Override
+      public void onSuccess(SubmitWorkResponseProto result) {
+        callback.setResponse(result);
+      }
+
+      @Override
+      public void onFailure(Throwable t) {
+        callback.indicateError(t);
+      }
+    });
+  }
+
+  /** Performs one blocking submitWork RPC against the cached proxy for its host. */
+  private class SubmitWorkCallable implements Callable<SubmitWorkResponseProto> {
+    final String hostname;
+    final int port;
+    final SubmitWorkRequestProto request;
+
+    private SubmitWorkCallable(SubmitWorkRequestProto request, String hostname, int port) {
+      this.hostname = hostname;
+      this.port = port;
+      this.request = request;
+    }
+
+    @Override
+    public SubmitWorkResponseProto call() throws Exception {
+      return getProxy(hostname, port).submitWork(null, request);
+    }
+  }
+
+  /** Receives the outcome of an asynchronous request. */
+  public interface ExecuteRequestCallback<T extends Message> {
+    void setResponse(T response);
+    void indicateError(Throwable t);
+  }
+
+  /** Returns the cached proxy for hostname:port, creating and caching one if absent. */
+  private LlapDaemonProtocolBlockingPB getProxy(String hostname, int port) {
+    String hostId = getHostIdentifier(hostname, port);
+
+    LlapDaemonProtocolBlockingPB proxy = hostProxies.get(hostId);
+    if (proxy == null) {
+      proxy = new LlapDaemonProtocolClientImpl(getConfig(), hostname, port);
+      LlapDaemonProtocolBlockingPB proxyOld = hostProxies.putIfAbsent(hostId, proxy);
+      if (proxyOld != null) {
+        // Another thread cached a proxy first; use that one.
+        // TODO Shutdown the new proxy.
+        proxy = proxyOld;
+      }
+    }
+    return proxy;
+  }
+
+  private String getHostIdentifier(String hostname, int port) {
+    return hostname + ":" + port;
+  }
+}

Added: 
hive/branches/llap/llap-server/src/java/org/apache/tez/dag/app/rm/LlapTaskSchedulerService.java
URL: 
http://svn.apache.org/viewvc/hive/branches/llap/llap-server/src/java/org/apache/tez/dag/app/rm/LlapTaskSchedulerService.java?rev=1665307&view=auto
==============================================================================
--- 
hive/branches/llap/llap-server/src/java/org/apache/tez/dag/app/rm/LlapTaskSchedulerService.java
 (added)
+++ 
hive/branches/llap/llap-server/src/java/org/apache/tez/dag/app/rm/LlapTaskSchedulerService.java
 Mon Mar  9 17:18:07 2015
@@ -0,0 +1,339 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.dag.app.rm;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Random;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import com.google.common.primitives.Ints;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.Container;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.NodeId;
+import org.apache.hadoop.yarn.api.records.NodeReport;
+import org.apache.hadoop.yarn.api.records.NodeState;
+import org.apache.hadoop.yarn.api.records.Priority;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.client.api.YarnClient;
+import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.hadoop.hive.llap.daemon.LlapDaemonConfiguration;
+import org.apache.tez.dag.app.AppContext;
+
+
+public class LlapTaskSchedulerService extends TaskSchedulerService {
+
+  private static final Log LOG = 
LogFactory.getLog(LlapTaskSchedulerService.class);
+
+  private final ExecutorService appCallbackExecutor;
+  private final TaskSchedulerAppCallback appClientDelegate;
+  private final AppContext appContext;
+  private final List<String> serviceHosts;
+  private final Set<String> serviceHostSet;
+  private final ContainerFactory containerFactory;
+  private final Random random = new Random();
+  private final int containerPort;
+
+  private final String clientHostname;
+  private final int clientPort;
+  private final String trackingUrl;
+  private final AtomicBoolean isStopped = new AtomicBoolean(false);
+  private final ConcurrentMap<Object, ContainerId> runningTasks =
+      new ConcurrentHashMap<Object, ContainerId>();
+
+  // Per daemon
+  private final int memoryPerInstance;
+  private final int coresPerInstance;
+  private final int executorsPerInstance;
+
+  // Per Executor Thread
+  private final Resource resourcePerExecutor;
+
+  // TODO: replace with service registry
+  private final YarnClient yc = YarnClient.createYarnClient();
+
+  public LlapTaskSchedulerService(TaskSchedulerAppCallback appClient, 
AppContext appContext,
+                                    String clientHostname, int clientPort, 
String trackingUrl,
+                                    long customAppIdIdentifier,
+                                    Configuration conf) {
+    // Accepting configuration here to allow setting up fields as final
+    super(LlapTaskSchedulerService.class.getName());
+    this.appCallbackExecutor = createAppCallbackExecutorService();
+    this.appClientDelegate = createAppCallbackDelegate(appClient);
+    this.appContext = appContext;
+    this.serviceHosts = new LinkedList<String>();
+    this.serviceHostSet = new HashSet<>();
+    this.containerFactory = new ContainerFactory(appContext, 
customAppIdIdentifier);
+    this.memoryPerInstance = conf
+        .getInt(LlapDaemonConfiguration.LLAP_DAEMON_MEMORY_PER_INSTANCE_MB,
+            
LlapDaemonConfiguration.LLAP_DAEMON_MEMORY_PER_INSTANCE_MB_DEFAULT);
+    this.coresPerInstance = conf
+        .getInt(LlapDaemonConfiguration.LLAP_DAEMON_VCPUS_PER_INSTANCE,
+            LlapDaemonConfiguration.LLAP_DAEMON_VCPUS_PER_INSTANCE_DEFAULT);
+    this.executorsPerInstance = 
conf.getInt(LlapDaemonConfiguration.LLAP_DAEMON_NUM_EXECUTORS,
+        LlapDaemonConfiguration.LLAP_DAEMON_NUM_EXECUTORS_DEFAULT);
+    this.clientHostname = clientHostname;
+    this.clientPort = clientPort;
+    this.trackingUrl = trackingUrl;
+
+    int memoryPerExecutor = (int) (memoryPerInstance / (float) 
executorsPerInstance);
+    int coresPerExecutor = (int) (coresPerInstance / (float) 
executorsPerInstance);
+    this.resourcePerExecutor = Resource.newInstance(memoryPerExecutor, 
coresPerExecutor);
+
+    String[] hosts = 
conf.getTrimmedStrings(LlapDaemonConfiguration.LLAP_DAEMON_SERVICE_HOSTS);
+    if (hosts == null || hosts.length == 0) {
+      hosts = new String[]{"localhost"};
+      serviceHosts.add("localhost");
+      serviceHostSet.add("localhost");
+    } else if (!hosts[0].equals("*")) {
+      for (String host : hosts) {
+        serviceHosts.add(host);
+        serviceHostSet.add(host);
+      }
+    }
+    this.containerPort = 
conf.getInt(LlapDaemonConfiguration.LLAP_DAEMON_RPC_PORT,
+        LlapDaemonConfiguration.LLAP_DAEMON_RPC_PORT_DEFAULT);
+
+    if (serviceHosts.size() > 0) {
+      LOG.info("Running with configuration: " +
+          "memoryPerInstance=" + memoryPerInstance +
+          ", vcoresPerInstance=" + coresPerInstance +
+          ", executorsPerInstance=" + executorsPerInstance +
+          ", resourcePerInstanceInferred=" + resourcePerExecutor +
+          ", hosts=" + serviceHosts.toString() +
+          ", rpcPort=" + containerPort);
+    } else {
+      LOG.info("Running with configuration: " +
+          "memoryPerInstance=" + memoryPerInstance +
+          ", vcoresPerInstance=" + coresPerInstance +
+          ", executorsPerInstance=" + executorsPerInstance +
+          ", resourcePerInstanceInferred=" + resourcePerExecutor +
+          ", hosts=<pending>" +
+          ", rpcPort=<pending>");
+    }
+
+  }
+
+  @Override
+  public void serviceInit(Configuration conf) {
+    yc.init(conf);
+  }
+
+
+  @Override
+  public void serviceStart() {
+    yc.start();
+    if (serviceHosts.size() > 0) {
+      return;
+    }
+    LOG.info("Evaluating host usage criteria for service nodes");
+    try {
+      List<NodeReport> nodes = yc.getNodeReports(NodeState.RUNNING);
+      for (NodeReport nd : nodes) {
+        Resource used = nd.getUsed();
+        LOG.info("Examining node: " + nd);
+        if (nd.getNodeState() == NodeState.RUNNING
+            && used.getMemory() >= memoryPerInstance) {
+          // TODO: fix this with YARN registry
+          serviceHosts.add(nd.getNodeId().getHost());
+          serviceHostSet.add(nd.getNodeId().getHost());
+        }
+      }
+      LOG.info("Re-inited with configuration: " +
+          "memoryPerInstance=" + memoryPerInstance +
+          ", vcoresPerInstance=" + coresPerInstance +
+          ", executorsPerInstance=" + executorsPerInstance +
+          ", resourcePerInstanceInferred=" + resourcePerExecutor +
+          ", hosts="+ serviceHosts.toString());
+    } catch (IOException e) {
+      e.printStackTrace();
+    } catch (YarnException e) {
+      e.printStackTrace();
+    }
+  }
+
+  @Override
+  public void serviceStop() {
+    if (!this.isStopped.getAndSet(true)) {
+      appCallbackExecutor.shutdownNow();
+    }
+  }
+
+  @Override
+  public Resource getAvailableResources() {
+    // TODO This needs information about all running executors, and the amount 
of memory etc available across the cluster.
+    return Resource
+        .newInstance(Ints.checkedCast(serviceHosts.size() * memoryPerInstance),
+            serviceHosts.size() * coresPerInstance);
+  }
+
+  @Override
+  public int getClusterNodeCount() {
+    return serviceHosts.size();
+  }
+
+  @Override
+  public void resetMatchLocalityForAllHeldContainers() {
+  }
+
+  @Override
+  public Resource getTotalResources() {
+    return Resource
+        .newInstance(Ints.checkedCast(serviceHosts.size() * memoryPerInstance),
+            serviceHosts.size() * coresPerInstance);
+  }
+
+  @Override
+  public void blacklistNode(NodeId nodeId) {
+    LOG.info("DEBUG: BlacklistNode not supported");
+  }
+
+  @Override
+  public void unblacklistNode(NodeId nodeId) {
+    LOG.info("DEBUG: unBlacklistNode not supported");
+  }
+
+  @Override
+  public void allocateTask(Object task, Resource capability, String[] hosts, 
String[] racks,
+                           Priority priority, Object containerSignature, 
Object clientCookie) {
+    String host = selectHost(hosts);
+    Container container = 
containerFactory.createContainer(resourcePerExecutor, priority, host, 
containerPort);
+    runningTasks.put(task, container.getId());
+    appClientDelegate.taskAllocated(task, clientCookie, container);
+  }
+
+
+  @Override
+  public void allocateTask(Object task, Resource capability, ContainerId 
containerId,
+                           Priority priority, Object containerSignature, 
Object clientCookie) {
+    String host = selectHost(null);
+    Container container = 
containerFactory.createContainer(resourcePerExecutor, priority, host, 
containerPort);
+    runningTasks.put(task, container.getId());
+    appClientDelegate.taskAllocated(task, clientCookie, container);
+  }
+
+  @Override
+  public boolean deallocateTask(Object task, boolean taskSucceeded) {
+    ContainerId containerId = runningTasks.remove(task);
+    if (containerId == null) {
+      LOG.error("Could not determine ContainerId for task: " + task +
+          " . Could have hit a race condition. Ignoring." +
+          " The query may hang since this \"unknown\" container is now taking 
up a slot permanently");
+      return false;
+    }
+    appClientDelegate.containerBeingReleased(containerId);
+    return true;
+  }
+
+  @Override
+  public Object deallocateContainer(ContainerId containerId) {
+    LOG.info("DEBUG: Ignoring deallocateContainer for containerId: " + 
containerId);
+    return null;
+  }
+
+  @Override
+  public void setShouldUnregister() {
+
+  }
+
+  @Override
+  public boolean hasUnregistered() {
+    // Nothing to do. No registration involved.
+    return true;
+  }
+
+  private ExecutorService createAppCallbackExecutorService() {
+    return Executors.newSingleThreadExecutor(new ThreadFactoryBuilder()
+        .setNameFormat("TaskSchedulerAppCaller #%d").setDaemon(true).build());
+  }
+
+  private TaskSchedulerAppCallback createAppCallbackDelegate(
+      TaskSchedulerAppCallback realAppClient) {
+    return new TaskSchedulerAppCallbackWrapper(realAppClient,
+        appCallbackExecutor);
+  }
+
+  private String selectHost(String[] requestedHosts) {
+    String host = null;
+    if (requestedHosts != null && requestedHosts.length > 0) {
+      Arrays.sort(requestedHosts);
+      host = requestedHosts[0];
+      if (serviceHostSet.contains(host)) {
+        LOG.info("Selected host: " + host + " from requested hosts: " + 
Arrays.toString(requestedHosts));
+      } else {
+        LOG.info("Preferred host: " + host + " not present. Attempting to 
select another one");
+        host = null;
+        for (String h : requestedHosts) {
+          if (serviceHostSet.contains(h)) {
+            host = h;
+            break;
+          }
+        }
+        if (host == null) {
+          LOG.info("Requested hosts: " + Arrays.toString(requestedHosts) + " 
not present. Randomizing the host");
+        }
+      }
+    }
+    if (host == null) {
+      host = serviceHosts.get(random.nextInt(serviceHosts.size()));
+      LOG.info("Selected random host: " + host + " since the request contained 
no host information");
+    }
+    return host;
+  }
+
+  static class ContainerFactory {
+    final ApplicationAttemptId customAppAttemptId;
+    AtomicInteger nextId;
+
+    public ContainerFactory(AppContext appContext, long appIdLong) {
+      this.nextId = new AtomicInteger(1);
+      ApplicationId appId = ApplicationId
+          .newInstance(appIdLong, 
appContext.getApplicationAttemptId().getApplicationId().getId());
+      this.customAppAttemptId = ApplicationAttemptId
+          .newInstance(appId, 
appContext.getApplicationAttemptId().getAttemptId());
+    }
+
+    public Container createContainer(Resource capability, Priority priority, 
String hostname, int port) {
+      ContainerId containerId = ContainerId.newInstance(customAppAttemptId, 
nextId.getAndIncrement());
+      NodeId nodeId = NodeId.newInstance(hostname, port);
+      String nodeHttpAddress = "hostname:0";
+
+      Container container = Container.newInstance(containerId,
+          nodeId,
+          nodeHttpAddress,
+          capability,
+          priority,
+          null);
+
+      return container;
+    }
+  }
+}

Modified: hive/branches/llap/llap-server/src/protobuf/LlapDaemonProtocol.proto
URL: 
http://svn.apache.org/viewvc/hive/branches/llap/llap-server/src/protobuf/LlapDaemonProtocol.proto?rev=1665307&r1=1665306&r2=1665307&view=diff
==============================================================================
--- hive/branches/llap/llap-server/src/protobuf/LlapDaemonProtocol.proto 
(original)
+++ hive/branches/llap/llap-server/src/protobuf/LlapDaemonProtocol.proto Mon 
Mar  9 17:18:07 2015
@@ -21,7 +21,46 @@ option java_outer_classname = "LlapDaemo
 option java_generic_services = true;
 option java_generate_equals_and_hash = true;
 
-message RunContainerRequestProto {
+// TODO Change this as the interface evolves. Currently using Tez constructs.
+
+// Opaque payload bytes plus a version number.
+// NOTE(review): presumably mirrors Tez's UserPayload (see the TODO above) -- confirm.
+message UserPayloadProto {
+  optional bytes user_payload = 1;
+  optional int32 version = 2;
+}
+
+// Describes a pluggable component: a class name, its payload, and optional history text.
+// Used for processor, input/output and merged-input descriptors in the messages below.
+// NOTE(review): presumably mirrors Tez's EntityDescriptor -- confirm.
+message EntityDescriptorProto {
+  optional string class_name = 1;
+  optional UserPayloadProto user_payload = 2;
+  optional bytes history_text = 3;
+}
+
+// Specification of one input or one output of a fragment; FragmentSpecProto uses it for both
+// input_specs and output_specs.
+message IOSpecProto {
+  // Presumably the vertex at the other end of the edge -- confirm.
+  optional string connected_vertex_name = 1;
+  optional EntityDescriptorProto io_descriptor = 2;
+  // Presumably the number of physical inputs/outputs on this edge -- confirm.
+  optional int32 physical_edge_count = 3;
+}
+
+// A named group of source vertices whose inputs are combined through merged_input_descriptor.
+// NOTE(review): presumably mirrors Tez's GroupInputSpec -- confirm.
+message GroupInputSpecProto {
+  optional string group_name = 1;
+  repeated string group_vertices = 2;
+  optional EntityDescriptorProto merged_input_descriptor = 3;
+}
+
+
+// Description of a unit of work sent to a daemon inside SubmitWorkRequestProto.fragment_spec:
+// identity (attempt/DAG/vertex names, fragment and attempt numbers), the processor to run, and
+// its inputs, outputs and grouped inputs.
+message FragmentSpecProto {
+  optional string task_attempt_id_string = 1;
+  optional string dag_name = 2;
+  optional string vertex_name = 3;
+  optional EntityDescriptorProto processor_descriptor = 4;
+  repeated IOSpecProto input_specs = 5;
+  repeated IOSpecProto output_specs = 6;
+  repeated GroupInputSpecProto grouped_input_specs = 7;
+  // Presumably the total number of fragments (tasks) in the vertex -- confirm.
+  optional int32 vertex_parallelism = 8;
+  optional int32 fragment_number =9;
+  optional int32 attempt_number = 10;
+}
+
+message SubmitWorkRequestProto {
   optional string container_id_string = 1;
   optional string am_host = 2;
   optional int32 am_port = 3;
@@ -30,11 +69,12 @@ message RunContainerRequestProto {
   optional string user = 6;
   optional string application_id_string = 7;
   optional int32 app_attempt_number = 8;
+  optional FragmentSpecProto fragment_spec = 9;
 }
 
-message RunContainerResponseProto {
+// Currently empty: submitWork returns no payload.
+message SubmitWorkResponseProto {
 }
 
 service LlapDaemonProtocol {
-  rpc runContainer(RunContainerRequestProto) returns 
(RunContainerResponseProto);
+  // Submits work (optionally described by SubmitWorkRequestProto.fragment_spec) to the daemon.
+  rpc submitWork(SubmitWorkRequestProto) returns (SubmitWorkResponseProto);
 }

Modified: 
hive/branches/llap/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/TestLlapDaemonProtocolServerImpl.java
URL: 
http://svn.apache.org/viewvc/hive/branches/llap/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/TestLlapDaemonProtocolServerImpl.java?rev=1665307&r1=1665306&r2=1665307&view=diff
==============================================================================
--- 
hive/branches/llap/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/TestLlapDaemonProtocolServerImpl.java
 (original)
+++ 
hive/branches/llap/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/TestLlapDaemonProtocolServerImpl.java
 Mon Mar  9 17:18:07 2015
@@ -24,7 +24,7 @@ import org.apache.hadoop.conf.Configurat
 import org.apache.hadoop.hive.llap.daemon.ContainerRunner;
 import org.apache.hadoop.hive.llap.daemon.LlapDaemonConfiguration;
 import org.apache.hadoop.hive.llap.daemon.LlapDaemonProtocolBlockingPB;
-import 
org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.RunContainerRequestProto;
+import 
org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.SubmitWorkRequestProto;
 import org.junit.Test;
 
 public class TestLlapDaemonProtocolServerImpl {
@@ -45,8 +45,9 @@ public class TestLlapDaemonProtocolServe
       LlapDaemonProtocolBlockingPB client =
           new LlapDaemonProtocolClientImpl(new Configuration(), 
serverAddr.getHostName(),
               serverAddr.getPort());
-      client.runContainer(null,
-          RunContainerRequestProto.newBuilder().setAmHost("amhost")
+      client.submitWork(null,
+          SubmitWorkRequestProto.newBuilder()
+              .setAmHost("amhost")
               .setAmPort(2000).build());
 
     } finally {

Modified: hive/branches/llap/llap-server/src/test/resources/llap-daemon-site.xml
URL: 
http://svn.apache.org/viewvc/hive/branches/llap/llap-server/src/test/resources/llap-daemon-site.xml?rev=1665307&r1=1665306&r2=1665307&view=diff
==============================================================================
--- hive/branches/llap/llap-server/src/test/resources/llap-daemon-site.xml 
(original)
+++ hive/branches/llap/llap-server/src/test/resources/llap-daemon-site.xml Mon 
Mar  9 17:18:07 2015
@@ -1,3 +1,18 @@
+<?xml version="1.0"?>
+<!--
+  Licensed under the Apache License, Version 2.0 (the "License");
+  you may not use this file except in compliance with the License.
+  You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License. See accompanying LICENSE file.
+-->
+
 <configuration>
 
   <property>
@@ -24,6 +39,16 @@
   </property>
 
   <property>
+    <name>llap.daemon.communicator.num.threads</name>
+    <value>5</value>
+  </property>
+
+  <property>
+    <name>llap.daemon.rpc.port</name>
+    <value>15001</value>
+  </property>
+
+  <property>
     <name>llap.daemon.service.hosts</name>
     <value>localhost</value>
     <description>Comma separate list of nodes running daemons</description>

Modified: 
hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DagUtils.java
URL: 
http://svn.apache.org/viewvc/hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DagUtils.java?rev=1665307&r1=1665306&r2=1665307&view=diff
==============================================================================
--- 
hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DagUtils.java 
(original)
+++ 
hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DagUtils.java 
Mon Mar  9 17:18:07 2015
@@ -629,7 +629,8 @@ public class DagUtils {
     map = Vertex.create(mapWork.getName(), 
ProcessorDescriptor.create(procClassName)
         .setUserPayload(serializedConf), numTasks, getContainerResource(conf))
       .setConf(TezConfiguration.TEZ_AM_VERTEX_TASK_SCHEDULER_NAME, serviceName)
-      .setConf(TezConfiguration.TEZ_AM_VERTEX_CONTAINER_LAUNCHER_NAME, 
serviceName);
+      .setConf(TezConfiguration.TEZ_AM_VERTEX_CONTAINER_LAUNCHER_NAME, 
serviceName)
+      .setConf(TezConfiguration.TEZ_AM_VERTEX_TASK_COMMUNICATOR_NAME, 
serviceName);
 
     map.setTaskEnvironment(getContainerEnvironment(conf, true));
     map.setTaskLaunchCmdOpts(getContainerJavaOpts(conf));
@@ -700,7 +701,8 @@ public class DagUtils {
              reduceWork.getMaxReduceTasks():
              reduceWork.getNumReduceTasks(), getContainerResource(conf))
       .setConf(TezConfiguration.TEZ_AM_VERTEX_TASK_SCHEDULER_NAME, serviceName)
-      .setConf(TezConfiguration.TEZ_AM_VERTEX_CONTAINER_LAUNCHER_NAME, 
serviceName);
+      .setConf(TezConfiguration.TEZ_AM_VERTEX_CONTAINER_LAUNCHER_NAME, 
serviceName)
+      .setConf(TezConfiguration.TEZ_AM_VERTEX_TASK_COMMUNICATOR_NAME, 
serviceName);
 
     reducer.setTaskEnvironment(getContainerEnvironment(conf, false));
     reducer.setTaskLaunchCmdOpts(getContainerJavaOpts(conf));

Modified: 
hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionState.java
URL: 
http://svn.apache.org/viewvc/hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionState.java?rev=1665307&r1=1665306&r2=1665307&view=diff
==============================================================================
--- 
hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionState.java
 (original)
+++ 
hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionState.java
 Mon Mar  9 17:18:07 2015
@@ -73,10 +73,12 @@ public class TezSessionState {
   public static final String LLAP_SERVICE = "LLAP";
   public static final String DEFAULT_SERVICE = 
TezConstants.TEZ_AM_SERVICE_PLUGINS_NAME_DEFAULT;
   public static final String LOCAL_SERVICE = 
TezConstants.TEZ_AM_SERVICE_PLUGINS_LOCAL_MODE_NAME_DEFAULT;
-  private static final String LLAP_SCHEDULER = 
"org.apache.tez.dag.app.rm.DaemonTaskSchedulerService";
-  private static final String LLAP_LAUNCHER = 
"org.apache.tez.dag.app.launcher.DaemonContainerLauncher";
+  private static final String LLAP_SCHEDULER = 
"org.apache.tez.dag.app.rm.LlapTaskSchedulerService";
+  private static final String LLAP_LAUNCHER = 
"org.apache.hadoop.hive.llap.tezplugins.LlapContainerLauncher";
+  private static final String LLAP_TASK_COMMUNICATOR = 
"org.apache.hadoop.hive.llap.tezplugins.LlapTaskCommunicator";
   private static final String LLAP_SERVICE_SCHEDULER = LLAP_SERVICE + ":" + 
LLAP_SCHEDULER;
   private static final String LLAP_SERVICE_LAUNCHER = LLAP_SERVICE + ":" + 
LLAP_LAUNCHER;
+  private static final String LLAP_SERVICE_TASK_COMMUNICATOR = LLAP_SERVICE + 
":" + LLAP_TASK_COMMUNICATOR;
 
   private HiveConf conf;
   private Path tezScratchDir;
@@ -209,7 +211,10 @@ public class TezSessionState {
         DEFAULT_SERVICE, LOCAL_SERVICE, LLAP_SERVICE_SCHEDULER);
 
     tezConfig.setStrings(TezConfiguration.TEZ_AM_CONTAINER_LAUNCHERS,
-       DEFAULT_SERVICE, LOCAL_SERVICE, LLAP_SERVICE_LAUNCHER);
+             DEFAULT_SERVICE, LOCAL_SERVICE, LLAP_SERVICE_LAUNCHER);
+
+    tezConfig.setStrings(TezConfiguration.TEZ_AM_TASK_COMMUNICATORS,
+        DEFAULT_SERVICE, LOCAL_SERVICE, LLAP_SERVICE_TASK_COMMUNICATOR);
 
     // container prewarming. tell the am how many containers we need
     if (HiveConf.getBoolVar(conf, ConfVars.HIVE_PREWARM_ENABLED)) {


Reply via email to