Modified: hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/ContainerRunner.java URL: http://svn.apache.org/viewvc/hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/ContainerRunner.java?rev=1665307&r1=1665306&r2=1665307&view=diff ============================================================================== --- hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/ContainerRunner.java (original) +++ hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/ContainerRunner.java Mon Mar 9 17:18:07 2015 @@ -16,9 +16,9 @@ package org.apache.hadoop.hive.llap.daem import java.io.IOException; -import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos; +import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.SubmitWorkRequestProto; public interface ContainerRunner { - void queueContainer(LlapDaemonProtocolProtos.RunContainerRequestProto request) throws IOException; + void submitWork(SubmitWorkRequestProto request) throws IOException; }
Modified: hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/HistoryLogger.java URL: http://svn.apache.org/viewvc/hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/HistoryLogger.java?rev=1665307&r1=1665306&r2=1665307&view=diff ============================================================================== --- hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/HistoryLogger.java (original) +++ hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/HistoryLogger.java Mon Mar 9 17:18:07 2015 @@ -29,6 +29,7 @@ public class HistoryLogger { private static final String HISTORY_VERTEX_NAME = "VertexName"; private static final String HISTORY_TASK_ID = "TaskId"; private static final String HISTORY_ATTEMPT_ID = "TaskAttemptId"; + private static final String HISTORY_THREAD_NAME = "ThreadName"; private static final String HISTORY_HOSTNAME = "HostName"; private static final String HISTORY_SUCCEEDED = "Succeeded"; @@ -48,9 +49,9 @@ public class HistoryLogger { public static void logFragmentEnd(String applicationIdStr, String containerIdStr, String hostname, String dagName, String vertexName, int taskId, int attemptId, - long startTime, boolean failed) { + String threadName, long startTime, boolean failed) { HISTORY_LOGGER.info(constructFragmentEndString(applicationIdStr, containerIdStr, hostname, - dagName, vertexName, taskId, attemptId, startTime, failed)); + dagName, vertexName, taskId, attemptId, threadName, startTime, failed)); } @@ -72,7 +73,7 @@ public class HistoryLogger { private static String constructFragmentEndString(String applicationIdStr, String containerIdStr, String hostname, String dagName, String vertexName, int taskId, int attemptId, - long startTime, boolean succeeded) { + String threadName, long startTime, boolean succeeded) { HistoryLineBuilder lb = new HistoryLineBuilder(EVENT_TYPE_FRAGMENT_END); lb.addHostName(hostname); lb.addAppid(applicationIdStr); @@ -81,6 +82,7 
@@ public class HistoryLogger { lb.addVertexName(vertexName); lb.addTaskId(taskId); lb.addTaskAttemptId(attemptId); + lb.addThreadName(threadName); lb.addSuccessStatus(succeeded); lb.addTime(HISTORY_START_TIME, startTime); lb.addTime(HISTORY_END_TIME); @@ -122,6 +124,10 @@ public class HistoryLogger { return setKeyValue(HISTORY_ATTEMPT_ID, String.valueOf(attemptId)); } + HistoryLineBuilder addThreadName(String threadName) { + return setKeyValue(HISTORY_THREAD_NAME, threadName); + } + HistoryLineBuilder addTime(String timeParam, long millis) { return setKeyValue(timeParam, String.valueOf(millis)); } Modified: hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/LlapDaemonConfiguration.java URL: http://svn.apache.org/viewvc/hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/LlapDaemonConfiguration.java?rev=1665307&r1=1665306&r2=1665307&view=diff ============================================================================== --- hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/LlapDaemonConfiguration.java (original) +++ hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/LlapDaemonConfiguration.java Mon Mar 9 17:18:07 2015 @@ -57,8 +57,9 @@ public class LlapDaemonConfiguration ext // Section for configs used in the AM // - public static final String LLAP_DAEMON_AM_SERVICE_HOSTS = LLAP_DAEMON_PREFIX + "service.hosts"; - public static final String LLAP_DAEMON_AM_COMMUNICATOR_NUM_THREADS = LLAP_DAEMON_PREFIX + "communicator.num.threads"; - public static final int LLAP_DAEMON_AM_COMMUNICATOR_NUM_THREADS_DEFAULT = 5; + public static final String LLAP_DAEMON_SERVICE_HOSTS = LLAP_DAEMON_PREFIX + "service.hosts"; + + public static final String LLAP_DAEMON_COMMUNICATOR_NUM_THREADS = LLAP_DAEMON_PREFIX + "communicator.num.threads"; + public static final int LLAP_DAEMON_COMMUNICATOR_NUM_THREADS_DEFAULT = 5; } Modified: 
hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/ContainerRunnerImpl.java URL: http://svn.apache.org/viewvc/hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/ContainerRunnerImpl.java?rev=1665307&r1=1665306&r2=1665307&view=diff ============================================================================== --- hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/ContainerRunnerImpl.java (original) +++ hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/ContainerRunnerImpl.java Mon Mar 9 17:18:07 2015 @@ -18,37 +18,50 @@ import java.io.File; import java.io.IOException; import java.net.InetSocketAddress; import java.nio.ByteBuffer; +import java.security.PrivilegedExceptionAction; import java.util.Arrays; import java.util.HashMap; import java.util.Map; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; +import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicReference; +import com.google.common.collect.HashMultimap; +import com.google.common.collect.Multimap; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hive.common.CallableWithNdc; import org.apache.hadoop.hive.llap.daemon.ContainerRunner; import org.apache.hadoop.hive.llap.daemon.HistoryLogger; -import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.RunContainerRequestProto; +import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.SubmitWorkRequestProto; import org.apache.hadoop.hive.llap.metrics.LlapDaemonExecutorMetrics; import org.apache.hadoop.hive.llap.shufflehandler.ShuffleHandler; +import org.apache.hadoop.hive.llap.tezplugins.Converters; import org.apache.hadoop.io.DataInputBuffer; +import org.apache.hadoop.ipc.RPC; +import org.apache.hadoop.net.NetUtils; import org.apache.hadoop.security.Credentials; 
+import org.apache.hadoop.security.SecurityUtil; +import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.security.token.Token; import org.apache.hadoop.service.AbstractService; import org.apache.hadoop.yarn.api.ApplicationConstants; import org.apache.hadoop.yarn.util.AuxiliaryServiceHelper; import org.apache.log4j.Logger; import org.apache.log4j.NDC; +import org.apache.tez.common.TezCommonUtils; +import org.apache.tez.common.TezTaskUmbilicalProtocol; import org.apache.tez.common.security.JobTokenIdentifier; import org.apache.tez.common.security.TokenCache; +import org.apache.tez.dag.api.TezConfiguration; import org.apache.tez.dag.api.TezConstants; +import org.apache.tez.dag.api.TezException; import org.apache.tez.runtime.api.ExecutionContext; import org.apache.tez.runtime.api.impl.ExecutionContextImpl; import org.apache.tez.runtime.common.objectregistry.ObjectRegistryImpl; -import org.apache.tez.runtime.task.TezChild; +import org.apache.tez.runtime.task.TaskReporter; import org.apache.tez.runtime.task.TezChild.ContainerExecutionResult; import com.google.common.base.Preconditions; @@ -59,6 +72,7 @@ import com.google.common.util.concurrent import com.google.common.util.concurrent.ListeningExecutorService; import com.google.common.util.concurrent.MoreExecutors; import com.google.common.util.concurrent.ThreadFactoryBuilder; +import org.apache.tez.runtime.task.TezTaskRunner; public class ContainerRunnerImpl extends AbstractService implements ContainerRunner { @@ -73,6 +87,7 @@ public class ContainerRunnerImpl extends private volatile FileSystem localFs; private final long memoryPerExecutor; private final LlapDaemonExecutorMetrics metrics; + private final ConfParams confParams; // TODO Support for removing queued containers, interrupting / killing specific containers public ContainerRunnerImpl(int numExecutors, String[] localDirsBase, int localShufflePort, @@ -83,6 +98,13 @@ public class ContainerRunnerImpl extends "Invalid number of executors: 
" + numExecutors + ". Must be > 0"); this.localDirsBase = localDirsBase; this.localAddress = localAddress; + this.confParams = new ConfParams(); + // Setup to defaults to start with + confParams.amMaxEventsPerHeartbeat = TezConfiguration.TEZ_TASK_MAX_EVENTS_PER_HEARTBEAT_DEFAULT; + confParams.amHeartbeatIntervalMsMax = + TezConfiguration.TEZ_AM_RM_HEARTBEAT_INTERVAL_MS_MAX_DEFAULT; + confParams.amCounterHeartbeatInterval = + TezConfiguration.TEZ_TASK_AM_HEARTBEAT_COUNTER_INTERVAL_MS_DEFAULT; ExecutorService raw = Executors.newFixedThreadPool(numExecutors, new ThreadFactoryBuilder().setNameFormat(THREAD_NAME_FORMAT).build()); @@ -104,6 +126,16 @@ public class ContainerRunnerImpl extends public void serviceInit(Configuration conf) { try { localFs = FileSystem.getLocal(conf); + // TODO Fix visibility of these parameters - which + confParams.amCounterHeartbeatInterval = conf.getLong( + TezConfiguration.TEZ_TASK_AM_HEARTBEAT_COUNTER_INTERVAL_MS, + TezConfiguration.TEZ_TASK_AM_HEARTBEAT_COUNTER_INTERVAL_MS_DEFAULT); + confParams.amHeartbeatIntervalMsMax = + conf.getInt(TezConfiguration.TEZ_TASK_AM_HEARTBEAT_INTERVAL_MS, + TezConfiguration.TEZ_TASK_AM_HEARTBEAT_INTERVAL_MS_DEFAULT); + confParams.amMaxEventsPerHeartbeat = + conf.getInt(TezConfiguration.TEZ_TASK_MAX_EVENTS_PER_HEARTBEAT, + TezConfiguration.TEZ_TASK_MAX_EVENTS_PER_HEARTBEAT_DEFAULT); } catch (IOException e) { throw new RuntimeException("Failed to setup local filesystem instance", e); } @@ -129,12 +161,21 @@ public class ContainerRunnerImpl extends } @Override - public void queueContainer(RunContainerRequestProto request) throws IOException { + public void submitWork(SubmitWorkRequestProto request) throws IOException { HistoryLogger.logFragmentStart(request.getApplicationIdString(), request.getContainerIdString(), - localAddress.get().getHostName(), null, null, -1, -1); + localAddress.get().getHostName(), request.getFragmentSpec().getDagName(), + request.getFragmentSpec().getVertexName(), 
request.getFragmentSpec().getFragmentNumber(), + request.getFragmentSpec().getAttemptNumber()); LOG.info("Queuing container for execution: " + request); // This is the start of container-annotated logging. - NDC.push(request.getContainerIdString()); + // TODO Reduce the length of this string. Way too verbose at the moment. + String ndcContextString = + request.getContainerIdString() + "_" + + request.getFragmentSpec().getDagName() + "_" + + request.getFragmentSpec().getVertexName() + + "_" + request.getFragmentSpec().getFragmentNumber() + "_" + + request.getFragmentSpec().getAttemptNumber(); + NDC.push(ndcContextString); try { Map<String, String> env = new HashMap<String, String>(); // TODO What else is required in this environment map. @@ -149,13 +190,14 @@ public class ContainerRunnerImpl extends request.getUser()); localFs.mkdirs(new Path(localDirs[i])); } - LOG.info("DEBUG: Dirs are: " + Arrays.toString(localDirs)); + // TODO Avoid this directory creation on each work-unit submission. + if (LOG.isDebugEnabled()) { + LOG.debug("Dirs are: " + Arrays.toString(localDirs)); + } // Setup workingDir. This is otherwise setup as Environment.PWD // Used for re-localization, to add the user specified configuration (conf_pb_binary_stream) - // TODO Set this up to read user configuration if required. Ideally, Inputs / Outputs should be self configured. - // Setting this up correctly is more from framework components to setup security, ping intervals, etc. 
String workingDir = localDirs[0]; Credentials credentials = new Credentials(); @@ -170,12 +212,11 @@ public class ContainerRunnerImpl extends LOG.info("DEBUG: Registering request with the ShuffleHandler"); ShuffleHandler.get().registerApplication(request.getApplicationIdString(), jobToken, request.getUser()); - ContainerRunnerCallable callable = new ContainerRunnerCallable(request, new Configuration(getConfig()), + TaskRunnerCallable callable = new TaskRunnerCallable(request, new Configuration(getConfig()), new ExecutionContextImpl(localAddress.get().getHostName()), env, localDirs, - workingDir, credentials, memoryPerExecutor, localAddress.get().getHostName()); - ListenableFuture<ContainerExecutionResult> future = executorService - .submit(callable); - Futures.addCallback(future, new ContainerRunnerCallback(request, callable)); + workingDir, credentials, memoryPerExecutor, confParams); + ListenableFuture<ContainerExecutionResult> future = executorService.submit(callable); + Futures.addCallback(future, new TaskRunnerCallback(request, callable)); metrics.incrExecutorTotalRequestsHandled(); metrics.incrExecutorNumQueuedRequests(); } finally { @@ -183,28 +224,31 @@ public class ContainerRunnerImpl extends } } - static class ContainerRunnerCallable extends CallableWithNdc<ContainerExecutionResult> { + static class TaskRunnerCallable extends CallableWithNdc<ContainerExecutionResult> { - private final RunContainerRequestProto request; + private final SubmitWorkRequestProto request; private final Configuration conf; private final String workingDir; private final String[] localDirs; private final Map<String, String> envMap; - // TODO Is a null pid valid - will this work with multiple different ResourceMonitors ? 
private final String pid = null; private final ObjectRegistryImpl objectRegistry; private final ExecutionContext executionContext; private final Credentials credentials; private final long memoryAvailable; - private volatile TezChild tezChild; - private final String localHostname; + private final ListeningExecutorService executor; + private final ConfParams confParams; + private volatile TezTaskRunner taskRunner; + private volatile TaskReporter taskReporter; + private TezTaskUmbilicalProtocol umbilical; private volatile long startTime; + private volatile String threadName; - ContainerRunnerCallable(RunContainerRequestProto request, Configuration conf, + TaskRunnerCallable(SubmitWorkRequestProto request, Configuration conf, ExecutionContext executionContext, Map<String, String> envMap, String[] localDirs, String workingDir, Credentials credentials, - long memoryAvailable, String localHostName) { + long memoryAvailable, ConfParams confParams) { this.request = request; this.conf = conf; this.executionContext = executionContext; @@ -214,40 +258,105 @@ public class ContainerRunnerImpl extends this.objectRegistry = new ObjectRegistryImpl(); this.credentials = credentials; this.memoryAvailable = memoryAvailable; - this.localHostname = localHostName; - + this.confParams = confParams; + // TODO This executor seems unnecessary. Here and TezChild + ExecutorService executorReal = Executors.newFixedThreadPool(1, + new ThreadFactoryBuilder() + .setDaemon(true) + .setNameFormat( + "TezTaskRunner_" + request.getFragmentSpec().getTaskAttemptIdString()) + .build()); + executor = MoreExecutors.listeningDecorator(executorReal); } @Override protected ContainerExecutionResult callInternal() throws Exception { this.startTime = System.currentTimeMillis(); + this.threadName = Thread.currentThread().getName(); + // TODO Consolidate this code with TezChild. 
Stopwatch sw = new Stopwatch().start(); - tezChild = - new TezChild(conf, request.getAmHost(), request.getAmPort(), - request.getContainerIdString(), - request.getTokenIdentifier(), request.getAppAttemptNumber(), workingDir, localDirs, - envMap, objectRegistry, pid, - executionContext, credentials, memoryAvailable, request.getUser(), null); - ContainerExecutionResult result = tezChild.run(); + UserGroupInformation taskUgi = UserGroupInformation.createRemoteUser(request.getUser()); + taskUgi.addCredentials(credentials); + + Token<JobTokenIdentifier> jobToken = TokenCache.getSessionToken(credentials); + Map<String, ByteBuffer> serviceConsumerMetadata = new HashMap<String, ByteBuffer>(); + serviceConsumerMetadata.put(TezConstants.TEZ_SHUFFLE_HANDLER_SERVICE_ID, + TezCommonUtils.convertJobTokenToBytes(jobToken)); + Multimap<String, String> startedInputsMap = HashMultimap.create(); + + UserGroupInformation taskOwner = + UserGroupInformation.createRemoteUser(request.getTokenIdentifier()); + final InetSocketAddress address = + NetUtils.createSocketAddrForHost(request.getAmHost(), request.getAmPort()); + SecurityUtil.setTokenService(jobToken, address); + taskOwner.addToken(jobToken); + umbilical = taskOwner.doAs(new PrivilegedExceptionAction<TezTaskUmbilicalProtocol>() { + @Override + public TezTaskUmbilicalProtocol run() throws Exception { + return RPC.getProxy(TezTaskUmbilicalProtocol.class, + TezTaskUmbilicalProtocol.versionID, address, conf); + } + }); + + taskReporter = new TaskReporter( + umbilical, + confParams.amHeartbeatIntervalMsMax, + confParams.amCounterHeartbeatInterval, + confParams.amMaxEventsPerHeartbeat, + new AtomicLong(0), + request.getContainerIdString()); + + taskRunner = new TezTaskRunner(conf, taskUgi, localDirs, + Converters.getTaskSpecfromProto(request.getFragmentSpec()), umbilical, + request.getAppAttemptNumber(), + serviceConsumerMetadata, envMap, startedInputsMap, taskReporter, executor, objectRegistry, + pid, + executionContext, 
memoryAvailable); + + boolean shouldDie; + try { + shouldDie = !taskRunner.run(); + if (shouldDie) { + LOG.info("Got a shouldDie notification via heartbeats. Shutting down"); + return new ContainerExecutionResult(ContainerExecutionResult.ExitStatus.SUCCESS, null, + "Asked to die by the AM"); + } + } catch (IOException e) { + return new ContainerExecutionResult(ContainerExecutionResult.ExitStatus.EXECUTION_FAILURE, + e, "TaskExecutionFailure: " + e.getMessage()); + } catch (TezException e) { + return new ContainerExecutionResult(ContainerExecutionResult.ExitStatus.EXECUTION_FAILURE, + e, "TaskExecutionFailure: " + e.getMessage()); + } finally { + // TODO Fix UGI and FS Handling. Closing UGI here causes some errors right now. +// FileSystem.closeAllForUGI(taskUgi); + } LOG.info("ExecutionTime for Container: " + request.getContainerIdString() + "=" + sw.stop().elapsedMillis()); - return result; + return new ContainerExecutionResult(ContainerExecutionResult.ExitStatus.SUCCESS, null, + null); } - public TezChild getTezChild() { - return this.tezChild; + public void shutdown() { + executor.shutdownNow(); + if (taskReporter != null) { + taskReporter.shutdown(); + } + if (umbilical != null) { + RPC.stopProxy(umbilical); + } } } - final class ContainerRunnerCallback implements FutureCallback<ContainerExecutionResult> { + final class TaskRunnerCallback implements FutureCallback<ContainerExecutionResult> { - private final RunContainerRequestProto request; - private final ContainerRunnerCallable containerRunnerCallable; + private final SubmitWorkRequestProto request; + private final TaskRunnerCallable taskRunnerCallable; - ContainerRunnerCallback(RunContainerRequestProto request, - ContainerRunnerCallable containerRunnerCallable) { + TaskRunnerCallback(SubmitWorkRequestProto request, + TaskRunnerCallable taskRunnerCallable) { this.request = request; - this.containerRunnerCallable = containerRunnerCallable; + this.taskRunnerCallable = taskRunnerCallable; } // TODO Slightly more 
useful error handling @@ -255,51 +364,62 @@ public class ContainerRunnerImpl extends public void onSuccess(ContainerExecutionResult result) { switch (result.getExitStatus()) { case SUCCESS: - LOG.info("Successfully finished: " + request.getApplicationIdString() + ", containerId=" + - request.getContainerIdString()); + LOG.info("Successfully finished: " + getTaskIdentifierString(request)); metrics.incrExecutorTotalSuccess(); break; case EXECUTION_FAILURE: - LOG.info("Failed to run: " + request.getApplicationIdString() + ", containerId=" + - request.getContainerIdString(), result.getThrowable()); + LOG.info("Failed to run: " + getTaskIdentifierString(request)); metrics.incrExecutorTotalExecutionFailed(); break; case INTERRUPTED: - LOG.info( - "Interrupted while running: " + request.getApplicationIdString() + ", containerId=" + - request.getContainerIdString(), result.getThrowable()); + LOG.info("Interrupted while running: " + getTaskIdentifierString(request)); metrics.incrExecutorTotalInterrupted(); break; case ASKED_TO_DIE: - LOG.info( - "Asked to die while running: " + request.getApplicationIdString() + ", containerId=" + - request.getContainerIdString()); + LOG.info("Asked to die while running: " + getTaskIdentifierString(request)); metrics.incrExecutorTotalAskedToDie(); break; } + taskRunnerCallable.shutdown(); HistoryLogger - .logFragmentEnd(request.getApplicationIdString(), - request.getContainerIdString(), - localAddress.get().getHostName(), null, null, -1, -1, - containerRunnerCallable.startTime, true); + .logFragmentEnd(request.getApplicationIdString(), request.getContainerIdString(), + localAddress.get().getHostName(), request.getFragmentSpec().getDagName(), + request.getFragmentSpec().getVertexName(), + request.getFragmentSpec().getFragmentNumber(), + request.getFragmentSpec().getAttemptNumber(), taskRunnerCallable.threadName, + taskRunnerCallable.startTime, true); metrics.decrExecutorNumQueuedRequests(); } @Override public void onFailure(Throwable t) { - 
LOG.error( - "TezChild execution failed for : " + request.getApplicationIdString() + ", containerId=" + - request.getContainerIdString()); - TezChild tezChild = containerRunnerCallable.getTezChild(); - if (tezChild != null) { - tezChild.shutdown(); - } + LOG.error("TezTaskRunner execution failed for : " + getTaskIdentifierString(request), t); + taskRunnerCallable.shutdown(); HistoryLogger - .logFragmentEnd(request.getApplicationIdString(), - request.getContainerIdString(), - localAddress.get().getHostName(), null, null, -1, -1, - containerRunnerCallable.startTime, false); + .logFragmentEnd(request.getApplicationIdString(), request.getContainerIdString(), + localAddress.get().getHostName(), request.getFragmentSpec().getDagName(), + request.getFragmentSpec().getVertexName(), + request.getFragmentSpec().getFragmentNumber(), + request.getFragmentSpec().getAttemptNumber(), taskRunnerCallable.threadName, + taskRunnerCallable.startTime, false); metrics.decrExecutorNumQueuedRequests(); } + + private String getTaskIdentifierString(SubmitWorkRequestProto request) { + StringBuilder sb = new StringBuilder(); + sb.append("AppId=").append(request.getApplicationIdString()) + .append(", containerId=").append(request.getContainerIdString()) + .append(", Dag=").append(request.getFragmentSpec().getDagName()) + .append(", Vertex=").append(request.getFragmentSpec().getVertexName()) + .append(", FragmentNum=").append(request.getFragmentSpec().getFragmentNumber()) + .append(", Attempt=").append(request.getFragmentSpec().getAttemptNumber()); + return sb.toString(); + } + } + + private static class ConfParams { + int amHeartbeatIntervalMsMax; + long amCounterHeartbeatInterval; + int amMaxEventsPerHeartbeat; } } Modified: hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapDaemon.java URL: http://svn.apache.org/viewvc/hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapDaemon.java?rev=1665307&r1=1665306&r2=1665307&view=diff 
============================================================================== --- hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapDaemon.java (original) +++ hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapDaemon.java Mon Mar 9 17:18:07 2015 @@ -24,7 +24,7 @@ import javax.management.ObjectName; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hive.llap.daemon.ContainerRunner; import org.apache.hadoop.hive.llap.daemon.LlapDaemonConfiguration; -import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.RunContainerRequestProto; +import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos; import org.apache.hadoop.hive.llap.io.api.LlapIoProxy; import org.apache.hadoop.hive.llap.metrics.LlapDaemonExecutorMetrics; import org.apache.hadoop.hive.llap.metrics.LlapMetricsSystem; @@ -190,8 +190,9 @@ public class LlapDaemon extends Abstract } @Override - public void queueContainer(RunContainerRequestProto request) throws IOException { - containerRunner.queueContainer(request); + public void submitWork(LlapDaemonProtocolProtos.SubmitWorkRequestProto request) throws + IOException { + containerRunner.submitWork(request); } // LlapDaemonMXBean methods. 
Will be exposed via JMX @@ -224,4 +225,6 @@ public class LlapDaemon extends Abstract public long getMaxJvmMemory() { return maxJvmMemory; } + + } Modified: hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapDaemonProtocolClientImpl.java URL: http://svn.apache.org/viewvc/hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapDaemonProtocolClientImpl.java?rev=1665307&r1=1665306&r2=1665307&view=diff ============================================================================== --- hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapDaemonProtocolClientImpl.java (original) +++ hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapDaemonProtocolClientImpl.java Mon Mar 9 17:18:07 2015 @@ -20,12 +20,11 @@ import java.net.InetSocketAddress; import com.google.protobuf.RpcController; import com.google.protobuf.ServiceException; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos; import org.apache.hadoop.ipc.ProtobufRpcEngine; import org.apache.hadoop.ipc.RPC; import org.apache.hadoop.net.NetUtils; import org.apache.hadoop.hive.llap.daemon.LlapDaemonProtocolBlockingPB; -import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.RunContainerRequestProto; -import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.RunContainerResponseProto; // TODO Change all this to be based on a regular interface instead of relying on the Proto service - Exception signatures cannot be controlled without this for the moment. 
@@ -43,17 +42,16 @@ public class LlapDaemonProtocolClientImp } @Override - public RunContainerResponseProto runContainer(RpcController controller, - RunContainerRequestProto request) throws + public LlapDaemonProtocolProtos.SubmitWorkResponseProto submitWork(RpcController controller, + LlapDaemonProtocolProtos.SubmitWorkRequestProto request) throws ServiceException { try { - return getProxy().runContainer(null, request); + return getProxy().submitWork(null, request); } catch (IOException e) { throw new ServiceException(e); } } - public LlapDaemonProtocolBlockingPB getProxy() throws IOException { if (proxy == null) { proxy = createProxy(); @@ -65,8 +63,7 @@ public class LlapDaemonProtocolClientImp LlapDaemonProtocolBlockingPB p; // TODO Fix security RPC.setProtocolEngine(conf, LlapDaemonProtocolBlockingPB.class, ProtobufRpcEngine.class); - p = (LlapDaemonProtocolBlockingPB) RPC - .getProxy(LlapDaemonProtocolBlockingPB.class, 0, serverAddr, conf); + p = RPC.getProxy(LlapDaemonProtocolBlockingPB.class, 0, serverAddr, conf); return p; } } Modified: hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapDaemonProtocolServerImpl.java URL: http://svn.apache.org/viewvc/hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapDaemonProtocolServerImpl.java?rev=1665307&r1=1665306&r2=1665307&view=diff ============================================================================== --- hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapDaemonProtocolServerImpl.java (original) +++ hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapDaemonProtocolServerImpl.java Mon Mar 9 17:18:07 2015 @@ -27,6 +27,7 @@ import org.apache.commons.logging.LogFac import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos; +import 
org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.SubmitWorkResponseProto; import org.apache.hadoop.ipc.ProtobufRpcEngine; import org.apache.hadoop.ipc.RPC; import org.apache.hadoop.net.NetUtils; @@ -34,8 +35,6 @@ import org.apache.hadoop.service.Abstrac import org.apache.hadoop.hive.llap.daemon.ContainerRunner; import org.apache.hadoop.hive.llap.daemon.LlapDaemonConfiguration; import org.apache.hadoop.hive.llap.daemon.LlapDaemonProtocolBlockingPB; -import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.RunContainerRequestProto; -import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.RunContainerResponseProto; public class LlapDaemonProtocolServerImpl extends AbstractService implements LlapDaemonProtocolBlockingPB { @@ -58,19 +57,18 @@ public class LlapDaemonProtocolServerImp } @Override - public RunContainerResponseProto runContainer(RpcController controller, - RunContainerRequestProto request) throws + public SubmitWorkResponseProto submitWork(RpcController controller, + LlapDaemonProtocolProtos.SubmitWorkRequestProto request) throws ServiceException { - LOG.info("Received request: " + request); + LOG.info("DEBUG: Recevied request: " + request); try { - containerRunner.queueContainer(request); + containerRunner.submitWork(request); } catch (IOException e) { throw new ServiceException(e); } - return RunContainerResponseProto.getDefaultInstance(); + return SubmitWorkResponseProto.getDefaultInstance(); } - @Override public void serviceStart() { Configuration conf = getConfig(); Added: hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/tezplugins/Converters.java URL: http://svn.apache.org/viewvc/hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/tezplugins/Converters.java?rev=1665307&view=auto ============================================================================== --- hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/tezplugins/Converters.java (added) +++ 
hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/tezplugins/Converters.java Mon Mar 9 17:18:07 2015
@@ -0,0 +1,253 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.llap.tezplugins;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+import com.google.protobuf.ByteString;
+import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.EntityDescriptorProto;
+import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.GroupInputSpecProto;
+import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.IOSpecProto;
+import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.FragmentSpecProto;
+import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.UserPayloadProto;
+import org.apache.tez.common.TezCommonUtils;
+import org.apache.tez.dag.api.EntityDescriptor;
+import org.apache.tez.dag.api.InputDescriptor;
+import org.apache.tez.dag.api.OutputDescriptor;
+import org.apache.tez.dag.api.ProcessorDescriptor;
+import org.apache.tez.dag.api.TezUncheckedException;
+import org.apache.tez.dag.api.UserPayload;
+import org.apache.tez.dag.records.TezTaskAttemptID;
+import org.apache.tez.runtime.api.impl.GroupInputSpec;
+import org.apache.tez.runtime.api.impl.InputSpec;
+import org.apache.tez.runtime.api.impl.OutputSpec;
+import org.apache.tez.runtime.api.impl.TaskSpec;
+
+/**
+ * Static helpers that translate between the Tez runtime specs ({@link TaskSpec} and the
+ * input, output and group-input specs it holds) and the protobuf messages defined in
+ * LlapDaemonProtocolProtos.
+ */
+public class Converters {
+
+  /**
+   * Rebuilds a {@link TaskSpec} from its protobuf form.
+   *
+   * @param fragmentSpecProto serialized fragment description
+   * @return a TaskSpec built from the attempt id string, DAG name, vertex name, vertex
+   *         parallelism, the processor descriptor (null when the proto carries none) and
+   *         the converted input, output and grouped-input specs
+   */
+  public static TaskSpec getTaskSpecfromProto(FragmentSpecProto fragmentSpecProto) {
+    TezTaskAttemptID taskAttemptID =
+        TezTaskAttemptID.fromString(fragmentSpecProto.getTaskAttemptIdString());
+
+    ProcessorDescriptor processorDescriptor = null;
+    if (fragmentSpecProto.hasProcessorDescriptor()) {
+      processorDescriptor = convertProcessorDescriptorFromProto(
+          fragmentSpecProto.getProcessorDescriptor());
+    }
+
+    // Repeated protobuf fields are returned as (possibly empty) lists, so the loops below
+    // need no separate size check.
+    List<InputSpec> inputSpecList =
+        new ArrayList<InputSpec>(fragmentSpecProto.getInputSpecsCount());
+    for (IOSpecProto inputSpecProto : fragmentSpecProto.getInputSpecsList()) {
+      inputSpecList.add(getInputSpecFromProto(inputSpecProto));
+    }
+
+    List<OutputSpec> outputSpecList =
+        new ArrayList<OutputSpec>(fragmentSpecProto.getOutputSpecsCount());
+    for (IOSpecProto outputSpecProto : fragmentSpecProto.getOutputSpecsList()) {
+      outputSpecList.add(getOutputSpecFromProto(outputSpecProto));
+    }
+
+    List<GroupInputSpec> groupInputSpecs =
+        new ArrayList<GroupInputSpec>(fragmentSpecProto.getGroupedInputSpecsCount());
+    for (GroupInputSpecProto groupInputSpecProto : fragmentSpecProto.getGroupedInputSpecsList()) {
+      groupInputSpecs.add(getGroupInputSpecFromProto(groupInputSpecProto));
+    }
+
+    return new TaskSpec(taskAttemptID, fragmentSpecProto.getDagName(),
+        fragmentSpecProto.getVertexName(), fragmentSpecProto.getVertexParallelism(),
+        processorDescriptor, inputSpecList, outputSpecList, groupInputSpecs);
+  }
+
+  /**
+   * Serializes a {@link TaskSpec} into a {@link FragmentSpecProto}.
+   *
+   * The fragment number is the id of the attempt's task and the attempt number is the id of
+   * the attempt itself. The processor descriptor and the input / output / group-input lists
+   * are only written when they are non-null.
+   *
+   * @param taskSpec the spec to serialize
+   * @return the populated proto
+   */
+  public static FragmentSpecProto convertTaskSpecToProto(TaskSpec taskSpec) {
+    FragmentSpecProto.Builder builder = FragmentSpecProto.newBuilder();
+    builder.setTaskAttemptIdString(taskSpec.getTaskAttemptID().toString());
+    builder.setDagName(taskSpec.getDAGName());
+    builder.setVertexName(taskSpec.getVertexName());
+    builder.setVertexParallelism(taskSpec.getVertexParallelism());
+    builder.setFragmentNumber(taskSpec.getTaskAttemptID().getTaskID().getId());
+    builder.setAttemptNumber(taskSpec.getTaskAttemptID().getId());
+
+    if (taskSpec.getProcessorDescriptor() != null) {
+      builder.setProcessorDescriptor(convertToProto(taskSpec.getProcessorDescriptor()));
+    }
+
+    if (taskSpec.getInputs() != null) {
+      for (InputSpec inputSpec : taskSpec.getInputs()) {
+        builder.addInputSpecs(convertInputSpecToProto(inputSpec));
+      }
+    }
+
+    if (taskSpec.getOutputs() != null) {
+      for (OutputSpec outputSpec : taskSpec.getOutputs()) {
+        builder.addOutputSpecs(convertOutputSpecToProto(outputSpec));
+      }
+    }
+
+    if (taskSpec.getGroupInputs() != null) {
+      for (GroupInputSpec groupInputSpec : taskSpec.getGroupInputs()) {
+        builder.addGroupedInputSpecs(convertGroupInputSpecToProto(groupInputSpec));
+      }
+    }
+    return builder.build();
+  }
+
+  /** Creates a ProcessorDescriptor from the proto's class name and (optional) payload. */
+  private static ProcessorDescriptor convertProcessorDescriptorFromProto(
+      EntityDescriptorProto proto) {
+    String className = proto.getClassName();
+    UserPayload payload = convertPayloadFromProto(proto);
+    ProcessorDescriptor pd = ProcessorDescriptor.create(className);
+    setUserPayload(pd, payload);
+    return pd;
+  }
+
+  /**
+   * Serializes any entity descriptor (processor, input or output).
+   *
+   * A user payload message is written whenever the descriptor has a non-null UserPayload;
+   * its bytes and version are only set when the payload actually holds data. History text,
+   * when present, is UTF-8 encoded and compressed before being stored.
+   *
+   * @throws TezUncheckedException if encoding / compressing the history text fails
+   */
+  private static EntityDescriptorProto convertToProto(
+      EntityDescriptor<?> descriptor) {
+    EntityDescriptorProto.Builder builder = EntityDescriptorProto
+        .newBuilder();
+    builder.setClassName(descriptor.getClassName());
+
+    UserPayload userPayload = descriptor.getUserPayload();
+    if (userPayload != null) {
+      UserPayloadProto.Builder payloadBuilder = UserPayloadProto.newBuilder();
+      if (userPayload.hasPayload()) {
+        payloadBuilder.setUserPayload(ByteString.copyFrom(userPayload.getPayload()));
+        payloadBuilder.setVersion(userPayload.getVersion());
+      }
+      builder.setUserPayload(payloadBuilder.build());
+    }
+    if (descriptor.getHistoryText() != null) {
+      try {
+        builder.setHistoryText(TezCommonUtils.compressByteArrayToByteString(
+            descriptor.getHistoryText().getBytes("UTF-8")));
+      } catch (IOException e) {
+        throw new TezUncheckedException(e);
+      }
+    }
+    return builder.build();
+  }
+
+  /** Rebuilds an InputSpec; the descriptor is null when the proto has no io descriptor. */
+  private static InputSpec getInputSpecFromProto(IOSpecProto inputSpecProto) {
+    InputDescriptor inputDescriptor = null;
+    if (inputSpecProto.hasIoDescriptor()) {
+      inputDescriptor =
+          convertInputDescriptorFromProto(inputSpecProto.getIoDescriptor());
+    }
+    return new InputSpec(inputSpecProto.getConnectedVertexName(), inputDescriptor,
+        inputSpecProto.getPhysicalEdgeCount());
+  }
+
+  /** Creates an InputDescriptor from the proto's class name and (optional) payload. */
+  private static InputDescriptor convertInputDescriptorFromProto(
+      EntityDescriptorProto proto) {
+    String className = proto.getClassName();
+    UserPayload payload = convertPayloadFromProto(proto);
+    InputDescriptor id = InputDescriptor.create(className);
+    setUserPayload(id, payload);
+    return id;
+  }
+
+  /** Creates an OutputDescriptor from the proto's class name and (optional) payload. */
+  private static OutputDescriptor convertOutputDescriptorFromProto(
+      EntityDescriptorProto proto) {
+    String className = proto.getClassName();
+    UserPayload payload = convertPayloadFromProto(proto);
+    OutputDescriptor od = OutputDescriptor.create(className);
+    setUserPayload(od, payload);
+    return od;
+  }
+
+  /**
+   * Serializes an InputSpec. The source vertex name and the descriptor are skipped when
+   * null; the physical edge count is always written.
+   */
+  private static IOSpecProto convertInputSpecToProto(InputSpec inputSpec) {
+    IOSpecProto.Builder builder = IOSpecProto.newBuilder();
+    if (inputSpec.getSourceVertexName() != null) {
+      builder.setConnectedVertexName(inputSpec.getSourceVertexName());
+    }
+    if (inputSpec.getInputDescriptor() != null) {
+      builder.setIoDescriptor(convertToProto(inputSpec.getInputDescriptor()));
+    }
+    builder.setPhysicalEdgeCount(inputSpec.getPhysicalEdgeCount());
+    return builder.build();
+  }
+
+  /** Rebuilds an OutputSpec; the descriptor is null when the proto has no io descriptor. */
+  private static OutputSpec getOutputSpecFromProto(IOSpecProto outputSpecProto) {
+    OutputDescriptor outputDescriptor = null;
+    if (outputSpecProto.hasIoDescriptor()) {
+      outputDescriptor =
+          convertOutputDescriptorFromProto(outputSpecProto.getIoDescriptor());
+    }
+    return new OutputSpec(outputSpecProto.getConnectedVertexName(), outputDescriptor,
+        outputSpecProto.getPhysicalEdgeCount());
+  }
+
+  /**
+   * Serializes an OutputSpec. The destination vertex name and the descriptor are skipped
+   * when null; the physical edge count is always written.
+   */
+  public static IOSpecProto convertOutputSpecToProto(OutputSpec outputSpec) {
+    IOSpecProto.Builder builder = IOSpecProto.newBuilder();
+    if (outputSpec.getDestinationVertexName() != null) {
+      builder.setConnectedVertexName(outputSpec.getDestinationVertexName());
+    }
+    if (outputSpec.getOutputDescriptor() != null) {
+      builder.setIoDescriptor(convertToProto(outputSpec.getOutputDescriptor()));
+    }
+    builder.setPhysicalEdgeCount(outputSpec.getPhysicalEdgeCount());
+    return builder.build();
+  }
+
+  /** Rebuilds a GroupInputSpec from its group name, member vertices and merged descriptor. */
+  private static GroupInputSpec getGroupInputSpecFromProto(
+      GroupInputSpecProto groupInputSpecProto) {
+    return new GroupInputSpec(groupInputSpecProto.getGroupName(),
+        groupInputSpecProto.getGroupVerticesList(),
+        convertInputDescriptorFromProto(groupInputSpecProto.getMergedInputDescriptor()));
+  }
+
+  /** Serializes a GroupInputSpec: group name, member vertices and merged input descriptor. */
+  private static GroupInputSpecProto convertGroupInputSpecToProto(
+      GroupInputSpec groupInputSpec) {
+    GroupInputSpecProto.Builder builder = GroupInputSpecProto.newBuilder();
+    builder.setGroupName(groupInputSpec.getGroupName());
+    builder.addAllGroupVertices(groupInputSpec.getGroupVertices());
+    builder.setMergedInputDescriptor(convertToProto(groupInputSpec.getMergedInputDescriptor()));
+    return builder.build();
+  }
+
+  /** Attaches the payload to the entity, leaving the entity untouched when it is null. */
+  private static void setUserPayload(EntityDescriptor<?> entity, UserPayload payload) {
+    if (payload != null) {
+      entity.setUserPayload(payload);
+    }
+  }
+
+  /**
+   * Extracts the user payload of an entity descriptor proto.
+   *
+   * @return null when the proto has no user payload message; a payload created from a null
+   *         buffer when the message exists but carries no bytes; otherwise a payload
+   *         wrapping a read-only view of the bytes together with the stored version
+   */
+  private static UserPayload convertPayloadFromProto(
+      EntityDescriptorProto proto) {
+    if (!proto.hasUserPayload()) {
+      return null;
+    }
+    if (proto.getUserPayload().hasUserPayload()) {
+      return UserPayload.create(proto.getUserPayload().getUserPayload().asReadOnlyByteBuffer(),
+          proto.getUserPayload().getVersion());
+    }
+    return UserPayload.create(null);
+  }
+
+}
Added: hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapContainerLauncher.java
URL: http://svn.apache.org/viewvc/hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapContainerLauncher.java?rev=1665307&view=auto
==============================================================================
--- hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapContainerLauncher.java (added)
+++ hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapContainerLauncher.java Mon Mar 9 17:18:07 2015
@@ -0,0 +1,65 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.llap.tezplugins;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.service.AbstractService;
+import org.apache.hadoop.yarn.util.Clock;
+import org.apache.tez.dag.app.AppContext;
+import org.apache.tez.dag.app.TaskAttemptListener;
+import org.apache.tez.dag.app.launcher.ContainerLauncher;
+import org.apache.tez.dag.app.rm.NMCommunicatorEvent;
+import org.apache.tez.dag.app.rm.NMCommunicatorLaunchRequestEvent;
+import org.apache.tez.dag.app.rm.container.AMContainerEvent;
+import org.apache.tez.dag.app.rm.container.AMContainerEventLaunched;
+import org.apache.tez.dag.app.rm.container.AMContainerEventType;
+import org.apache.tez.dag.history.DAGHistoryEvent;
+import org.apache.tez.dag.history.events.ContainerLaunchedEvent;
+
+/**
+ * Container launcher that performs no real launch or stop. For each request it only logs
+ * and posts the matching acknowledgement event back to the application context.
+ */
+public class LlapContainerLauncher extends AbstractService implements ContainerLauncher {
+  static final Log LOG = LogFactory.getLog(LlapContainerLauncher.class);
+
+  private final AppContext context;
+  private final Clock clock;
+
+  /**
+   * @param appContext supplies the clock, event handler, history handler and attempt id
+   * @param conf not used by this constructor
+   * @param tal not used by this constructor
+   */
+  public LlapContainerLauncher(AppContext appContext, Configuration conf,
+                               TaskAttemptListener tal) {
+    super(LlapContainerLauncher.class.getName());
+    this.context = appContext;
+    this.clock = appContext.getClock();
+  }
+
+  /**
+   * Dispatches launch and stop requests to their acknowledgement helpers. Other event
+   * types are ignored.
+   */
+  @Override
+  public void handle(NMCommunicatorEvent event) {
+    switch (event.getType()) {
+      case CONTAINER_LAUNCH_REQUEST:
+        acknowledgeLaunch((NMCommunicatorLaunchRequestEvent) event);
+        break;
+      case CONTAINER_STOP_REQUEST:
+        acknowledgeStop(event);
+        break;
+    }
+  }
+
+  /** Reports the container as launched and records a ContainerLaunchedEvent in history. */
+  private void acknowledgeLaunch(NMCommunicatorLaunchRequestEvent request) {
+    LOG.info("No-op launch for container: " + request.getContainerId()
+        + " succeeded on host: " + request.getNodeId());
+    context.getEventHandler().handle(new AMContainerEventLaunched(request.getContainerId()));
+    ContainerLaunchedEvent launched = new ContainerLaunchedEvent(
+        request.getContainerId(), clock.getTime(), context.getApplicationAttemptId());
+    context.getHistoryHandler().handle(new DAGHistoryEvent(null, launched));
+  }
+
+  /** Reports C_NM_STOP_SENT for the container without contacting any node. */
+  private void acknowledgeStop(NMCommunicatorEvent stopRequest) {
+    LOG.info("DEBUG: Ignoring STOP_REQUEST for event: " + stopRequest);
+    AMContainerEvent stopSent = new AMContainerEvent(stopRequest.getContainerId(),
+        AMContainerEventType.C_NM_STOP_SENT);
+    context.getEventHandler().handle(stopSent);
+  }
+}
Added: hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskCommunicator.java
URL: http://svn.apache.org/viewvc/hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskCommunicator.java?rev=1665307&view=auto
==============================================================================
--- hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskCommunicator.java (added)
+++ hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskCommunicator.java Mon Mar 9 17:18:07 2015
@@ -0,0 +1,185 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.llap.tezplugins;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+
+import com.google.protobuf.ByteString;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.llap.daemon.LlapDaemonConfiguration;
+import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.SubmitWorkRequestProto;
+import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.SubmitWorkResponseProto;
+import org.apache.hadoop.io.DataOutputBuffer;
+import org.apache.hadoop.security.Credentials;
+import org.apache.hadoop.yarn.api.ApplicationConstants;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.LocalResource;
+import org.apache.tez.dag.api.TaskCommunicatorContext;
+import org.apache.tez.dag.app.TezTaskCommunicatorImpl;
+import org.apache.tez.dag.records.TezTaskAttemptID;
+import org.apache.tez.runtime.api.impl.TaskSpec;
+
+/**
+ * Task communicator that, on top of the bookkeeping done by {@link TezTaskCommunicatorImpl},
+ * pushes every registered task attempt to the host / port recorded for its container by
+ * sending a {@link SubmitWorkRequestProto} through a {@link TaskCommunicator}.
+ */
+public class LlapTaskCommunicator extends TezTaskCommunicatorImpl {
+
+  private static final Log LOG = LogFactory.getLog(LlapTaskCommunicator.class);
+
+  // Template holding the fields that are identical for every request of this application
+  // attempt (user, application id, attempt number, token identifier).
+  private final SubmitWorkRequestProto BASE_SUBMIT_WORK_REQUEST;
+  // Serialized credentials keyed by DAG name. Entries are never removed by this class.
+  private final ConcurrentMap<String, ByteBuffer> credentialMap;
+
+  // Created in serviceInit; null until then.
+  private TaskCommunicator communicator;
+
+  /**
+   * Builds the request template from the environment's USER variable, the application
+   * attempt id exposed by the context and the token identifier.
+   *
+   * @param taskCommunicatorContext context handed to the superclass and used for callbacks
+   */
+  public LlapTaskCommunicator(
+      TaskCommunicatorContext taskCommunicatorContext) {
+    super(taskCommunicatorContext);
+
+    SubmitWorkRequestProto.Builder baseBuilder = SubmitWorkRequestProto.newBuilder();
+
+    // TODO Avoid reading this from the environment
+    baseBuilder.setUser(System.getenv(ApplicationConstants.Environment.USER.name()));
+    baseBuilder.setApplicationIdString(
+        taskCommunicatorContext.getApplicationAttemptId().getApplicationId().toString());
+    baseBuilder
+        .setAppAttemptNumber(taskCommunicatorContext.getApplicationAttemptId().getAttemptId());
+    baseBuilder.setTokenIdentifier(getTokenIdentifier());
+
+    BASE_SUBMIT_WORK_REQUEST = baseBuilder.build();
+
+    credentialMap = new ConcurrentHashMap<String, ByteBuffer>();
+  }
+
+  /**
+   * Initializes the superclass, then creates and initializes the {@link TaskCommunicator}
+   * with the configured number of communicator threads.
+   */
+  @Override
+  public void serviceInit(Configuration conf) throws Exception {
+    super.serviceInit(conf);
+    int numThreads = conf.getInt(LlapDaemonConfiguration.LLAP_DAEMON_COMMUNICATOR_NUM_THREADS,
+        LlapDaemonConfiguration.LLAP_DAEMON_COMMUNICATOR_NUM_THREADS_DEFAULT);
+    this.communicator = new TaskCommunicator(numThreads);
+    this.communicator.init(conf);
+  }
+
+  /** Starts the superclass service, then the communicator. */
+  @Override
+  public void serviceStart() {
+    super.serviceStart();
+    this.communicator.start();
+  }
+
+  /** Stops the superclass service, then the communicator if it was ever created. */
+  @Override
+  public void serviceStop() {
+    super.serviceStop();
+    if (this.communicator != null) {
+      this.communicator.stop();
+    }
+  }
+
+
+  /** Pure delegation to the superclass. */
+  @Override
+  public void registerRunningContainer(ContainerId containerId, String hostname, int port) {
+    super.registerRunningContainer(containerId, hostname, port);
+  }
+
+  /** Pure delegation to the superclass. */
+  @Override
+  public void registerContainerEnd(ContainerId containerId) {
+    super.registerContainerEnd(containerId);
+  }
+
+  /**
+   * Registers the attempt with the superclass and submits it asynchronously to the host and
+   * port stored in the container's ContainerInfo. On success the context is told that the
+   * task started remotely; a failed submission is only logged.
+   *
+   * @throws RuntimeException if the request cannot be constructed or no ContainerInfo is
+   *         known for {@code containerId}
+   */
+  @Override
+  public void registerRunningTaskAttempt(final ContainerId containerId, final TaskSpec taskSpec,
+                                         Map<String, LocalResource> additionalResources,
+                                         Credentials credentials,
+                                         boolean credentialsChanged) {
+    super.registerRunningTaskAttempt(containerId, taskSpec, additionalResources, credentials,
+        credentialsChanged);
+    SubmitWorkRequestProto requestProto = null;
+    try {
+      requestProto = constructSubmitWorkRequest(containerId, taskSpec);
+    } catch (IOException e) {
+      throw new RuntimeException("Failed to construct request", e);
+    }
+    ContainerInfo containerInfo = getContainerInfo(containerId);
+    String host;
+    int port;
+    if (containerInfo != null) {
+      // Read host and port under the ContainerInfo monitor so both come from one snapshot.
+      synchronized (containerInfo) {
+        host = containerInfo.host;
+        port = containerInfo.port;
+      }
+    } else {
+      // TODO Handle this properly
+      throw new RuntimeException("ContainerInfo not found for container: " + containerId +
+          ", while trying to launch task: " + taskSpec.getTaskAttemptID());
+    }
+    communicator.submitWork(requestProto, host, port,
+        new TaskCommunicator.ExecuteRequestCallback<SubmitWorkResponseProto>() {
+          @Override
+          public void setResponse(SubmitWorkResponseProto response) {
+            LOG.info("Successfully launched task: " + taskSpec.getTaskAttemptID());
+            getTaskCommunicatorContext()
+                .taskStartedRemotely(taskSpec.getTaskAttemptID(), containerId);
+          }
+
+          @Override
+          public void indicateError(Throwable t) {
+            // TODO Handle this error. This is where an API on the context to indicate failure / rejection comes in.
+            LOG.info("Failed to run task: " + taskSpec.getTaskAttemptID() + " on containerId: " +
+                containerId, t);
+          }
+        });
+  }
+
+  /** Pure delegation to the superclass; a submitted task is not terminated remotely. */
+  @Override
+  public void unregisterRunningTaskAttempt(TezTaskAttemptID taskAttemptID) {
+    super.unregisterRunningTaskAttempt(taskAttemptID);
+    // Nothing else to do for now. The push API in the test does not support termination of a running task
+  }
+
+  /**
+   * Builds the request for one task attempt from the shared template plus the container id,
+   * this communicator's address, the DAG's serialized credentials and the fragment spec.
+   *
+   * Credentials are serialized once per DAG name and cached; every use works on a
+   * duplicate of the cached buffer so the cached position is never advanced.
+   *
+   * @throws IOException if the credentials cannot be serialized
+   */
+  private SubmitWorkRequestProto constructSubmitWorkRequest(ContainerId containerId,
+                                                            TaskSpec taskSpec) throws
+      IOException {
+    SubmitWorkRequestProto.Builder builder =
+        SubmitWorkRequestProto.newBuilder(BASE_SUBMIT_WORK_REQUEST);
+    builder.setContainerIdString(containerId.toString());
+    builder.setAmHost(getAddress().getHostName());
+    builder.setAmPort(getAddress().getPort());
+
+    // Credentials can change across DAGs. Ideally construct only once per DAG.
+    ByteBuffer credentialsBinary = credentialMap.get(taskSpec.getDAGName());
+    if (credentialsBinary == null) {
+      credentialsBinary = serializeCredentials(getTaskCommunicatorContext().getCredentials());
+      credentialMap.putIfAbsent(taskSpec.getDAGName(), credentialsBinary.duplicate());
+    } else {
+      credentialsBinary = credentialsBinary.duplicate();
+    }
+    builder.setCredentialsBinary(ByteString.copyFrom(credentialsBinary));
+    builder.setFragmentSpec(Converters.convertTaskSpecToProto(taskSpec));
+    return builder.build();
+  }
+
+  /**
+   * Copies the credentials and writes their token storage into a buffer.
+   *
+   * @return a ByteBuffer wrapping exactly the bytes that were written
+   * @throws IOException if writing the token storage fails
+   */
+  private ByteBuffer serializeCredentials(Credentials credentials) throws IOException {
+    Credentials containerCredentials = new Credentials();
+    containerCredentials.addAll(credentials);
+    DataOutputBuffer containerTokens_dob = new DataOutputBuffer();
+    containerCredentials.writeTokenStorageToStream(containerTokens_dob);
+    // getData() exposes the whole backing array; only the first getLength() bytes are valid.
+    return ByteBuffer.wrap(containerTokens_dob.getData(), 0, containerTokens_dob.getLength());
+  }
+}
Added: hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/tezplugins/TaskCommunicator.java
URL: http://svn.apache.org/viewvc/hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/tezplugins/TaskCommunicator.java?rev=1665307&view=auto
==============================================================================
--- hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/tezplugins/TaskCommunicator.java (added)
+++ hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/tezplugins/TaskCommunicator.java Mon Mar 9 17:18:07 2015
@@ -0,0 +1,111 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.llap.tezplugins;
+
+import java.util.concurrent.Callable;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+
+import com.google.common.util.concurrent.FutureCallback;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.ListeningExecutorService;
+import com.google.common.util.concurrent.MoreExecutors;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+import com.google.protobuf.Message;
+import org.apache.hadoop.hive.llap.daemon.LlapDaemonProtocolBlockingPB;
+import org.apache.hadoop.hive.llap.daemon.impl.LlapDaemonProtocolClientImpl;
+import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.SubmitWorkRequestProto;
+import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.SubmitWorkResponseProto;
+import org.apache.hadoop.service.AbstractService;
+
+/**
+ * Service that sends submitWork calls to daemon endpoints from a fixed-size thread pool
+ * and reports each outcome through a callback. One client proxy is kept per
+ * "hostname:port" key.
+ */
+public class TaskCommunicator extends AbstractService {
+
+  private final ConcurrentMap<String, LlapDaemonProtocolBlockingPB> hostProxies;
+  private ListeningExecutorService executor;
+
+  /**
+   * @param numThreads size of the fixed thread pool that performs the calls
+   */
+  public TaskCommunicator(int numThreads) {
+    super(TaskCommunicator.class.getSimpleName());
+    this.hostProxies = new ConcurrentHashMap<>();
+    ExecutorService pool = Executors.newFixedThreadPool(numThreads,
+        new ThreadFactoryBuilder().setNameFormat("TaskCommunicator #%2d").build());
+    this.executor = MoreExecutors.listeningDecorator(pool);
+  }
+
+  /** Shuts the executor down immediately. */
+  @Override
+  public void serviceStop() {
+    executor.shutdownNow();
+  }
+
+  /**
+   * Queues a submitWork call to {@code host:port} on the executor.
+   *
+   * @param request the request to send
+   * @param host target hostname
+   * @param port target port
+   * @param callback receives the response on success, or the failure otherwise
+   */
+  public void submitWork(SubmitWorkRequestProto request, String host, int port,
+                         final ExecuteRequestCallback<SubmitWorkResponseProto> callback) {
+    SubmitWorkCallable call = new SubmitWorkCallable(request, host, port);
+    ListenableFuture<SubmitWorkResponseProto> pending = executor.submit(call);
+    FutureCallback<SubmitWorkResponseProto> relay =
+        new FutureCallback<SubmitWorkResponseProto>() {
+          @Override
+          public void onSuccess(SubmitWorkResponseProto result) {
+            callback.setResponse(result);
+          }
+
+          @Override
+          public void onFailure(Throwable t) {
+            callback.indicateError(t);
+          }
+        };
+    Futures.addCallback(pending, relay);
+  }
+
+  /** Unit of work that performs one submitWork call against the proxy for its endpoint. */
+  private class SubmitWorkCallable implements Callable<SubmitWorkResponseProto> {
+    final String hostname;
+    final int port;
+    final SubmitWorkRequestProto request;
+
+    private SubmitWorkCallable(SubmitWorkRequestProto request, String hostname, int port) {
+      this.request = request;
+      this.hostname = hostname;
+      this.port = port;
+    }
+
+    @Override
+    public SubmitWorkResponseProto call() throws Exception {
+      LlapDaemonProtocolBlockingPB target = getProxy(hostname, port);
+      return target.submitWork(null, request);
+    }
+  }
+
+  /** Receives the outcome of an asynchronous request. */
+  public interface ExecuteRequestCallback<T extends Message> {
+    /** Called with the response when the request succeeded. */
+    void setResponse(T response);
+
+    /** Called with the failure when the request did not succeed. */
+    void indicateError(Throwable t);
+  }
+
+  /**
+   * Returns the proxy registered for the endpoint, creating and registering one when none
+   * exists. If another thread registers a proxy first, that one is returned instead.
+   */
+  private LlapDaemonProtocolBlockingPB getProxy(String hostname, int port) {
+    String endpoint = getHostIdentifier(hostname, port);
+
+    LlapDaemonProtocolBlockingPB known = hostProxies.get(endpoint);
+    if (known != null) {
+      return known;
+    }
+
+    LlapDaemonProtocolBlockingPB fresh =
+        new LlapDaemonProtocolClientImpl(getConfig(), hostname, port);
+    LlapDaemonProtocolBlockingPB winner = hostProxies.putIfAbsent(endpoint, fresh);
+    if (winner == null) {
+      return fresh;
+    }
+    // TODO Shutdown the new proxy.
+    return winner;
+  }
+
+  /** Builds the map key "hostname:port". */
+  private String getHostIdentifier(String hostname, int port) {
+    return hostname + ":" + port;
+  }
+}
Added: hive/branches/llap/llap-server/src/java/org/apache/tez/dag/app/rm/LlapTaskSchedulerService.java
URL: http://svn.apache.org/viewvc/hive/branches/llap/llap-server/src/java/org/apache/tez/dag/app/rm/LlapTaskSchedulerService.java?rev=1665307&view=auto
==============================================================================
--- hive/branches/llap/llap-server/src/java/org/apache/tez/dag/app/rm/LlapTaskSchedulerService.java (added)
+++ hive/branches/llap/llap-server/src/java/org/apache/tez/dag/app/rm/LlapTaskSchedulerService.java Mon Mar 9 17:18:07 2015
@@ -0,0 +1,339 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.dag.app.rm;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Random;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import com.google.common.primitives.Ints;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.Container;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.NodeId;
+import org.apache.hadoop.yarn.api.records.NodeReport;
+import org.apache.hadoop.yarn.api.records.NodeState;
+import org.apache.hadoop.yarn.api.records.Priority;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.client.api.YarnClient;
+import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.hadoop.hive.llap.daemon.LlapDaemonConfiguration;
+import org.apache.tez.dag.app.AppContext;
+
+
+/**
+ * Task scheduler that places tasks on LLAP daemon hosts.
+ *
+ * <p>For every allocation request it picks a host from the known service-host
+ * list, builds a {@link Container} record for that host and the configured
+ * daemon RPC port through {@link ContainerFactory}, remembers the container id
+ * for the task, and reports the allocation to the application callback.
+ *
+ * <p>The service-host list comes from configuration, or, when the configured
+ * value is {@code *}, is filled in from YARN node reports in
+ * {@link #serviceStart()}.
+ */
+public class LlapTaskSchedulerService extends TaskSchedulerService {
+
+  private static final Log LOG = LogFactory.getLog(LlapTaskSchedulerService.class);
+
+  private final ExecutorService appCallbackExecutor;
+  private final TaskSchedulerAppCallback appClientDelegate;
+  private final AppContext appContext;
+  // Same hosts in two shapes: the list supports random selection by index,
+  // the set supports membership checks against requested hosts.
+  private final List<String> serviceHosts;
+  private final Set<String> serviceHostSet;
+  private final ContainerFactory containerFactory;
+  private final Random random = new Random();
+  private final int containerPort;
+
+  private final String clientHostname;
+  private final int clientPort;
+  private final String trackingUrl;
+  private final AtomicBoolean isStopped = new AtomicBoolean(false);
+  // Task -> id of the container fabricated for it; entries are removed in deallocateTask.
+  private final ConcurrentMap<Object, ContainerId> runningTasks =
+      new ConcurrentHashMap<Object, ContainerId>();
+
+  // Per daemon
+  private final int memoryPerInstance;
+  private final int coresPerInstance;
+  private final int executorsPerInstance;
+
+  // Per Executor Thread
+  private final Resource resourcePerExecutor;
+
+  // TODO: replace with service registry
+  private final YarnClient yc = YarnClient.createYarnClient();
+
+  /**
+   * Creates the scheduler and reads its sizing and host settings from {@code conf}.
+   *
+   * <p>Host handling: no configured hosts means {@code localhost}; a first entry of
+   * {@code *} leaves the host list empty so that {@link #serviceStart()} discovers
+   * hosts from YARN; otherwise the configured hosts are used as given.
+   *
+   * @param appClient callback to notify about allocations; wrapped so that calls are
+   *                  handed to a {@code TaskSchedulerAppCallbackWrapper} together with
+   *                  this scheduler's single-thread executor
+   * @param appContext application context, used to derive container ids
+   * @param clientHostname stored as given
+   * @param clientPort stored as given
+   * @param trackingUrl stored as given
+   * @param customAppIdIdentifier passed to {@link ContainerFactory} as the
+   *                              application id's first component
+   * @param conf configuration to read daemon settings from
+   */
+  public LlapTaskSchedulerService(TaskSchedulerAppCallback appClient, AppContext appContext,
+                                    String clientHostname, int clientPort, String trackingUrl,
+                                    long customAppIdIdentifier,
+                                    Configuration conf) {
+    // Accepting configuration here to allow setting up fields as final
+    super(LlapTaskSchedulerService.class.getName());
+    // The executor must exist before the delegate is created, since the delegate wraps it.
+    this.appCallbackExecutor = createAppCallbackExecutorService();
+    this.appClientDelegate = createAppCallbackDelegate(appClient);
+    this.appContext = appContext;
+    this.serviceHosts = new LinkedList<String>();
+    this.serviceHostSet = new HashSet<>();
+    this.containerFactory = new ContainerFactory(appContext, customAppIdIdentifier);
+    this.memoryPerInstance = conf
+        .getInt(LlapDaemonConfiguration.LLAP_DAEMON_MEMORY_PER_INSTANCE_MB,
+            LlapDaemonConfiguration.LLAP_DAEMON_MEMORY_PER_INSTANCE_MB_DEFAULT);
+    this.coresPerInstance = conf
+        .getInt(LlapDaemonConfiguration.LLAP_DAEMON_VCPUS_PER_INSTANCE,
+            LlapDaemonConfiguration.LLAP_DAEMON_VCPUS_PER_INSTANCE_DEFAULT);
+    this.executorsPerInstance = conf.getInt(LlapDaemonConfiguration.LLAP_DAEMON_NUM_EXECUTORS,
+        LlapDaemonConfiguration.LLAP_DAEMON_NUM_EXECUTORS_DEFAULT);
+    this.clientHostname = clientHostname;
+    this.clientPort = clientPort;
+    this.trackingUrl = trackingUrl;
+
+    // Split the daemon's resources evenly across its executors (truncating).
+    int memoryPerExecutor = (int) (memoryPerInstance / (float) executorsPerInstance);
+    int coresPerExecutor = (int) (coresPerInstance / (float) executorsPerInstance);
+    this.resourcePerExecutor = Resource.newInstance(memoryPerExecutor, coresPerExecutor);
+
+    String[] hosts = conf.getTrimmedStrings(LlapDaemonConfiguration.LLAP_DAEMON_SERVICE_HOSTS);
+    if (hosts == null || hosts.length == 0) {
+      serviceHosts.add("localhost");
+      serviceHostSet.add("localhost");
+    } else if (!hosts[0].equals("*")) {
+      for (String host : hosts) {
+        serviceHosts.add(host);
+        serviceHostSet.add(host);
+      }
+    }
+    // else: "*" was configured; hosts stay empty until serviceStart() fills them in.
+    this.containerPort = conf.getInt(LlapDaemonConfiguration.LLAP_DAEMON_RPC_PORT,
+        LlapDaemonConfiguration.LLAP_DAEMON_RPC_PORT_DEFAULT);
+
+    if (serviceHosts.size() > 0) {
+      LOG.info("Running with configuration: " +
+          "memoryPerInstance=" + memoryPerInstance +
+          ", vcoresPerInstance=" + coresPerInstance +
+          ", executorsPerInstance=" + executorsPerInstance +
+          ", resourcePerInstanceInferred=" + resourcePerExecutor +
+          ", hosts=" + serviceHosts.toString() +
+          ", rpcPort=" + containerPort);
+    } else {
+      LOG.info("Running with configuration: " +
+          "memoryPerInstance=" + memoryPerInstance +
+          ", vcoresPerInstance=" + coresPerInstance +
+          ", executorsPerInstance=" + executorsPerInstance +
+          ", resourcePerInstanceInferred=" + resourcePerExecutor +
+          ", hosts=<pending>" +
+          ", rpcPort=<pending>");
+    }
+
+  }
+
+  /** Initialises the YARN client with the service configuration. */
+  @Override
+  public void serviceInit(Configuration conf) {
+    yc.init(conf);
+  }
+
+
+  /**
+   * Starts the YARN client and, if no hosts were configured explicitly, discovers
+   * service hosts from the RUNNING node reports.
+   *
+   * <p>A node is taken as a service host when its used memory is at least
+   * {@code memoryPerInstance}. NOTE(review): presumably this treats "enough memory
+   * in use" as a proxy for "a daemon runs here" - confirm.
+   *
+   * <p>A failure to fetch node reports is logged and leaves the host list empty.
+   */
+  @Override
+  public void serviceStart() {
+    yc.start();
+    if (serviceHosts.size() > 0) {
+      return;
+    }
+    LOG.info("Evaluating host usage criteria for service nodes");
+    try {
+      List<NodeReport> nodes = yc.getNodeReports(NodeState.RUNNING);
+      for (NodeReport nd : nodes) {
+        Resource used = nd.getUsed();
+        LOG.info("Examining node: " + nd);
+        if (nd.getNodeState() == NodeState.RUNNING
+            && used.getMemory() >= memoryPerInstance) {
+          // TODO: fix this with YARN registry
+          serviceHosts.add(nd.getNodeId().getHost());
+          serviceHostSet.add(nd.getNodeId().getHost());
+        }
+      }
+      LOG.info("Re-inited with configuration: " +
+          "memoryPerInstance=" + memoryPerInstance +
+          ", vcoresPerInstance=" + coresPerInstance +
+          ", executorsPerInstance=" + executorsPerInstance +
+          ", resourcePerInstanceInferred=" + resourcePerExecutor +
+          ", hosts="+ serviceHosts.toString());
+    } catch (IOException | YarnException e) {
+      // Route the failure through the class logger rather than stderr.
+      LOG.error("Failed to fetch node reports from YARN; no service hosts were discovered", e);
+    }
+  }
+
+  /**
+   * Shuts down the callback executor and the YARN client. Only the first call has
+   * any effect.
+   */
+  @Override
+  public void serviceStop() {
+    if (!this.isStopped.getAndSet(true)) {
+      appCallbackExecutor.shutdownNow();
+      // Release the client that serviceInit/serviceStart set up.
+      yc.stop();
+    }
+  }
+
+  /**
+   * Returns the number of known hosts multiplied by the per-daemon memory and vcores.
+   *
+   * @throws IllegalArgumentException if either product does not fit in an {@code int}
+   */
+  @Override
+  public Resource getAvailableResources() {
+    // TODO This needs information about all running executors, and the amount of memory etc available across the cluster.
+    return computeClusterResources();
+  }
+
+  /** Returns the number of known service hosts. */
+  @Override
+  public int getClusterNodeCount() {
+    return serviceHosts.size();
+  }
+
+  /** No-op. */
+  @Override
+  public void resetMatchLocalityForAllHeldContainers() {
+  }
+
+  /**
+   * Returns the number of known hosts multiplied by the per-daemon memory and vcores.
+   *
+   * @throws IllegalArgumentException if either product does not fit in an {@code int}
+   */
+  @Override
+  public Resource getTotalResources() {
+    return computeClusterResources();
+  }
+
+  /** Not supported; only logs the request. */
+  @Override
+  public void blacklistNode(NodeId nodeId) {
+    LOG.info("DEBUG: BlacklistNode not supported");
+  }
+
+  /** Not supported; only logs the request. */
+  @Override
+  public void unblacklistNode(NodeId nodeId) {
+    LOG.info("DEBUG: unBlacklistNode not supported");
+  }
+
+  /**
+   * Allocates a fabricated container for {@code task} on one of the requested hosts
+   * if any of them is a known service host, otherwise on a random known host.
+   * {@code capability}, {@code racks} and {@code containerSignature} are not used.
+   */
+  @Override
+  public void allocateTask(Object task, Resource capability, String[] hosts, String[] racks,
+                           Priority priority, Object containerSignature, Object clientCookie) {
+    allocateOnHost(task, priority, clientCookie, selectHost(hosts));
+  }
+
+
+  /**
+   * Allocates a fabricated container for {@code task} on a random known host.
+   * {@code capability}, {@code containerId} and {@code containerSignature} are not used.
+   */
+  @Override
+  public void allocateTask(Object task, Resource capability, ContainerId containerId,
+                           Priority priority, Object containerSignature, Object clientCookie) {
+    allocateOnHost(task, priority, clientCookie, selectHost(null));
+  }
+
+  /**
+   * Forgets the container recorded for {@code task} and tells the application it is
+   * being released.
+   *
+   * @return {@code false} if no container was recorded for the task, {@code true} otherwise
+   */
+  @Override
+  public boolean deallocateTask(Object task, boolean taskSucceeded) {
+    ContainerId containerId = runningTasks.remove(task);
+    if (containerId == null) {
+      LOG.error("Could not determine ContainerId for task: " + task +
+          " . Could have hit a race condition. Ignoring." +
+          " The query may hang since this \"unknown\" container is now taking up a slot permanently");
+      return false;
+    }
+    appClientDelegate.containerBeingReleased(containerId);
+    return true;
+  }
+
+  /** Ignored; only logs the request and returns {@code null}. */
+  @Override
+  public Object deallocateContainer(ContainerId containerId) {
+    LOG.info("DEBUG: Ignoring deallocateContainer for containerId: " + containerId);
+    return null;
+  }
+
+  /** No-op. */
+  @Override
+  public void setShouldUnregister() {
+
+  }
+
+  @Override
+  public boolean hasUnregistered() {
+    // Nothing to do. No registration involved.
+    return true;
+  }
+
+  /** Builds the single daemon thread on which application callbacks are run. */
+  private ExecutorService createAppCallbackExecutorService() {
+    return Executors.newSingleThreadExecutor(new ThreadFactoryBuilder()
+        .setNameFormat("TaskSchedulerAppCaller #%d").setDaemon(true).build());
+  }
+
+  /** Wraps the real callback together with {@code appCallbackExecutor}. */
+  private TaskSchedulerAppCallback createAppCallbackDelegate(
+      TaskSchedulerAppCallback realAppClient) {
+    return new TaskSchedulerAppCallbackWrapper(realAppClient,
+        appCallbackExecutor);
+  }
+
+  /**
+   * Computes hosts x per-daemon memory and hosts x per-daemon vcores.
+   * The products are formed in {@code long} so that the range check in
+   * {@code Ints.checkedCast} sees the true value instead of a wrapped one.
+   */
+  private Resource computeClusterResources() {
+    int numHosts = serviceHosts.size();
+    return Resource
+        .newInstance(Ints.checkedCast((long) numHosts * memoryPerInstance),
+            Ints.checkedCast((long) numHosts * coresPerInstance));
+  }
+
+  /** Fabricates a container on {@code host}, records it for the task and reports the allocation. */
+  private void allocateOnHost(Object task, Priority priority, Object clientCookie, String host) {
+    Container container =
+        containerFactory.createContainer(resourcePerExecutor, priority, host, containerPort);
+    runningTasks.put(task, container.getId());
+    appClientDelegate.taskAllocated(task, clientCookie, container);
+  }
+
+  /**
+   * Picks the host to run on.
+   *
+   * <p>The requested hosts are considered in sorted order and the first one that is a
+   * known service host wins. If none matches, or nothing was requested, a known host
+   * is chosen at random.
+   *
+   * @param requestedHosts preferred hosts, may be {@code null} or empty; not modified
+   * @return the selected host
+   */
+  private String selectHost(String[] requestedHosts) {
+    String host = null;
+    if (requestedHosts != null && requestedHosts.length > 0) {
+      // Sort a copy so the caller's array keeps its order.
+      String[] sortedHosts = requestedHosts.clone();
+      Arrays.sort(sortedHosts);
+      host = sortedHosts[0];
+      if (serviceHostSet.contains(host)) {
+        LOG.info("Selected host: " + host + " from requested hosts: " + Arrays.toString(sortedHosts));
+      } else {
+        LOG.info("Preferred host: " + host + " not present. Attempting to select another one");
+        host = null;
+        for (String h : sortedHosts) {
+          if (serviceHostSet.contains(h)) {
+            host = h;
+            break;
+          }
+        }
+        if (host == null) {
+          LOG.info("Requested hosts: " + Arrays.toString(sortedHosts) + " not present. Randomizing the host");
+        }
+      }
+    }
+    if (host == null) {
+      // NOTE(review): with no known hosts, nextInt(0) throws IllegalArgumentException.
+      host = serviceHosts.get(random.nextInt(serviceHosts.size()));
+      LOG.info("Selected random host: " + host + " since the request contained no host information");
+    }
+    return host;
+  }
+
+  /**
+   * Creates {@link Container} records with ids drawn from a counter that starts at 1,
+   * under an application attempt id built from the supplied identifier.
+   */
+  static class ContainerFactory {
+    final ApplicationAttemptId customAppAttemptId;
+    AtomicInteger nextId;
+
+    /**
+     * @param appContext supplies the real application attempt id, whose application
+     *                   number and attempt number are reused
+     * @param appIdLong  used as the first component of the custom application id
+     */
+    public ContainerFactory(AppContext appContext, long appIdLong) {
+      this.nextId = new AtomicInteger(1);
+      ApplicationId appId = ApplicationId
+          .newInstance(appIdLong, appContext.getApplicationAttemptId().getApplicationId().getId());
+      this.customAppAttemptId = ApplicationAttemptId
+          .newInstance(appId, appContext.getApplicationAttemptId().getAttemptId());
+    }
+
+    /**
+     * Builds a container record for {@code hostname:port} with the next container id.
+     * The container token is {@code null}.
+     */
+    public Container createContainer(Resource capability, Priority priority, String hostname, int port) {
+      ContainerId containerId = ContainerId.newInstance(customAppAttemptId, nextId.getAndIncrement());
+      NodeId nodeId = NodeId.newInstance(hostname, port);
+      // NOTE(review): this is the literal text "hostname:0", not the hostname parameter;
+      // looks like a placeholder - confirm before relying on the HTTP address.
+      String nodeHttpAddress = "hostname:0";
+
+      Container container = Container.newInstance(containerId,
+          nodeId,
+          nodeHttpAddress,
+          capability,
+          priority,
+          null);
+
+      return container;
+    }
+  }
+}
Modified: hive/branches/llap/llap-server/src/protobuf/LlapDaemonProtocol.proto
URL: http://svn.apache.org/viewvc/hive/branches/llap/llap-server/src/protobuf/LlapDaemonProtocol.proto?rev=1665307&r1=1665306&r2=1665307&view=diff
==============================================================================
--- hive/branches/llap/llap-server/src/protobuf/LlapDaemonProtocol.proto (original)
+++ hive/branches/llap/llap-server/src/protobuf/LlapDaemonProtocol.proto Mon Mar 9 17:18:07 2015
@@ -21,7 +21,46 @@ option java_outer_classname = "LlapDaemo
 option
java_generic_services = true; option java_generate_equals_and_hash = true; -message RunContainerRequestProto { +// TODO Change this as the interface evolves. Currently using Tez constructs. + +message UserPayloadProto { + optional bytes user_payload = 1; + optional int32 version = 2; +} + +message EntityDescriptorProto { + optional string class_name = 1; + optional UserPayloadProto user_payload = 2; + optional bytes history_text = 3; +} + +message IOSpecProto { + optional string connected_vertex_name = 1; + optional EntityDescriptorProto io_descriptor = 2; + optional int32 physical_edge_count = 3; +} + +message GroupInputSpecProto { + optional string group_name = 1; + repeated string group_vertices = 2; + optional EntityDescriptorProto merged_input_descriptor = 3; +} + + +message FragmentSpecProto { + optional string task_attempt_id_string = 1; + optional string dag_name = 2; + optional string vertex_name = 3; + optional EntityDescriptorProto processor_descriptor = 4; + repeated IOSpecProto input_specs = 5; + repeated IOSpecProto output_specs = 6; + repeated GroupInputSpecProto grouped_input_specs = 7; + optional int32 vertex_parallelism = 8; + optional int32 fragment_number =9; + optional int32 attempt_number = 10; +} + +message SubmitWorkRequestProto { optional string container_id_string = 1; optional string am_host = 2; optional int32 am_port = 3; @@ -30,11 +69,12 @@ message RunContainerRequestProto { optional string user = 6; optional string application_id_string = 7; optional int32 app_attempt_number = 8; + optional FragmentSpecProto fragment_spec = 9; } -message RunContainerResponseProto { +message SubmitWorkResponseProto { } service LlapDaemonProtocol { - rpc runContainer(RunContainerRequestProto) returns (RunContainerResponseProto); + rpc submitWork(SubmitWorkRequestProto) returns (SubmitWorkResponseProto); } Modified: hive/branches/llap/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/TestLlapDaemonProtocolServerImpl.java URL: 
http://svn.apache.org/viewvc/hive/branches/llap/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/TestLlapDaemonProtocolServerImpl.java?rev=1665307&r1=1665306&r2=1665307&view=diff ============================================================================== --- hive/branches/llap/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/TestLlapDaemonProtocolServerImpl.java (original) +++ hive/branches/llap/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/TestLlapDaemonProtocolServerImpl.java Mon Mar 9 17:18:07 2015 @@ -24,7 +24,7 @@ import org.apache.hadoop.conf.Configurat import org.apache.hadoop.hive.llap.daemon.ContainerRunner; import org.apache.hadoop.hive.llap.daemon.LlapDaemonConfiguration; import org.apache.hadoop.hive.llap.daemon.LlapDaemonProtocolBlockingPB; -import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.RunContainerRequestProto; +import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.SubmitWorkRequestProto; import org.junit.Test; public class TestLlapDaemonProtocolServerImpl { @@ -45,8 +45,9 @@ public class TestLlapDaemonProtocolServe LlapDaemonProtocolBlockingPB client = new LlapDaemonProtocolClientImpl(new Configuration(), serverAddr.getHostName(), serverAddr.getPort()); - client.runContainer(null, - RunContainerRequestProto.newBuilder().setAmHost("amhost") + client.submitWork(null, + SubmitWorkRequestProto.newBuilder() + .setAmHost("amhost") .setAmPort(2000).build()); } finally { Modified: hive/branches/llap/llap-server/src/test/resources/llap-daemon-site.xml URL: http://svn.apache.org/viewvc/hive/branches/llap/llap-server/src/test/resources/llap-daemon-site.xml?rev=1665307&r1=1665306&r2=1665307&view=diff ============================================================================== --- hive/branches/llap/llap-server/src/test/resources/llap-daemon-site.xml (original) +++ hive/branches/llap/llap-server/src/test/resources/llap-daemon-site.xml Mon Mar 9 17:18:07 2015 @@ -1,3 +1,18 @@ 
+<?xml version="1.0"?> +<!-- + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. See accompanying LICENSE file. +--> + <configuration> <property> @@ -24,6 +39,16 @@ </property> <property> + <name>llap.daemon.communicator.num.threads</name> + <value>5</value> + </property> + + <property> + <name>llap.daemon.rpc.port</name> + <value>15001</value> + </property> + + <property> <name>llap.daemon.service.hosts</name> <value>localhost</value> <description>Comma separate list of nodes running daemons</description> Modified: hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DagUtils.java URL: http://svn.apache.org/viewvc/hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DagUtils.java?rev=1665307&r1=1665306&r2=1665307&view=diff ============================================================================== --- hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DagUtils.java (original) +++ hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DagUtils.java Mon Mar 9 17:18:07 2015 @@ -629,7 +629,8 @@ public class DagUtils { map = Vertex.create(mapWork.getName(), ProcessorDescriptor.create(procClassName) .setUserPayload(serializedConf), numTasks, getContainerResource(conf)) .setConf(TezConfiguration.TEZ_AM_VERTEX_TASK_SCHEDULER_NAME, serviceName) - .setConf(TezConfiguration.TEZ_AM_VERTEX_CONTAINER_LAUNCHER_NAME, serviceName); + .setConf(TezConfiguration.TEZ_AM_VERTEX_CONTAINER_LAUNCHER_NAME, serviceName) + 
.setConf(TezConfiguration.TEZ_AM_VERTEX_TASK_COMMUNICATOR_NAME, serviceName); map.setTaskEnvironment(getContainerEnvironment(conf, true)); map.setTaskLaunchCmdOpts(getContainerJavaOpts(conf)); @@ -700,7 +701,8 @@ public class DagUtils { reduceWork.getMaxReduceTasks(): reduceWork.getNumReduceTasks(), getContainerResource(conf)) .setConf(TezConfiguration.TEZ_AM_VERTEX_TASK_SCHEDULER_NAME, serviceName) - .setConf(TezConfiguration.TEZ_AM_VERTEX_CONTAINER_LAUNCHER_NAME, serviceName); + .setConf(TezConfiguration.TEZ_AM_VERTEX_CONTAINER_LAUNCHER_NAME, serviceName) + .setConf(TezConfiguration.TEZ_AM_VERTEX_TASK_COMMUNICATOR_NAME, serviceName); reducer.setTaskEnvironment(getContainerEnvironment(conf, false)); reducer.setTaskLaunchCmdOpts(getContainerJavaOpts(conf)); Modified: hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionState.java URL: http://svn.apache.org/viewvc/hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionState.java?rev=1665307&r1=1665306&r2=1665307&view=diff ============================================================================== --- hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionState.java (original) +++ hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionState.java Mon Mar 9 17:18:07 2015 @@ -73,10 +73,12 @@ public class TezSessionState { public static final String LLAP_SERVICE = "LLAP"; public static final String DEFAULT_SERVICE = TezConstants.TEZ_AM_SERVICE_PLUGINS_NAME_DEFAULT; public static final String LOCAL_SERVICE = TezConstants.TEZ_AM_SERVICE_PLUGINS_LOCAL_MODE_NAME_DEFAULT; - private static final String LLAP_SCHEDULER = "org.apache.tez.dag.app.rm.DaemonTaskSchedulerService"; - private static final String LLAP_LAUNCHER = "org.apache.tez.dag.app.launcher.DaemonContainerLauncher"; + private static final String LLAP_SCHEDULER = "org.apache.tez.dag.app.rm.LlapTaskSchedulerService"; + private static final String LLAP_LAUNCHER = 
"org.apache.hadoop.hive.llap.tezplugins.LlapContainerLauncher"; + private static final String LLAP_TASK_COMMUNICATOR = "org.apache.hadoop.hive.llap.tezplugins.LlapTaskCommunicator"; private static final String LLAP_SERVICE_SCHEDULER = LLAP_SERVICE + ":" + LLAP_SCHEDULER; private static final String LLAP_SERVICE_LAUNCHER = LLAP_SERVICE + ":" + LLAP_LAUNCHER; + private static final String LLAP_SERVICE_TASK_COMMUNICATOR = LLAP_SERVICE + ":" + LLAP_TASK_COMMUNICATOR; private HiveConf conf; private Path tezScratchDir; @@ -209,7 +211,10 @@ public class TezSessionState { DEFAULT_SERVICE, LOCAL_SERVICE, LLAP_SERVICE_SCHEDULER); tezConfig.setStrings(TezConfiguration.TEZ_AM_CONTAINER_LAUNCHERS, - DEFAULT_SERVICE, LOCAL_SERVICE, LLAP_SERVICE_LAUNCHER); + DEFAULT_SERVICE, LOCAL_SERVICE, LLAP_SERVICE_LAUNCHER); + + tezConfig.setStrings(TezConfiguration.TEZ_AM_TASK_COMMUNICATORS, + DEFAULT_SERVICE, LOCAL_SERVICE, LLAP_SERVICE_TASK_COMMUNICATOR); // container prewarming. tell the am how many containers we need if (HiveConf.getBoolVar(conf, ConfVars.HIVE_PREWARM_ENABLED)) {