Repository: hive Updated Branches: refs/heads/llap 6547c0cec -> cae3ec16b
http://git-wip-us.apache.org/repos/asf/hive/blob/cae3ec16/llap-server/src/java/org/apache/hadoop/hive/llap/LlapNodeId.java ---------------------------------------------------------------------- diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/LlapNodeId.java b/llap-server/src/java/org/apache/hadoop/hive/llap/LlapNodeId.java index ca04557..515bf3c 100644 --- a/llap-server/src/java/org/apache/hadoop/hive/llap/LlapNodeId.java +++ b/llap-server/src/java/org/apache/hadoop/hive/llap/LlapNodeId.java @@ -81,9 +81,6 @@ public class LlapNodeId { @Override public String toString() { - return "LlapNodeId{" + - "hostname='" + hostname + '\'' + - ", port=" + port + - '}'; + return hostname + ":" + port; } } http://git-wip-us.apache.org/repos/asf/hive/blob/cae3ec16/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/ContainerRunner.java ---------------------------------------------------------------------- diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/ContainerRunner.java b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/ContainerRunner.java index 82f3b59..f3ce33b 100644 --- a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/ContainerRunner.java +++ b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/ContainerRunner.java @@ -16,12 +16,19 @@ package org.apache.hadoop.hive.llap.daemon; import java.io.IOException; +import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos; +import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.QueryCompleteRequestProto; import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.SourceStateUpdatedRequestProto; import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.SubmitWorkRequestProto; +import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.TerminateFragmentRequestProto; public interface ContainerRunner { void submitWork(SubmitWorkRequestProto request) throws IOException; void sourceStateUpdated(SourceStateUpdatedRequestProto request); + + void queryComplete(QueryCompleteRequestProto request); + + void terminateFragment(TerminateFragmentRequestProto request); } http://git-wip-us.apache.org/repos/asf/hive/blob/cae3ec16/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/ContainerRunnerImpl.java ---------------------------------------------------------------------- diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/ContainerRunnerImpl.java b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/ContainerRunnerImpl.java index 061e875..c9e5829 100644 --- a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/ContainerRunnerImpl.java +++ b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/ContainerRunnerImpl.java @@ -14,7 +14,6 @@ package org.apache.hadoop.hive.llap.daemon.impl; -import java.io.File; import java.io.IOException; import java.net.InetSocketAddress; import java.nio.ByteBuffer; @@ -26,19 +25,18 @@ import java.util.concurrent.ConcurrentMap; import java.util.concurrent.atomic.AtomicReference; import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.Path; import org.apache.hadoop.hive.llap.LlapNodeId; import org.apache.hadoop.hive.llap.daemon.ContainerRunner; import org.apache.hadoop.hive.llap.daemon.HistoryLogger; -import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos; import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.FragmentRuntimeInfo; import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.FragmentSpecProto; import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.GroupInputSpecProto; import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.IOSpecProto; +import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.QueryCompleteRequestProto; import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.SourceStateProto; import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.SourceStateUpdatedRequestProto; import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.SubmitWorkRequestProto; +import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.TerminateFragmentRequestProto; import org.apache.hadoop.hive.llap.metrics.LlapDaemonExecutorMetrics; import org.apache.hadoop.hive.llap.shufflehandler.ShuffleHandler; import org.apache.hadoop.io.DataInputBuffer; @@ -53,6 +51,7 @@ import org.apache.tez.common.security.JobTokenIdentifier; import org.apache.tez.common.security.TokenCache; import org.apache.tez.dag.api.TezConfiguration; import org.apache.tez.dag.api.TezConstants; +import org.apache.tez.dag.records.TezTaskAttemptID; import org.apache.tez.runtime.api.impl.ExecutionContextImpl; import com.google.common.base.Preconditions; @@ -64,11 +63,11 @@ public class ContainerRunnerImpl extends AbstractService implements ContainerRun private static final Logger LOG = Logger.getLogger(ContainerRunnerImpl.class); private volatile AMReporter amReporter; + private final QueryTracker queryTracker; private final Scheduler<TaskRunnerCallable> executorService; private final AtomicReference<InetSocketAddress> localAddress; private final String[] localDirsBase; private final Map<String, String> localEnv = new HashMap<>(); - private final FileSystem localFs; private final long memoryPerExecutor; private final LlapDaemonExecutorMetrics metrics; private final Configuration conf; @@ -89,6 +88,7 @@ public class ContainerRunnerImpl extends AbstractService implements ContainerRun this.localDirsBase = localDirsBase; this.localAddress = localAddress; + this.queryTracker = new QueryTracker(conf, localDirsBase); this.executorService = new TaskExecutorService(numExecutors, waitQueueSize, enablePreemption); AuxiliaryServiceHelper.setServiceDataIntoEnv( TezConstants.TEZ_SHUFFLE_HANDLER_SERVICE_ID, @@ -99,11 +99,6 @@ public class ContainerRunnerImpl extends AbstractService implements ContainerRun this.memoryPerExecutor = (long)(totalMemoryAvailableBytes * 0.8 / (float) numExecutors); this.metrics = metrics; - try { - localFs = FileSystem.getLocal(conf); - } catch (IOException e) { - throw new RuntimeException("Failed to setup local filesystem instance", e); - } confParams = new TaskRunnerCallable.ConfParams( conf.getInt(TezConfiguration.TEZ_TASK_AM_HEARTBEAT_INTERVAL_MS, TezConfiguration.TEZ_TASK_AM_HEARTBEAT_INTERVAL_MS_DEFAULT), @@ -135,19 +130,10 @@ public class ContainerRunnerImpl extends AbstractService implements ContainerRun amReporter.stop(); amReporter = null; } + queryTracker.shutdown(); super.serviceStop(); } - // TODO Move this into a utilities class - private static String createAppSpecificLocalDir(String baseDir, String applicationIdString, - String user) { - // TODO This is broken for secure clusters. The app will not have permission to create these directories. - // May work via Slider - since the directory would already exist. Otherwise may need a custom shuffle handler. - // TODO This should be the process user - and not the user on behalf of whom the query is being submitted. - return baseDir + File.separator + "usercache" + File.separator + user + File.separator + - "appcache" + File.separator + applicationIdString; - } - @Override public void submitWork(SubmitWorkRequestProto request) throws IOException { HistoryLogger.logFragmentStart(request.getApplicationIdString(), request.getContainerIdString(), @@ -170,15 +156,20 @@ public class ContainerRunnerImpl extends AbstractService implements ContainerRun env.putAll(localEnv); env.put(ApplicationConstants.Environment.USER.name(), request.getUser()); - String[] localDirs = new String[localDirsBase.length]; + FragmentSpecProto fragmentSpec = request.getFragmentSpec(); + TezTaskAttemptID taskAttemptId = TezTaskAttemptID.fromString( + fragmentSpec.getTaskAttemptIdString()); + int dagIdentifier = taskAttemptId.getTaskID().getVertexID().getDAGId().getId(); + + queryTracker + .registerFragment(null, request.getApplicationIdString(), fragmentSpec.getDagName(), + dagIdentifier, + fragmentSpec.getVertexName(), fragmentSpec.getFragmentNumber(), + fragmentSpec.getAttemptNumber(), request.getUser()); + + String []localDirs = queryTracker.getLocalDirs(null, fragmentSpec.getDagName(), request.getUser()); + Preconditions.checkNotNull(localDirs); - // Setup up local dirs to be application specific, and create them. - for (int i = 0; i < localDirsBase.length; i++) { - localDirs[i] = createAppSpecificLocalDir(localDirsBase[i], request.getApplicationIdString(), - request.getUser()); - localFs.mkdirs(new Path(localDirs[i])); - } - // TODO Avoid this directory creation on each work-unit submission. if (LOG.isDebugEnabled()) { LOG.debug("Dirs are: " + Arrays.toString(localDirs)); } @@ -195,7 +186,9 @@ public class ContainerRunnerImpl extends AbstractService implements ContainerRun // TODO Unregistering does not happen at the moment, since there's no signals on when an app completes. LOG.info("DEBUG: Registering request with the ShuffleHandler"); - ShuffleHandler.get().registerApplication(request.getApplicationIdString(), jobToken, request.getUser(), localDirs); + ShuffleHandler.get() + .registerDag(request.getApplicationIdString(), dagIdentifier, jobToken, + request.getUser(), localDirs); ConcurrentMap<String, SourceStateProto> sourceCompletionMap = getSourceCompletionMap(request.getFragmentSpec().getDagName()); TaskRunnerCallable callable = new TaskRunnerCallable(request, new Configuration(getConfig()), @@ -209,10 +202,6 @@ public class ContainerRunnerImpl extends AbstractService implements ContainerRun } } - private void notifyAMOfRejection(TaskRunnerCallable callable) { - LOG.error("Notifying AM of request rejection is not implemented yet!"); - } - @Override public void sourceStateUpdated(SourceStateUpdatedRequestProto request) { LOG.info("Processing state update: " + stringifySourceStateUpdateRequest(request)); @@ -220,6 +209,21 @@ public class ContainerRunnerImpl extends AbstractService implements ContainerRun dagMap.put(request.getSrcName(), request.getState()); } + @Override + public void queryComplete(QueryCompleteRequestProto request) { + queryTracker.queryComplete(null, request.getDagName(), request.getDeleteDelay()); + } + + @Override + public void terminateFragment(TerminateFragmentRequestProto request) { + // TODO Implement when this gets used. + } + + + private void notifyAMOfRejection(TaskRunnerCallable callable) { + LOG.error("Notifying AM of request rejection is not implemented yet!"); + } + private String stringifySourceStateUpdateRequest(SourceStateUpdatedRequestProto request) { StringBuilder sb = new StringBuilder(); sb.append("dagName=").append(request.getDagName()) http://git-wip-us.apache.org/repos/asf/hive/blob/cae3ec16/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapDaemon.java ---------------------------------------------------------------------- diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapDaemon.java b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapDaemon.java index 86b1f5c..fabacf7 100644 --- a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapDaemon.java +++ b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapDaemon.java @@ -27,7 +27,10 @@ import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.llap.configuration.LlapConfiguration; import org.apache.hadoop.hive.llap.daemon.ContainerRunner; import org.apache.hadoop.hive.llap.daemon.registry.impl.LlapRegistryService; -import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos; +import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.QueryCompleteRequestProto; +import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.SourceStateUpdatedRequestProto; +import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.SubmitWorkRequestProto; +import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.TerminateFragmentRequestProto; import org.apache.hadoop.hive.llap.io.api.LlapIoProxy; import org.apache.hadoop.hive.llap.metrics.LlapDaemonExecutorMetrics; import org.apache.hadoop.hive.llap.metrics.LlapMetricsSystem; @@ -252,17 +255,27 @@ public class LlapDaemon extends AbstractService implements ContainerRunner, Llap } @Override - public void submitWork(LlapDaemonProtocolProtos.SubmitWorkRequestProto request) throws + public void submitWork(SubmitWorkRequestProto request) throws IOException { numSubmissions.incrementAndGet(); containerRunner.submitWork(request); } @Override - public void sourceStateUpdated(LlapDaemonProtocolProtos.SourceStateUpdatedRequestProto request) { + public void sourceStateUpdated(SourceStateUpdatedRequestProto request) { containerRunner.sourceStateUpdated(request); } + @Override + public void queryComplete(QueryCompleteRequestProto request) { + containerRunner.queryComplete(request); + } + + @Override + public void terminateFragment(TerminateFragmentRequestProto request) { + containerRunner.terminateFragment(request); + } + @VisibleForTesting public long getNumSubmissions() { return numSubmissions.get(); http://git-wip-us.apache.org/repos/asf/hive/blob/cae3ec16/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapDaemonProtocolClientImpl.java ---------------------------------------------------------------------- diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapDaemonProtocolClientImpl.java b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapDaemonProtocolClientImpl.java index 01b53c2..9f161fe 100644 --- a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapDaemonProtocolClientImpl.java +++ b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapDaemonProtocolClientImpl.java @@ -20,10 +20,15 @@ import java.net.InetSocketAddress; import com.google.protobuf.RpcController; import com.google.protobuf.ServiceException; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos; +import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.QueryCompleteRequestProto; +import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.QueryCompleteResponseProto; import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.SourceStateUpdatedRequestProto; import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.SourceStateUpdatedResponseProto; import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.SubmitWorkRequestProto; import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.SubmitWorkResponseProto; +import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.TerminateFragmentRequestProto; +import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.TerminateFragmentResponseProto; import org.apache.hadoop.ipc.ProtobufRpcEngine; import org.apache.hadoop.ipc.RPC; import org.apache.hadoop.net.NetUtils; @@ -66,6 +71,28 @@ public class LlapDaemonProtocolClientImpl implements LlapDaemonProtocolBlockingP } } + @Override + public QueryCompleteResponseProto queryComplete(RpcController controller, + QueryCompleteRequestProto request) throws + ServiceException { + try { + return getProxy().queryComplete(null, request); + } catch (IOException e) { + throw new ServiceException(e); + } + } + + @Override + public TerminateFragmentResponseProto terminateFragment( + RpcController controller, + TerminateFragmentRequestProto request) throws ServiceException { + try { + return getProxy().terminateFragment(null, request); + } catch (IOException e) { + throw new ServiceException(e); + } + } + public LlapDaemonProtocolBlockingPB getProxy() throws IOException { if (proxy == null) { proxy = createProxy(); http://git-wip-us.apache.org/repos/asf/hive/blob/cae3ec16/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapDaemonProtocolServerImpl.java ---------------------------------------------------------------------- diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapDaemonProtocolServerImpl.java b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapDaemonProtocolServerImpl.java index 0360a27..8cb9715 100644 --- a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapDaemonProtocolServerImpl.java +++ b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapDaemonProtocolServerImpl.java @@ -27,10 +27,14 @@ import org.apache.commons.logging.LogFactory; import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos; +import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.QueryCompleteRequestProto; +import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.QueryCompleteResponseProto; import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.SourceStateUpdatedRequestProto; import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.SourceStateUpdatedResponseProto; import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.SubmitWorkRequestProto; import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.SubmitWorkResponseProto; +import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.TerminateFragmentRequestProto; +import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.TerminateFragmentResponseProto; import org.apache.hadoop.ipc.ProtobufRpcEngine; import org.apache.hadoop.ipc.RPC; import org.apache.hadoop.net.NetUtils; @@ -84,6 +88,22 @@ public class LlapDaemonProtocolServerImpl extends AbstractService } @Override + public QueryCompleteResponseProto queryComplete(RpcController controller, + QueryCompleteRequestProto request) throws + ServiceException { + containerRunner.queryComplete(request); + return QueryCompleteResponseProto.getDefaultInstance(); + } + + @Override + public TerminateFragmentResponseProto terminateFragment( + RpcController controller, + TerminateFragmentRequestProto request) throws ServiceException { + containerRunner.terminateFragment(request); + return TerminateFragmentResponseProto.getDefaultInstance(); + } + + @Override public void serviceStart() { Configuration conf = getConfig(); http://git-wip-us.apache.org/repos/asf/hive/blob/cae3ec16/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/QueryFileCleaner.java ---------------------------------------------------------------------- diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/QueryFileCleaner.java b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/QueryFileCleaner.java new file mode 100644 index 0000000..bc18a77 --- /dev/null +++ b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/QueryFileCleaner.java @@ -0,0 +1,94 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hive.llap.daemon.impl; + +import java.io.IOException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ScheduledFuture; +import java.util.concurrent.TimeUnit; + +import com.google.common.util.concurrent.ListenableFuture; +import com.google.common.util.concurrent.ListeningExecutorService; +import com.google.common.util.concurrent.ListeningScheduledExecutorService; +import com.google.common.util.concurrent.MoreExecutors; +import com.google.common.util.concurrent.ThreadFactoryBuilder; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hive.llap.configuration.LlapConfiguration; +import org.apache.hadoop.service.AbstractService; +import org.apache.tez.common.CallableWithNdc; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class QueryFileCleaner extends AbstractService { + + private static final Logger LOG = LoggerFactory.getLogger(QueryFileCleaner.class); + + private final ListeningScheduledExecutorService executorService; + private final FileSystem localFs; + + + public QueryFileCleaner(Configuration conf, FileSystem localFs) { + super(QueryFileCleaner.class.getName()); + int numCleanerThreads = conf.getInt(LlapConfiguration.LLAP_DAEMON_NUM_FILE_CLEANER_THREADS, + LlapConfiguration.LLAP_DAEMON_NUM_FILE_CLEANER_THREADS_DEFAULT); + ScheduledExecutorService rawExecutor = Executors.newScheduledThreadPool(numCleanerThreads, + new ThreadFactoryBuilder().setDaemon(true).setNameFormat("QueryFileCleaner %d").build()); + this.executorService = MoreExecutors.listeningDecorator(rawExecutor); + this.localFs = localFs; + } + + public void serviceStart() { + LOG.info(getName() + " started"); + } + + @Override + public void serviceStop() { + executorService.shutdownNow(); + LOG.info(getName() + " stopped"); + } + + public void cleanupDir(String dir, long deleteDelay) { + LOG.info("Scheduling deletion of {} after {} seconds", dir, deleteDelay); + executorService.schedule(new FileCleanerCallable(dir), deleteDelay, TimeUnit.SECONDS); + } + + private class FileCleanerCallable extends CallableWithNdc<Void> { + + private final String dirToDelete; + + private FileCleanerCallable(String dirToDelete) { + this.dirToDelete = dirToDelete; + } + + @Override + protected Void callInternal() { + Path pathToDelete = new Path(dirToDelete); + if (LOG.isDebugEnabled()) { + LOG.debug("Deleting path: " + pathToDelete); + } + try { + localFs.delete(new Path(dirToDelete), true); + } catch (IOException e) { + LOG.warn("Ignoring exception while cleaning up path: " + pathToDelete, e); + } + return null; + } + } +} http://git-wip-us.apache.org/repos/asf/hive/blob/cae3ec16/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/QueryTracker.java ---------------------------------------------------------------------- diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/QueryTracker.java b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/QueryTracker.java new file mode 100644 index 0000000..16d745b --- /dev/null +++ b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/QueryTracker.java @@ -0,0 +1,143 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hive.llap.daemon.impl; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hive.llap.shufflehandler.ShuffleHandler; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.File; +import java.io.IOException; +import java.util.concurrent.ConcurrentHashMap; + +/** + * Tracks queries running within a daemon + */ +public class QueryTracker { + + private static final Logger LOG = LoggerFactory.getLogger(QueryTracker.class); + private final QueryFileCleaner queryFileCleaner; + + // TODO Make use if the query id for cachin when this is available. + private final ConcurrentHashMap<String, QueryInfo> queryInfoMap = new ConcurrentHashMap<>(); + + private final String[] localDirsBase; + private final FileSystem localFs; + + public QueryTracker(Configuration conf, String[] localDirsBase) { + this.localDirsBase = localDirsBase; + try { + localFs = FileSystem.getLocal(conf); + } catch (IOException e) { + throw new RuntimeException("Failed to setup local filesystem instance", e); + } + queryFileCleaner = new QueryFileCleaner(conf, localFs); + queryFileCleaner.init(conf); + queryFileCleaner.start(); + } + + + void registerFragment(String queryId, String appIdString, String dagName, int dagIdentifier, + String vertexName, int fragmentNumber, int attemptNumber, + String user) throws + IOException { + QueryInfo queryInfo = queryInfoMap.get(dagName); + if (queryInfo == null) { + queryInfo = new QueryInfo(queryId, appIdString, dagName, dagIdentifier, user); + queryInfoMap.putIfAbsent(dagName, queryInfo); + } + } + + String[] getLocalDirs(String queryId, String dagName, String user) throws IOException { + QueryInfo queryInfo = queryInfoMap.get(dagName); + return queryInfo.getLocalDirs(); + } + + void queryComplete(String queryId, String dagName, long deleteDelay) { + LOG.info("Processing queryComplete for dagName={} with deleteDelay={} seconds", dagName, deleteDelay); + QueryInfo queryInfo = queryInfoMap.remove(dagName); + if (queryInfo == null) { + LOG.warn("Ignoring query complete for unknown dag: {}", dagName); + } + String []localDirs = queryInfo.getLocalDirsNoCreate(); + if (localDirs != null) { + for (String localDir : localDirs) { + queryFileCleaner.cleanupDir(localDir, deleteDelay); + ShuffleHandler.get().unregisterDag(localDir, dagName, queryInfo.dagIdentifier); + } + } + // TODO HIVE-10535 Cleanup map join cache + } + + void shutdown() { + queryFileCleaner.stop(); + } + + + private class QueryInfo { + + private final String queryId; + private final String appIdString; + private final String dagName; + private final int dagIdentifier; + private final String user; + private String[] localDirs; + + public QueryInfo(String queryId, String appIdString, String dagName, int dagIdentifier, + String user) { + this.queryId = queryId; + this.appIdString = appIdString; + this.dagName = dagName; + this.dagIdentifier = dagIdentifier; + this.user = user; + } + + + + + private synchronized void createLocalDirs() throws IOException { + if (localDirs == null) { + localDirs = new String[localDirsBase.length]; + for (int i = 0; i < localDirsBase.length; i++) { + localDirs[i] = createAppSpecificLocalDir(localDirsBase[i], appIdString, user, dagIdentifier); + localFs.mkdirs(new Path(localDirs[i])); + } + } + } + + private synchronized String[] getLocalDirs() throws IOException { + if (localDirs == null) { + createLocalDirs(); + } + return localDirs; + } + + private synchronized String[] getLocalDirsNoCreate() { + return this.localDirs; + } + } + + private static String createAppSpecificLocalDir(String baseDir, String applicationIdString, + String user, int dagIdentifier) { + // TODO This is broken for secure clusters. The app will not have permission to create these directories. + // May work via Slider - since the directory would already exist. Otherwise may need a custom shuffle handler. + // TODO This should be the process user - and not the user on behalf of whom the query is being submitted. + return baseDir + File.separator + "usercache" + File.separator + user + File.separator + + "appcache" + File.separator + applicationIdString + File.separator + dagIdentifier; + } +} http://git-wip-us.apache.org/repos/asf/hive/blob/cae3ec16/llap-server/src/java/org/apache/hadoop/hive/llap/shufflehandler/DirWatcher.java ---------------------------------------------------------------------- diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/shufflehandler/DirWatcher.java b/llap-server/src/java/org/apache/hadoop/hive/llap/shufflehandler/DirWatcher.java index 08e4787..b1d2cf7 100644 --- a/llap-server/src/java/org/apache/hadoop/hive/llap/shufflehandler/DirWatcher.java +++ b/llap-server/src/java/org/apache/hadoop/hive/llap/shufflehandler/DirWatcher.java @@ -50,7 +50,7 @@ class DirWatcher { private static final Log LOG = LogFactory.getLog(DirWatcher.class); private static enum Type { - BASE, // App Base Dir + BASE, // App Base Dir / ${dagDir} OUTPUT, // appBase/output/ FINAL, // appBase/output/attemptDir } @@ -95,9 +95,12 @@ class DirWatcher { * @param expiry when to expire the watch - in ms * @throws IOException */ - void registerApplicationDir(String pathString, String appId, String user, long expiry) throws IOException { + void registerDagDir(String pathString, String appId, int dagIdentifier, String user, long expiry) throws IOException { + // The path string contains the dag Identifier Path path = FileSystems.getDefault().getPath(pathString); - WatchedPathInfo watchedPathInfo = new WatchedPathInfo(System.currentTimeMillis() + expiry, Type.BASE, appId, user); + WatchedPathInfo watchedPathInfo = + new WatchedPathInfo(System.currentTimeMillis() + expiry, Type.BASE, appId, dagIdentifier, + user); watchedPaths.put(path, watchedPathInfo); WatchKey watchKey = path.register(watchService, ENTRY_CREATE); watchedPathInfo.setWatchKey(watchKey); @@ -106,6 +109,10 @@ class DirWatcher { // TODO Watches on the output dirs need to be cancelled at some point. For now - via the expiry. } + void unregisterDagDir(String pathString, String appId, int dagIdentifier) { + // TODO Implement to remove all watches for the specified pathString and it's sub-tree + } + /** * Invoke when a pathIdentifier has been found, or is no longer of interest * @param pathIdentifier @@ -226,7 +233,7 @@ class DirWatcher { cancelledWatch = true; watchKey.cancel(); } else { - LOG.warn("DEBUG: Found unexpected directory: " + event.context() + " under " + watchedPath); + LOG.warn("DEBUG: Found unexpected directory while looking for OUTPUT: " + event.context() + " under " + watchedPath); } break; case OUTPUT: @@ -349,15 +356,17 @@ class DirWatcher { final long expiry; final Type type; final String appId; + final int dagId; final String user; final String attemptId; final AttemptPathIdentifier pathIdentifier; WatchKey watchKey; - public WatchedPathInfo(long expiry, Type type, String jobId, String user) { + public WatchedPathInfo(long expiry, Type type, String jobId, int dagId, String user) { this.expiry = expiry; this.type = type; this.appId = jobId; + this.dagId = dagId; this.user = user; this.attemptId = null; this.pathIdentifier = null; @@ -367,10 +376,11 @@ class DirWatcher { this.expiry = other.expiry; this.appId = other.appId; this.user = other.user; + this.dagId = other.dagId; this.type = type; this.attemptId = attemptId; if (attemptId != null) { - pathIdentifier = new AttemptPathIdentifier(appId, user, attemptId); + pathIdentifier = new AttemptPathIdentifier(appId, dagId, user, attemptId); } else { pathIdentifier = null; } http://git-wip-us.apache.org/repos/asf/hive/blob/cae3ec16/llap-server/src/java/org/apache/hadoop/hive/llap/shufflehandler/ShuffleHandler.java ---------------------------------------------------------------------- diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/shufflehandler/ShuffleHandler.java b/llap-server/src/java/org/apache/hadoop/hive/llap/shufflehandler/ShuffleHandler.java index d640b36..2572c75 100644 --- a/llap-server/src/java/org/apache/hadoop/hive/llap/shufflehandler/ShuffleHandler.java +++ b/llap-server/src/java/org/apache/hadoop/hive/llap/shufflehandler/ShuffleHandler.java @@ -408,11 +408,13 @@ public class ShuffleHandler implements AttemptRegistrationListener { /** * Register an application and it's associated credentials and user information. * @param applicationIdString + * @param dagIdentifier * @param appToken * @param user */ - public void registerApplication(String applicationIdString, Token<JobTokenIdentifier> appToken, - String user, String [] appDirs) { + public void registerDag(String applicationIdString, int dagIdentifier, + Token<JobTokenIdentifier> appToken, + String user, String[] appDirs) { // TODO Fix this. There's a race here, where an app may think everything is registered, finish really fast, send events and the consumer will not find the registration. Boolean registered = registeredApps.putIfAbsent(applicationIdString, Boolean.valueOf(true)); if (registered == null) { @@ -421,7 +423,8 @@ public class ShuffleHandler implements AttemptRegistrationListener { if (dirWatcher != null) { for (String appDir : appDirs) { try { - dirWatcher.registerApplicationDir(appDir, applicationIdString, user, 5 * 60 * 1000); + dirWatcher.registerDagDir(appDir, applicationIdString, dagIdentifier, user, + 5 * 60 * 1000); } catch (IOException e) { LOG.warn("Unable to register dir: " + appDir + " with watcher"); } @@ -430,9 +433,13 @@ public class ShuffleHandler implements AttemptRegistrationListener { } } + public void unregisterDag(String dir, String applicationIdString, int dagIdentifier) { + dirWatcher.unregisterDagDir(dir, applicationIdString, dagIdentifier); + // TODO Cleanup registered tokens and dag info + } + public void unregisterApplication(String applicationIdString) { removeJobShuffleInfo(applicationIdString); - // TOOD Unregister from the dirWatcher } @@ -546,7 +553,7 @@ public class ShuffleHandler implements AttemptRegistrationListener { @Override public AttemptPathInfo load(AttemptPathIdentifier key) throws Exception { - String base = getBaseLocation(key.jobId, key.user); + String base = getBaseLocation(key.jobId, key.dagId, key.user); String attemptBase = base + key.attemptId; Path indexFileName = lDirAlloc.getLocalPathToRead(attemptBase + "/" + INDEX_FILE_NAME, conf); @@ -633,27 +640,31 @@ public class ShuffleHandler implements AttemptRegistrationListener { final List<String> mapIds = splitMaps(q.get("map")); final List<String> reduceQ = q.get("reduce"); final List<String> jobQ = q.get("job"); + final List<String> dagIdQ = q.get("dag"); if (LOG.isDebugEnabled()) { LOG.debug("RECV: " + request.getUri() + "\n mapId: " + mapIds + "\n reduceId: " + reduceQ + "\n jobId: " + jobQ + + "\n dagId: " + dagIdQ + "\n keepAlive: " + keepAliveParam); } - if (mapIds == null || reduceQ == null || jobQ == null) { + if (mapIds == null || reduceQ == null || jobQ == null | dagIdQ == null) { sendError(ctx, "Required param job, map and reduce", BAD_REQUEST); return; } - if (reduceQ.size() != 1 || jobQ.size() != 1) { + if (reduceQ.size() != 1 || jobQ.size() != 1 || dagIdQ.size() != 1) { sendError(ctx, "Too many job/reduce parameters", BAD_REQUEST); return; } int reduceId; String jobId; + int dagId; try { reduceId = Integer.parseInt(reduceQ.get(0)); jobId = jobQ.get(0); + dagId = Integer.parseInt(dagIdQ.get(0)); } catch (NumberFormatException e) { sendError(ctx, "Bad reduce parameter", BAD_REQUEST); return; @@ -683,7 +694,7 @@ public class ShuffleHandler implements AttemptRegistrationListener { String user = userRsrc.get(jobId); try { - populateHeaders(mapIds, jobId, user, reduceId, + populateHeaders(mapIds, jobId, dagId, user, reduceId, response, keepAliveParam, mapOutputInfoMap); } catch(IOException e) { ch.write(response); @@ -701,7 +712,7 @@ public class ShuffleHandler implements AttemptRegistrationListener { // This will be hit if there's a large number of mapIds in a single request // (Determined by the cache size further up), in which case we go to disk again. if (info == null) { - info = getMapOutputInfo(jobId, mapId, reduceId, user); + info = getMapOutputInfo(jobId, dagId, mapId, reduceId, user); } lastMap = sendMapOutput(ctx, ch, user, mapId, @@ -730,11 +741,11 @@ public class ShuffleHandler implements AttemptRegistrationListener { } - protected MapOutputInfo getMapOutputInfo(String jobId, String mapId, + protected MapOutputInfo getMapOutputInfo(String jobId, int dagId, String mapId, int reduce, String user) throws IOException { AttemptPathInfo pathInfo; try { - AttemptPathIdentifier identifier = new AttemptPathIdentifier(jobId, user, mapId); + AttemptPathIdentifier identifier = new AttemptPathIdentifier(jobId, dagId, user, mapId); pathInfo = pathCache.get(identifier); LOG.info("DEBUG: Retrieved pathInfo for " + identifier + " check for corresponding loaded messages to determine whether it was loaded or cached"); } catch (ExecutionException e) { @@ -758,7 +769,7 @@ public class ShuffleHandler implements AttemptRegistrationListener { return outputInfo; } - protected void populateHeaders(List<String> mapIds, String jobId, + protected void populateHeaders(List<String> mapIds, String jobId, int dagId, String user, int reduce, HttpResponse response, boolean keepAliveParam, Map<String, MapOutputInfo> mapOutputInfoMap) throws IOException { @@ -767,7 +778,7 @@ public class ShuffleHandler implements AttemptRegistrationListener { long contentLength = 0; for (String mapId : mapIds) { - MapOutputInfo outputInfo = getMapOutputInfo(jobId, mapId, reduce, user); + MapOutputInfo outputInfo = getMapOutputInfo(jobId, dagId, mapId, reduce, user); // mapOutputInfoMap is used to share the lookups with the caller if (mapOutputInfoMap.size() < mapOutputMetaInfoCacheSize) { mapOutputInfoMap.put(mapId, outputInfo); @@ -952,8 +963,8 @@ public class ShuffleHandler implements AttemptRegistrationListener { private static final String USERCACHE_CONSTANT = "usercache"; private static final String APPCACHE_CONSTANT = "appcache"; - private static String getBaseLocation(String jobIdString, String user) { - // $x/$user/appcache/$appId/output/$mapId + private static String getBaseLocation(String jobIdString, int dagId, String user) { + // $x/$user/appcache/$appId/${dagId}/output/$mapId // TODO: Once Shuffle is out of NM, this can use MR APIs to convert // between App and Job String parts[] = jobIdString.split("_"); @@ -963,7 +974,9 @@ public class ShuffleHandler implements AttemptRegistrationListener { final String baseStr = USERCACHE_CONSTANT + "/" + user + "/" + APPCACHE_CONSTANT + "/" - + ConverterUtils.toString(appID) + "/output" + "/"; + + ConverterUtils.toString(appID) + + "/" + dagId + + "/output" + "/"; return baseStr; } @@ -980,11 +993,13 @@ public class ShuffleHandler implements AttemptRegistrationListener { static class AttemptPathIdentifier { private final String jobId; + private final int dagId; private final String user; private final String attemptId; - public AttemptPathIdentifier(String jobId, String user, String attemptId) { + public AttemptPathIdentifier(String jobId, int dagId, String user, String attemptId) { this.jobId = jobId; + this.dagId = dagId; this.user = user; this.attemptId = attemptId; } @@ -1000,19 +1015,20 @@ public class ShuffleHandler implements AttemptRegistrationListener { AttemptPathIdentifier that = (AttemptPathIdentifier) o; - if (!attemptId.equals(that.attemptId)) { + if (dagId != that.dagId) { return false; } if (!jobId.equals(that.jobId)) { return false; } + return attemptId.equals(that.attemptId); - return true; } @Override public int hashCode() { int result = jobId.hashCode(); + result = 31 * result + dagId; result = 31 * result + attemptId.hashCode(); return result; } @@ -1020,11 +1036,11 @@ public class ShuffleHandler implements AttemptRegistrationListener { @Override public String toString() { return "AttemptPathIdentifier{" + - "attemptId='" + attemptId + '\'' + - ", jobId='" + jobId + '\'' + + "jobId='" + jobId + '\'' + + ", dagId=" + dagId + + ", user='" + user + '\'' + + ", attemptId='" + attemptId + '\'' + '}'; } } - - } http://git-wip-us.apache.org/repos/asf/hive/blob/cae3ec16/llap-server/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskCommunicator.java ---------------------------------------------------------------------- diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskCommunicator.java b/llap-server/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskCommunicator.java index d35b04a..99459e4 100644 --- a/llap-server/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskCommunicator.java +++ b/llap-server/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskCommunicator.java @@ -16,7 +16,9 @@ package org.apache.hadoop.hive.llap.tezplugins; import java.io.IOException; import java.nio.ByteBuffer; +import java.util.HashSet; import java.util.Map; +import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.RejectedExecutionException; @@ -26,12 +28,12 @@ import com.google.common.collect.BiMap; import com.google.common.collect.HashBiMap; import com.google.protobuf.ByteString; import com.google.protobuf.ServiceException; -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hive.llap.LlapNodeId; import org.apache.hadoop.hive.llap.configuration.LlapConfiguration; +import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos; import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.FragmentRuntimeInfo; +import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.QueryCompleteRequestProto; import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.SourceStateUpdatedRequestProto; import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.SourceStateUpdatedResponseProto; import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.SubmitWorkRequestProto; @@ -61,10 +63,12 @@ import org.apache.tez.dag.records.TezTaskAttemptID; import org.apache.tez.runtime.api.impl.TaskSpec; import org.apache.tez.runtime.api.impl.TezHeartbeatRequest; import org.apache.tez.runtime.api.impl.TezHeartbeatResponse; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; public class LlapTaskCommunicator extends TezTaskCommunicatorImpl { - private static final Log LOG = LogFactory.getLog(LlapTaskCommunicator.class); + private static final Logger LOG = LoggerFactory.getLogger(LlapTaskCommunicator.class); private final SubmitWorkRequestProto BASE_SUBMIT_WORK_REQUEST; private final ConcurrentMap<String, ByteBuffer> credentialMap; @@ -73,8 +77,10 @@ public class LlapTaskCommunicator extends TezTaskCommunicatorImpl { // When DAG specific cleanup happens, it'll be better to link this to a DAG though. private final EntityTracker entityTracker = new EntityTracker(); private final SourceStateTracker sourceStateTracker; + private final Set<LlapNodeId> nodesForQuery = new HashSet<>(); private TaskCommunicator communicator; + private long deleteDelayOnDagComplete; private final LlapTaskUmbilicalProtocol umbilical; private volatile String currentDagName; @@ -106,6 +112,11 @@ public class LlapTaskCommunicator extends TezTaskCommunicatorImpl { int numThreads = conf.getInt(LlapConfiguration.LLAP_DAEMON_COMMUNICATOR_NUM_THREADS, LlapConfiguration.LLAP_DAEMON_COMMUNICATOR_NUM_THREADS_DEFAULT); this.communicator = new TaskCommunicator(numThreads); + this.deleteDelayOnDagComplete = conf.getLong(LlapConfiguration.LLAP_FILE_CLEANUP_DELAY_SECONDS, + LlapConfiguration.LLAP_FILE_CLEANUP_DELAY_SECONDS_DEFAULT); + LOG.info("Running LlapTaskCommunicator with " + + "fileCleanupDelay=" + deleteDelayOnDagComplete + + ", numCommunicatorThreads=" + numThreads); this.communicator.init(conf); } @@ -131,21 +142,23 @@ public class LlapTaskCommunicator extends TezTaskCommunicatorImpl { new JobTokenSecretManager(); jobTokenSecretManager.addTokenForJob(tokenIdentifier, sessionToken); + int numHandlers = conf.getInt(TezConfiguration.TEZ_AM_TASK_LISTENER_THREAD_COUNT, + TezConfiguration.TEZ_AM_TASK_LISTENER_THREAD_COUNT_DEFAULT); server = new RPC.Builder(conf) .setProtocol(LlapTaskUmbilicalProtocol.class) .setBindAddress("0.0.0.0") .setPort(0) .setInstance(umbilical) - .setNumHandlers( - conf.getInt(TezConfiguration.TEZ_AM_TASK_LISTENER_THREAD_COUNT, - TezConfiguration.TEZ_AM_TASK_LISTENER_THREAD_COUNT_DEFAULT)) + .setNumHandlers(numHandlers) .setSecretManager(jobTokenSecretManager).build(); // Do serviceACLs need to be refreshed, like in Tez ? server.start(); this.address = NetUtils.getConnectAddress(server); - LOG.info("Started LlapUmbilical: " + umbilical.getClass().getName() + " at address: " + address); + LOG.info( + "Started LlapUmbilical: " + umbilical.getClass().getName() + " at address: " + address + + " with numHandlers=" + numHandlers); } catch (IOException e) { throw new TezUncheckedException(e); } @@ -192,7 +205,9 @@ public class LlapTaskCommunicator extends TezTaskCommunicatorImpl { ", while trying to launch task: " + taskSpec.getTaskAttemptID()); } + LlapNodeId nodeId = LlapNodeId.getInstance(host, port); entityTracker.registerTaskAttempt(containerId, taskSpec.getTaskAttemptID(), host, port); + nodesForQuery.add(nodeId); sourceStateTracker.registerTaskForStateUpdates(host, port, taskSpec.getInputs()); FragmentRuntimeInfo fragmentRuntimeInfo = sourceStateTracker.getFragmentRuntimeInfo(taskSpec.getDAGName(), @@ -269,6 +284,29 @@ public class LlapTaskCommunicator extends TezTaskCommunicatorImpl { } @Override + public void dagComplete(final String dagName) { + QueryCompleteRequestProto request = QueryCompleteRequestProto.newBuilder().setDagName( + dagName).setDeleteDelay(deleteDelayOnDagComplete).build(); + for (final LlapNodeId llapNodeId : nodesForQuery) { + LOG.info("Sending dagComplete message for {}, to {}", dagName, llapNodeId); + communicator.sendQueryComplete(request, llapNodeId.getHostname(), llapNodeId.getPort(), + new TaskCommunicator.ExecuteRequestCallback<LlapDaemonProtocolProtos.QueryCompleteResponseProto>() { + @Override + public void setResponse(LlapDaemonProtocolProtos.QueryCompleteResponseProto response) { + } + + @Override + public void indicateError(Throwable t) { + LOG.warn("Failed to indicate dag complete dagId={} to node {}", dagName, llapNodeId); + } + }); + } + + nodesForQuery.clear(); + // TODO Ideally move some of the other cleanup code from resetCurrentDag over here + } + + @Override public void onVertexStateUpdated(VertexStateUpdate vertexStateUpdate) { // Delegate updates over to the source state tracker. sourceStateTracker @@ -301,9 +339,9 @@ public class LlapTaskCommunicator extends TezTaskCommunicatorImpl { // Working on the assumption that a single DAG runs at a time per AM. currentDagName = newDagName; sourceStateTracker.resetState(newDagName); + nodesForQuery.clear(); LOG.info("CurrentDag set to: " + newDagName); - // TODO Additional state reset. Potentially sending messages to node to reset. - // Is it possible for heartbeats to come in from lost tasks - those should be told to die, which + // TODO Is it possible for heartbeats to come in from lost tasks - those should be told to die, which // is likely already happening. } http://git-wip-us.apache.org/repos/asf/hive/blob/cae3ec16/llap-server/src/java/org/apache/hadoop/hive/llap/tezplugins/TaskCommunicator.java ---------------------------------------------------------------------- diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/tezplugins/TaskCommunicator.java b/llap-server/src/java/org/apache/hadoop/hive/llap/tezplugins/TaskCommunicator.java index 3b4612d..d536eb2 100644 --- a/llap-server/src/java/org/apache/hadoop/hive/llap/tezplugins/TaskCommunicator.java +++ b/llap-server/src/java/org/apache/hadoop/hive/llap/tezplugins/TaskCommunicator.java @@ -31,6 +31,8 @@ import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.hive.llap.daemon.LlapDaemonProtocolBlockingPB; import org.apache.hadoop.hive.llap.daemon.impl.LlapDaemonProtocolClientImpl; +import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.QueryCompleteRequestProto; +import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.QueryCompleteResponseProto; import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.SourceStateUpdatedRequestProto; import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.SourceStateUpdatedResponseProto; import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.SubmitWorkRequestProto; @@ -74,7 +76,8 @@ public class TaskCommunicator extends AbstractService { } - public void sendSourceStateUpdate(final SourceStateUpdatedRequestProto request, final String host, final int port, + public void sendSourceStateUpdate(final SourceStateUpdatedRequestProto request, final String host, + final int port, final ExecuteRequestCallback<SourceStateUpdatedResponseProto> callback) { ListenableFuture<SourceStateUpdatedResponseProto> future = executor.submit(new SendSourceStateUpdateCallable(host, port, request)); @@ -91,7 +94,26 @@ public class TaskCommunicator extends AbstractService { }); } - private static abstract class CallableRequest<REQUEST extends Message, RESPONSE extends Message> implements Callable { + public void sendQueryComplete(final QueryCompleteRequestProto request, final String host, + final int port, + final ExecuteRequestCallback<QueryCompleteResponseProto> callback) { + ListenableFuture<QueryCompleteResponseProto> future = + executor.submit(new SendQueryCompleteCallable(host, port, request)); + Futures.addCallback(future, new FutureCallback<QueryCompleteResponseProto>() { + @Override + public void onSuccess(QueryCompleteResponseProto result) { + callback.setResponse(result); + } + + @Override + public void onFailure(Throwable t) { + callback.indicateError(t); + } + }); + } + + private static abstract class CallableRequest<REQUEST extends Message, RESPONSE extends Message> + implements Callable { final String hostname; final int port; @@ -134,6 +156,20 @@ public class TaskCommunicator extends AbstractService { } } + private class SendQueryCompleteCallable + extends CallableRequest<QueryCompleteRequestProto, QueryCompleteResponseProto> { + + protected SendQueryCompleteCallable(String hostname, int port, + QueryCompleteRequestProto queryCompleteRequestProto) { + super(hostname, port, queryCompleteRequestProto); + } + + @Override + public QueryCompleteResponseProto call() throws Exception { + return getProxy(hostname, port).queryComplete(null, request); + } + } + public interface ExecuteRequestCallback<T extends Message> { void setResponse(T response); void indicateError(Throwable t); http://git-wip-us.apache.org/repos/asf/hive/blob/cae3ec16/llap-server/src/protobuf/LlapDaemonProtocol.proto ---------------------------------------------------------------------- diff --git a/llap-server/src/protobuf/LlapDaemonProtocol.proto b/llap-server/src/protobuf/LlapDaemonProtocol.proto index f7e6d2b..e098e87 100644 --- a/llap-server/src/protobuf/LlapDaemonProtocol.proto +++ b/llap-server/src/protobuf/LlapDaemonProtocol.proto @@ -98,7 +98,30 @@ message SourceStateUpdatedRequestProto { message SourceStateUpdatedResponseProto { } +message QueryCompleteRequestProto { + optional string query_id = 1; + optional string dag_name = 2; + optional int64 delete_delay = 3 [default = 0]; +} + +message QueryCompleteResponseProto { +} + +message TerminateFragmentRequestProto { + optional string query_id = 1; + optional string dag_name = 2; + optional int32 dag_attempt_number = 3; + optional string vertex_name = 4; + optional int32 fragment_number = 5; + optional int32 attempt_number = 6; +} + +message TerminateFragmentResponseProto { +} + service LlapDaemonProtocol { rpc submitWork(SubmitWorkRequestProto) returns (SubmitWorkResponseProto); rpc sourceStateUpdated(SourceStateUpdatedRequestProto) returns (SourceStateUpdatedResponseProto); + rpc queryComplete(QueryCompleteRequestProto) returns (QueryCompleteResponseProto); + rpc terminateFragment(TerminateFragmentRequestProto) returns (TerminateFragmentResponseProto); }