Repository: hive Updated Branches: refs/heads/master 01cef9230 -> 10f4eadd5
http://git-wip-us.apache.org/repos/asf/hive/blob/10f4eadd/llap-common/src/protobuf/LlapDaemonProtocol.proto ---------------------------------------------------------------------- diff --git a/llap-common/src/protobuf/LlapDaemonProtocol.proto b/llap-common/src/protobuf/LlapDaemonProtocol.proto index d70dd41..3aeacb2 100644 --- a/llap-common/src/protobuf/LlapDaemonProtocol.proto +++ b/llap-common/src/protobuf/LlapDaemonProtocol.proto @@ -134,6 +134,14 @@ message SubmitWorkRequestProto { optional bool is_guaranteed = 12 [default = false]; } +message RegisterDagRequestProto { + optional string user = 1; + required QueryIdentifierProto query_identifier = 2; + optional bytes credentials_binary = 3; +} + +message RegisterDagResponseProto { +} enum SubmissionStateProto { ACCEPTED = 1; @@ -204,6 +212,7 @@ message PurgeCacheResponseProto { } service LlapDaemonProtocol { + rpc registerDag(RegisterDagRequestProto) returns (RegisterDagResponseProto); rpc submitWork(SubmitWorkRequestProto) returns (SubmitWorkResponseProto); rpc sourceStateUpdated(SourceStateUpdatedRequestProto) returns (SourceStateUpdatedResponseProto); rpc queryComplete(QueryCompleteRequestProto) returns (QueryCompleteResponseProto); http://git-wip-us.apache.org/repos/asf/hive/blob/10f4eadd/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/ContainerRunner.java ---------------------------------------------------------------------- diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/ContainerRunner.java b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/ContainerRunner.java index 035960e..582f518 100644 --- a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/ContainerRunner.java +++ b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/ContainerRunner.java @@ -16,6 +16,7 @@ package org.apache.hadoop.hive.llap.daemon; import java.io.IOException; +import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos; import 
org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.QueryCompleteRequestProto; import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.QueryCompleteResponseProto; import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.SourceStateUpdatedRequestProto; @@ -29,6 +30,10 @@ import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.UpdateFra public interface ContainerRunner { + LlapDaemonProtocolProtos.RegisterDagResponseProto registerDag( + LlapDaemonProtocolProtos.RegisterDagRequestProto request) + throws IOException; + SubmitWorkResponseProto submitWork(SubmitWorkRequestProto request) throws IOException; SourceStateUpdatedResponseProto sourceStateUpdated( http://git-wip-us.apache.org/repos/asf/hive/blob/10f4eadd/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/ContainerRunnerImpl.java ---------------------------------------------------------------------- diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/ContainerRunnerImpl.java b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/ContainerRunnerImpl.java index ef5922e..7a3ca2f 100644 --- a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/ContainerRunnerImpl.java +++ b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/ContainerRunnerImpl.java @@ -27,10 +27,9 @@ import java.util.concurrent.atomic.AtomicReference; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hive.common.UgiFactory; -import org.apache.hadoop.hive.conf.HiveConf; -import org.apache.hadoop.hive.conf.HiveConf.ConfVars; import org.apache.hadoop.hive.llap.DaemonId; import org.apache.hadoop.hive.llap.LlapNodeId; +import org.apache.hadoop.hive.llap.LlapUtil; import org.apache.hadoop.hive.llap.NotTezEventHelper; import org.apache.hadoop.hive.llap.counters.FragmentCountersMap; import org.apache.hadoop.hive.llap.counters.LlapWmCounters; @@ -55,6 +54,8 @@ import 
org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.SourceSta import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.SubmissionStateProto; import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.SubmitWorkRequestProto; import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.SubmitWorkResponseProto; +import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.RegisterDagRequestProto; +import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.RegisterDagResponseProto; import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.TerminateFragmentRequestProto; import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.TerminateFragmentResponseProto; import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.UpdateFragmentRequestProto; @@ -65,7 +66,6 @@ import org.apache.hadoop.hive.llap.security.LlapSignerImpl; import org.apache.hadoop.hive.llap.tez.Converters; import org.apache.hadoop.hive.llap.tezplugins.LlapTezUtils; import org.apache.hadoop.hive.ql.exec.tez.WorkloadManager; -import org.apache.hadoop.io.DataInputBuffer; import org.apache.hadoop.security.Credentials; import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.security.token.Token; @@ -119,11 +119,11 @@ public class ContainerRunnerImpl extends CompositeService implements ContainerRu private final UgiFactory fsUgiFactory; private final SocketFactory socketFactory; - public ContainerRunnerImpl(Configuration conf, int numExecutors, int waitQueueSize, - boolean enablePreemption, String[] localDirsBase, AtomicReference<Integer> localShufflePort, + public ContainerRunnerImpl(Configuration conf, int numExecutors, AtomicReference<Integer> localShufflePort, AtomicReference<InetSocketAddress> localAddress, long totalMemoryAvailableBytes, LlapDaemonExecutorMetrics metrics, - AMReporter amReporter, ClassLoader classLoader, DaemonId daemonId, UgiFactory fsUgiFactory, + AMReporter 
amReporter, QueryTracker queryTracker, Scheduler<TaskRunnerCallable> executorService, + DaemonId daemonId, UgiFactory fsUgiFactory, SocketFactory socketFactory) { super("ContainerRunnerImpl"); Preconditions.checkState(numExecutors > 0, @@ -138,15 +138,10 @@ public class ContainerRunnerImpl extends CompositeService implements ContainerRu this.clusterId = daemonId.getClusterString(); this.daemonId = daemonId; - this.queryTracker = new QueryTracker(conf, localDirsBase, clusterId); - addIfService(queryTracker); - String waitQueueSchedulerClassName = HiveConf.getVar( - conf, ConfVars.LLAP_DAEMON_WAIT_QUEUE_COMPARATOR_CLASS_NAME); - this.executorService = new TaskExecutorService(numExecutors, waitQueueSize, - waitQueueSchedulerClassName, enablePreemption, classLoader, metrics, null); + this.queryTracker = queryTracker; + this.executorService = executorService; completionListener = (SchedulerFragmentCompletingListener) executorService; - addIfService(executorService); // Distribute the available memory between the tasks. 
this.memoryPerExecutor = (long)(totalMemoryAvailableBytes / (float) numExecutors); @@ -187,6 +182,26 @@ public class ContainerRunnerImpl extends CompositeService implements ContainerRu } @Override + public RegisterDagResponseProto registerDag(RegisterDagRequestProto request) + throws IOException { + QueryIdentifierProto identifier = request.getQueryIdentifier(); + Credentials credentials; + if (request.hasCredentialsBinary()) { + credentials = LlapUtil.credentialsFromByteArray( + request.getCredentialsBinary().toByteArray()); + } else { + credentials = new Credentials(); + } + queryTracker.registerDag(identifier.getApplicationIdString(), + identifier.getDagIndex(), request.getUser(), credentials); + if (LOG.isInfoEnabled()) { + LOG.info("Application with id={}, dagId={} registered", + identifier.getApplicationIdString(), identifier.getDagIndex()); + } + return RegisterDagResponseProto.newBuilder().build(); + } + + @Override public SubmitWorkResponseProto submitWork(SubmitWorkRequestProto request) throws IOException { LlapTokenInfo tokenInfo = null; try { @@ -242,11 +257,8 @@ public class ContainerRunnerImpl extends CompositeService implements ContainerRu QueryIdentifier queryIdentifier = new QueryIdentifier( qIdProto.getApplicationIdString(), dagIdentifier); - Credentials credentials = new Credentials(); - DataInputBuffer dib = new DataInputBuffer(); - byte[] tokenBytes = request.getCredentialsBinary().toByteArray(); - dib.reset(tokenBytes, tokenBytes.length); - credentials.readTokenStorageStream(dib); + Credentials credentials = LlapUtil.credentialsFromByteArray( + request.getCredentialsBinary().toByteArray()); Token<JobTokenIdentifier> jobToken = TokenCache.getSessionToken(credentials); http://git-wip-us.apache.org/repos/asf/hive/blob/10f4eadd/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapDaemon.java ---------------------------------------------------------------------- diff --git 
a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapDaemon.java b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapDaemon.java index 52990c5..940be0e 100644 --- a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapDaemon.java +++ b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapDaemon.java @@ -45,6 +45,7 @@ import org.apache.hadoop.hive.llap.LlapUtil; import org.apache.hadoop.hive.llap.configuration.LlapDaemonConfiguration; import org.apache.hadoop.hive.llap.daemon.ContainerRunner; import org.apache.hadoop.hive.llap.daemon.QueryFailedHandler; +import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos; import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.QueryCompleteRequestProto; import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.QueryCompleteResponseProto; import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.SourceStateUpdatedRequestProto; @@ -291,9 +292,22 @@ public class LlapDaemon extends CompositeService implements ContainerRunner, Lla } catch (IOException e) { throw new RuntimeException(e); } - this.containerRunner = new ContainerRunnerImpl(daemonConf, numExecutors, waitQueueSize, - enablePreemption, localDirs, this.shufflePort, srvAddress, executorMemoryPerInstance, metrics, - amReporter, executorClassLoader, daemonId, fsUgiFactory, socketFactory); + + QueryTracker queryTracker = new QueryTracker(daemonConf, localDirs, + daemonId.getClusterString()); + + String waitQueueSchedulerClassName = HiveConf.getVar( + daemonConf, ConfVars.LLAP_DAEMON_WAIT_QUEUE_COMPARATOR_CLASS_NAME); + + Scheduler<TaskRunnerCallable> executorService = new TaskExecutorService(numExecutors, waitQueueSize, + waitQueueSchedulerClassName, enablePreemption, executorClassLoader, metrics, null); + + addIfService(queryTracker); + addIfService(executorService); + + this.containerRunner = new ContainerRunnerImpl(daemonConf, numExecutors, + this.shufflePort, 
srvAddress, executorMemoryPerInstance, metrics, + amReporter, queryTracker, executorService, daemonId, fsUgiFactory, socketFactory); addIfService(containerRunner); // Not adding the registry as a service, since we need to control when it is initialized - conf used to pickup properties. @@ -552,6 +566,13 @@ public class LlapDaemon extends CompositeService implements ContainerRunner, Lla } @Override + public LlapDaemonProtocolProtos.RegisterDagResponseProto registerDag( + LlapDaemonProtocolProtos.RegisterDagRequestProto request) + throws IOException { + return containerRunner.registerDag(request); + } + + @Override public SubmitWorkResponseProto submitWork( SubmitWorkRequestProto request) throws IOException { numSubmissions.incrementAndGet(); http://git-wip-us.apache.org/repos/asf/hive/blob/10f4eadd/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapProtocolServerImpl.java ---------------------------------------------------------------------- diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapProtocolServerImpl.java b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapProtocolServerImpl.java index d856b25..c3a95c8 100644 --- a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapProtocolServerImpl.java +++ b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapProtocolServerImpl.java @@ -98,6 +98,18 @@ public class LlapProtocolServerImpl extends AbstractService } @Override + public LlapDaemonProtocolProtos.RegisterDagResponseProto registerDag( + RpcController controller, + LlapDaemonProtocolProtos.RegisterDagRequestProto request) + throws ServiceException { + try { + return containerRunner.registerDag(request); + } catch (IOException e) { + throw new ServiceException(e); + } + } + + @Override public SubmitWorkResponseProto submitWork(RpcController controller, SubmitWorkRequestProto request) throws ServiceException { try { 
http://git-wip-us.apache.org/repos/asf/hive/blob/10f4eadd/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/QueryTracker.java ---------------------------------------------------------------------- diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/QueryTracker.java b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/QueryTracker.java index ab84dcc..932d0ad 100644 --- a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/QueryTracker.java +++ b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/QueryTracker.java @@ -24,6 +24,7 @@ import org.apache.hadoop.fs.Path; import org.apache.hadoop.hive.llap.LlapNodeId; import org.apache.hadoop.hive.llap.log.Log4jQueryCompleteMarker; import org.apache.hadoop.hive.llap.log.LogHelpers; +import org.apache.hadoop.security.Credentials; import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.security.token.Token; import org.apache.hadoop.util.StringUtils; @@ -41,6 +42,7 @@ import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.SourceSta import org.apache.hadoop.hive.llap.shufflehandler.ShuffleHandler; import org.apache.hadoop.hive.ql.exec.ObjectCacheFactory; import org.apache.tez.common.security.JobTokenIdentifier; +import org.apache.tez.common.security.TokenCache; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.slf4j.Marker; @@ -202,6 +204,21 @@ public class QueryTracker extends AbstractService { } } + public void registerDag(String applicationId, int dagId, String user, + Credentials credentials) { + Token<JobTokenIdentifier> jobToken = TokenCache.getSessionToken(credentials); + + QueryIdentifier queryIdentifier = new QueryIdentifier(applicationId, dagId); + ReadWriteLock dagLock = getDagLock(queryIdentifier); + dagLock.readLock().lock(); + try { + ShuffleHandler.get() + .registerDag(applicationId, dagId, jobToken, user, null); + } finally { + dagLock.readLock().unlock(); + } + } + /** * Indicate to the 
tracker that a fragment is complete. This is from internal execution within the daemon * @param fragmentInfo http://git-wip-us.apache.org/repos/asf/hive/blob/10f4eadd/llap-server/src/java/org/apache/hadoop/hive/llap/shufflehandler/ShuffleHandler.java ---------------------------------------------------------------------- diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/shufflehandler/ShuffleHandler.java b/llap-server/src/java/org/apache/hadoop/hive/llap/shufflehandler/ShuffleHandler.java index 18a37a2..aff2c2e 100644 --- a/llap-server/src/java/org/apache/hadoop/hive/llap/shufflehandler/ShuffleHandler.java +++ b/llap-server/src/java/org/apache/hadoop/hive/llap/shufflehandler/ShuffleHandler.java @@ -49,6 +49,7 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.regex.Pattern; +import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Charsets; import com.google.common.base.Preconditions; import com.google.common.cache.CacheBuilder; @@ -169,6 +170,7 @@ public class ShuffleHandler implements AttemptRegistrationListener { /* List of registered applications */ private final ConcurrentMap<String, Integer> registeredApps = new ConcurrentHashMap<>(); + private final ConcurrentMap<String, Integer> registeredDirectories = new ConcurrentHashMap<>(); /* Maps application identifiers (jobIds) to the associated user for the app */ private final ConcurrentMap<String,String> userRsrc; private JobTokenSecretManager secretManager; @@ -433,18 +435,21 @@ public class ShuffleHandler implements AttemptRegistrationListener { * Register an application and it's associated credentials and user information. * * This method and unregisterDag must be synchronized externally to prevent races in shuffle token registration/unregistration + * This method may be called several times but we can only set the registeredDirectories once which will be + * in the first call in which they are not null. 
* * @param applicationIdString * @param dagIdentifier * @param appToken * @param user + * @param appDirs */ public void registerDag(String applicationIdString, int dagIdentifier, Token<JobTokenIdentifier> appToken, String user, String[] appDirs) { Integer registeredDagIdentifier = registeredApps.putIfAbsent(applicationIdString, dagIdentifier); // App never seen, or previous dag has been unregistered. - if (registeredDagIdentifier == null) { + if (registeredDagIdentifier == null && appToken != null ) { recordJobShuffleInfo(applicationIdString, user, appToken); } // Register the new dag identifier, if that's not the one currently registered. @@ -454,6 +459,14 @@ public class ShuffleHandler implements AttemptRegistrationListener { // Don't need to recordShuffleInfo since the out of sync unregister will not remove the // credentials } + + if (appDirs == null) { + return; + } + registeredDagIdentifier = registeredDirectories.put(applicationIdString, dagIdentifier); + if (registeredDagIdentifier != null && !registeredDagIdentifier.equals(dagIdentifier)) { + registeredDirectories.put(applicationIdString, dagIdentifier); + } // First time registration, or new register comes in before the previous unregister. if (registeredDagIdentifier == null || !registeredDagIdentifier.equals(dagIdentifier)) { if (dirWatcher != null) { @@ -487,6 +500,7 @@ public class ShuffleHandler implements AttemptRegistrationListener { // be synchronized, hence the following check is sufficient. if (currentDagIdentifier != null && currentDagIdentifier.equals(dagIdentifier)) { registeredApps.remove(applicationIdString); + registeredDirectories.remove(applicationIdString); removeJobShuffleInfo(applicationIdString); } // Unregister for the dirWatcher for the specific dagIdentifier in either case. 
@@ -514,6 +528,16 @@ public class ShuffleHandler implements AttemptRegistrationListener { } } + @VisibleForTesting + public Map getRegisteredApps() { + return new HashMap<>(registeredApps); + } + + @VisibleForTesting + public Map getRegisteredDirectories() { + return new HashMap<>(registeredDirectories); + } + protected Shuffle getShuffle(Configuration conf) { return shuffle; } http://git-wip-us.apache.org/repos/asf/hive/blob/10f4eadd/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/LlapDaemonTestUtils.java ---------------------------------------------------------------------- diff --git a/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/LlapDaemonTestUtils.java b/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/LlapDaemonTestUtils.java new file mode 100644 index 0000000..21f732b --- /dev/null +++ b/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/LlapDaemonTestUtils.java @@ -0,0 +1,73 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hive.llap.daemon; + +import com.google.protobuf.ByteString; +import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos; +import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.SubmitWorkRequestProto; +import org.apache.hadoop.hive.llap.tezplugins.LlapTezUtils; +import org.apache.hadoop.security.Credentials; +import java.io.IOException; + +/** + * Utils class for testing Llap Daemon. 
+ */ +public class LlapDaemonTestUtils { + private LlapDaemonTestUtils() {} + + public static SubmitWorkRequestProto buildSubmitProtoRequest(int fragmentNumber, + String appId, int dagId, int vId, String dagName, + int dagStartTime, int attemptStartTime, int numSelfAndUpstreamTasks, int numSelfAndUpstreamComplete, + int withinDagPriority, Credentials credentials) throws IOException { + return SubmitWorkRequestProto + .newBuilder() + .setAttemptNumber(0) + .setFragmentNumber(fragmentNumber) + .setWorkSpec( + LlapDaemonProtocolProtos.VertexOrBinary.newBuilder().setVertex( + LlapDaemonProtocolProtos.SignableVertexSpec + .newBuilder() + .setQueryIdentifier( + LlapDaemonProtocolProtos.QueryIdentifierProto.newBuilder() + .setApplicationIdString(appId) + .setAppAttemptNumber(0) + .setDagIndex(dagId) + .build()) + .setVertexIndex(vId) + .setDagName(dagName) + .setHiveQueryId(dagName) + .setVertexName("MockVertex") + .setUser("MockUser") + .setTokenIdentifier("MockToken_1") + .setProcessorDescriptor( + LlapDaemonProtocolProtos.EntityDescriptorProto.newBuilder() + .setClassName("MockProcessor").build()) + .build()).build()) + .setAmHost("localhost") + .setAmPort(12345) + .setCredentialsBinary(ByteString.copyFrom(LlapTezUtils.serializeCredentials(credentials))) + .setContainerIdString("MockContainer_1") + .setFragmentRuntimeInfo(LlapDaemonProtocolProtos + .FragmentRuntimeInfo + .newBuilder() + .setDagStartTime(dagStartTime) + .setFirstAttemptStartTime(attemptStartTime) + .setNumSelfAndUpstreamTasks(numSelfAndUpstreamTasks) + .setNumSelfAndUpstreamCompletedTasks(numSelfAndUpstreamComplete) + .setWithinDagPriority(withinDagPriority) + .build()) + .build(); + } +} http://git-wip-us.apache.org/repos/asf/hive/blob/10f4eadd/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/TestContainerRunnerImpl.java ---------------------------------------------------------------------- diff --git 
a/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/TestContainerRunnerImpl.java b/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/TestContainerRunnerImpl.java new file mode 100644 index 0000000..ea4f50b --- /dev/null +++ b/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/TestContainerRunnerImpl.java @@ -0,0 +1,180 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hive.llap.daemon.impl; + +import com.google.common.primitives.Ints; +import com.google.protobuf.ByteString; +import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.llap.DaemonId; +import org.apache.hadoop.hive.llap.configuration.LlapDaemonConfiguration; +import org.apache.hadoop.hive.llap.daemon.LlapDaemonTestUtils; +import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.RegisterDagRequestProto; +import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.SubmitWorkRequestProto; +import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.QueryIdentifierProto; +import org.apache.hadoop.hive.llap.metrics.LlapDaemonExecutorMetrics; +import org.apache.hadoop.hive.llap.metrics.LlapMetricsSystem; +import org.apache.hadoop.hive.llap.metrics.MetricsUtils; +import org.apache.hadoop.hive.llap.security.LlapTokenIdentifier; +import org.apache.hadoop.hive.llap.security.LlapUgiFactoryFactory; +import org.apache.hadoop.hive.llap.shufflehandler.ShuffleHandler; +import 
org.apache.hadoop.hive.llap.tezplugins.LlapTezUtils;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.net.NetUtils;
+import org.apache.hadoop.security.Credentials;
+import org.apache.hadoop.security.token.Token;
+import org.apache.tez.common.security.TokenCache;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import javax.net.SocketFactory;
+import java.io.File;
+import java.net.InetSocketAddress;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicReference;
+
+import static org.mockito.Mockito.mock;
+
+/**
+ * Test ContainerRunnerImpl.
+ *
+ * <p>Each test runs against a freshly wired {@link ContainerRunnerImpl} backed by a real
+ * {@link QueryTracker} and {@link TaskExecutorService}; only the {@link AMReporter} is mocked.
+ */
+public class TestContainerRunnerImpl {
+  ContainerRunnerImpl containerRunner;
+  LlapDaemonConfiguration daemonConf = new LlapDaemonConfiguration();
+  private final int numExecutors = 1;
+  private final int waitQueueSize = HiveConf.getIntVar(
+      daemonConf, HiveConf.ConfVars.LLAP_DAEMON_TASK_SCHEDULER_WAIT_QUEUE_SIZE);
+  private final boolean enablePreemption = false;
+  private final int numLocalDirs = 1;
+  private final String[] localDirs = new String[numLocalDirs];
+  private final File testWorkDir = new File("target", "container-runner-tests");
+  private final AtomicReference<Integer> shufflePort = new AtomicReference<>();
+  private final AtomicReference<InetSocketAddress> srvAddress = new AtomicReference<>();
+  private final int executorMemoryPerInstance = 1024;
+  private LlapDaemonExecutorMetrics metrics;
+  private AMReporter amReporter;
+  private final String testUser = "testUser";
+  private final String appId = "application_1540489363818_0021";
+  private final int dagId = 1234;
+  private final int vId = 12345;
+  private final String hostname = "test.cluster";
+  private final DaemonId daemonId = new DaemonId(testUser,
+      "ContainerTests", hostname,
+      appId, System.currentTimeMillis());
+  private final SocketFactory socketFactory = NetUtils.getDefaultSocketFactory(daemonConf);
+  private QueryTracker queryTracker;
+  private TaskExecutorService executorService;
+  private InetSocketAddress serverSocket;
+
+
+  /**
+   * Builds and starts the services the container runner depends on, then the runner itself.
+   */
+  @Before
+  public void setup() throws Exception {
+
+    String[] strIntervals = HiveConf.getTrimmedStringsVar(daemonConf,
+        HiveConf.ConfVars.LLAP_DAEMON_TASK_PREEMPTION_METRICS_INTERVALS);
+    List<Integer> intervalList = new ArrayList<>();
+    if (strIntervals != null) {
+      for (String strInterval : strIntervals) {
+        intervalList.add(Integer.valueOf(strInterval));
+      }
+    }
+
+    amReporter = mock(AMReporter.class);
+    serverSocket = new InetSocketAddress("localhost", 0);
+    srvAddress.set(serverSocket);
+
+    this.metrics = LlapDaemonExecutorMetrics
+        .create("ContinerRunerTests", MetricsUtils.getUUID(), numExecutors,
+            Ints.toArray(intervalList));
+
+    for (int i = 0; i < numLocalDirs; i++) {
+      // Suffix with the index so that every entry of localDirs is a distinct directory.
+      File f = new File(testWorkDir, "localDir" + i);
+      f.mkdirs();
+      localDirs[i] = f.getAbsolutePath();
+    }
+    String waitQueueSchedulerClassName = HiveConf.getVar(
+        daemonConf, HiveConf.ConfVars.LLAP_DAEMON_WAIT_QUEUE_COMPARATOR_CLASS_NAME);
+
+    queryTracker = new QueryTracker(daemonConf, localDirs, daemonId.getClusterString());
+    executorService = new TaskExecutorService(numExecutors, waitQueueSize,
+        waitQueueSchedulerClassName, enablePreemption,
+        Thread.currentThread().getContextClassLoader(), metrics, null);
+
+    shufflePort.set(HiveConf.getIntVar(
+        daemonConf, HiveConf.ConfVars.LLAP_DAEMON_RPC_PORT));
+    containerRunner = new ContainerRunnerImpl(daemonConf, numExecutors,
+        this.shufflePort, srvAddress, executorMemoryPerInstance, metrics,
+        amReporter, queryTracker, executorService, daemonId, LlapUgiFactoryFactory
+        .createFsUgiFactory(daemonConf), socketFactory);
+
+    ShuffleHandler.initializeAndStart(daemonConf);
+
+    executorService.init(daemonConf);
+    executorService.start();
+    queryTracker.init(daemonConf);
+    queryTracker.start();
+    containerRunner.init(daemonConf);
+    containerRunner.start();
+  }
+
+  /**
+   * Stops the services started in {@link #setup()}, each exactly once, then the metrics system.
+   */
+  @After
+  public void cleanup() throws Exception {
+    containerRunner.serviceStop();
+    queryTracker.serviceStop();
+    executorService.serviceStop();
+    LlapMetricsSystem.shutdown();
+  }
+
+  /**
+   * Registering a DAG (even twice) must register the application with the shuffle handler
+   * without registering any directories; the directories only show up once work is submitted.
+   */
+  @Test(timeout = 10000)
+  public void testRegisterDag() throws Exception {
+    Credentials credentials = new Credentials();
+    Token<LlapTokenIdentifier> sessionToken = new Token<>(
+        "identifier".getBytes(), "testPassword".getBytes(), new Text("kind"), new Text("service"));
+    TokenCache.setSessionToken(sessionToken, credentials);
+
+    RegisterDagRequestProto request = RegisterDagRequestProto.newBuilder()
+        .setUser(testUser)
+        .setCredentialsBinary(ByteString.copyFrom(LlapTezUtils.serializeCredentials(credentials)))
+        .setQueryIdentifier(
+            QueryIdentifierProto.newBuilder()
+                .setApplicationIdString(appId)
+                .setDagIndex(dagId)
+                .build())
+        .build();
+    containerRunner.registerDag(request);
+    Assert.assertEquals(1, ShuffleHandler.get().getRegisteredApps().size());
+    Assert.assertEquals(dagId, ShuffleHandler.get().getRegisteredApps().get(appId));
+    Assert.assertEquals(0, ShuffleHandler.get().getRegisteredDirectories().size());
+
+    // Registering the same DAG a second time must not change the registered state.
+    containerRunner.registerDag(request);
+    Assert.assertEquals(1, ShuffleHandler.get().getRegisteredApps().size());
+    Assert.assertEquals(dagId, ShuffleHandler.get().getRegisteredApps().get(appId));
+    Assert.assertEquals(0, ShuffleHandler.get().getRegisteredDirectories().size());
+
+    SubmitWorkRequestProto sRequest =
+        LlapDaemonTestUtils.buildSubmitProtoRequest(1, appId,
+            dagId, vId, "dagName", 0, 0,
+            0, 0, 1,
+            credentials);
+
+    containerRunner.submitWork(sRequest);
+    Assert.assertEquals(1, ShuffleHandler.get().getRegisteredApps().size());
+    Assert.assertEquals(dagId, ShuffleHandler.get().getRegisteredApps().get(appId));
+    Assert.assertEquals(1, ShuffleHandler.get().getRegisteredDirectories().size());
+    Assert.assertEquals(dagId, ShuffleHandler.get().getRegisteredDirectories().get(appId));
+  }
+}
http://git-wip-us.apache.org/repos/asf/hive/blob/10f4eadd/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/comparator/TestFirstInFirstOutComparator.java ---------------------------------------------------------------------- diff --git a/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/comparator/TestFirstInFirstOutComparator.java b/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/comparator/TestFirstInFirstOutComparator.java index d3aa539..dffbb57 100644 --- a/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/comparator/TestFirstInFirstOutComparator.java +++ b/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/comparator/TestFirstInFirstOutComparator.java @@ -22,30 +22,29 @@ import static org.apache.hadoop.hive.llap.daemon.impl.TaskExecutorTestHelpers.cr import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNull; +import org.apache.hadoop.hive.llap.daemon.LlapDaemonTestUtils; import org.apache.hadoop.hive.llap.daemon.impl.EvictingPriorityBlockingQueue; import org.apache.hadoop.hive.llap.daemon.impl.TaskExecutorService.TaskWrapper; -import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos; -import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.EntityDescriptorProto; -import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.QueryIdentifierProto; -import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.SignableVertexSpec; import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.SubmitWorkRequestProto; -import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.VertexOrBinary; +import org.apache.hadoop.security.Credentials; import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.tez.dag.records.TezDAGID; import org.apache.tez.dag.records.TezVertexID; import org.junit.Test; +import java.io.IOException; + public class TestFirstInFirstOutComparator { private SubmitWorkRequestProto 
createRequest(int fragmentNumber, int numSelfAndUpstreamTasks, int dagStartTime, - int attemptStartTime) { + int attemptStartTime) throws IOException { // Same priority for all tasks. return createRequest(fragmentNumber, numSelfAndUpstreamTasks, 0, dagStartTime, attemptStartTime, 1); } private SubmitWorkRequestProto createRequest(int fragmentNumber, int numSelfAndUpstreamTasks, int numSelfAndUpstreamComplete, int dagStartTime, - int attemptStartTime, int withinDagPriority) { + int attemptStartTime, int withinDagPriority) throws IOException { return createRequest(fragmentNumber, numSelfAndUpstreamTasks, numSelfAndUpstreamComplete, dagStartTime, attemptStartTime, withinDagPriority, "MockDag"); } @@ -54,50 +53,18 @@ public class TestFirstInFirstOutComparator { private SubmitWorkRequestProto createRequest(int fragmentNumber, int numSelfAndUpstreamTasks, int numSelfAndUpstreamComplete, int dagStartTime, int attemptStartTime, int withinDagPriority, - String dagName) { + String dagName) throws IOException { ApplicationId appId = ApplicationId.newInstance(9999, 72); TezDAGID dagId = TezDAGID.getInstance(appId, 1); TezVertexID vId = TezVertexID.getInstance(dagId, 35); - return SubmitWorkRequestProto - .newBuilder() - .setAttemptNumber(0) - .setFragmentNumber(fragmentNumber) - .setWorkSpec( - VertexOrBinary.newBuilder().setVertex( - SignableVertexSpec - .newBuilder() - .setQueryIdentifier( - QueryIdentifierProto.newBuilder() - .setApplicationIdString(appId.toString()) - .setAppAttemptNumber(0) - .setDagIndex(dagId.getId()) - .build()) - .setVertexIndex(vId.getId()) - .setDagName(dagName) - .setHiveQueryId(dagName) - .setVertexName("MockVertex") - .setUser("MockUser") - .setTokenIdentifier("MockToken_1") - .setProcessorDescriptor( - EntityDescriptorProto.newBuilder().setClassName("MockProcessor").build()) - .build()).build()) - .setAmHost("localhost") - .setAmPort(12345) - .setContainerIdString("MockContainer_1") - .setFragmentRuntimeInfo(LlapDaemonProtocolProtos - 
.FragmentRuntimeInfo - .newBuilder() - .setDagStartTime(dagStartTime) - .setFirstAttemptStartTime(attemptStartTime) - .setNumSelfAndUpstreamTasks(numSelfAndUpstreamTasks) - .setNumSelfAndUpstreamCompletedTasks(numSelfAndUpstreamComplete) - .setWithinDagPriority(withinDagPriority) - .build()) - .build(); + return LlapDaemonTestUtils.buildSubmitProtoRequest(fragmentNumber, appId.toString(), + dagId.getId(), vId.getId(), dagName, dagStartTime, attemptStartTime, + numSelfAndUpstreamTasks, numSelfAndUpstreamComplete, withinDagPriority, + new Credentials()); } @Test (timeout = 60000) - public void testWaitQueueComparator() throws InterruptedException { + public void testWaitQueueComparator() throws InterruptedException, IOException { TaskWrapper r1 = createTaskWrapper(createRequest(1, 2, 5, 100), false, 100000); TaskWrapper r2 = createTaskWrapper(createRequest(2, 4, 4, 200), false, 100000); TaskWrapper r3 = createTaskWrapper(createRequest(3, 6, 3, 300), false, 1000000); @@ -273,7 +240,7 @@ public class TestFirstInFirstOutComparator { } @Test(timeout = 60000) - public void testWaitQueueComparatorWithinDagPriority() throws InterruptedException { + public void testWaitQueueComparatorWithinDagPriority() throws InterruptedException, IOException { TaskWrapper r1 = createTaskWrapper(createRequest(1, 1, 0, 100, 100, 10), false, 100000); TaskWrapper r2 = createTaskWrapper(createRequest(2, 1, 0, 100, 100, 1), false, 100000); TaskWrapper r3 = createTaskWrapper(createRequest(3, 1, 0, 100, 100, 5), false, 100000); @@ -291,7 +258,7 @@ public class TestFirstInFirstOutComparator { } @Test(timeout = 60000) - public void testWaitQueueComparatorWithinSameDagPriority() throws InterruptedException { + public void testWaitQueueComparatorWithinSameDagPriority() throws InterruptedException, IOException { TaskWrapper r1 = createTaskWrapper(createRequest(1, 1, 0, 10, 100, 10), true, 100000); TaskWrapper r2 = createTaskWrapper(createRequest(2, 1, 0, 10, 100, 10), true, 100000); TaskWrapper r3 = 
createTaskWrapper(createRequest(3, 1, 0, 10, 100, 10), true, 100000); @@ -309,7 +276,7 @@ public class TestFirstInFirstOutComparator { } @Test(timeout = 60000) - public void testWaitQueueComparatorParallelism() throws InterruptedException { + public void testWaitQueueComparatorParallelism() throws InterruptedException, IOException { TaskWrapper r1 = createTaskWrapper(createRequest(1, 10, 3, 100, 100, 1, "q1"), false, 100000); TaskWrapper r2 = createTaskWrapper(createRequest(2, 10, 7, 100, 100, 1, "q2"), false, 100000); TaskWrapper r3 = createTaskWrapper(createRequest(3, 10, 5, 100, 100, 1, "q3"), false, 100000); http://git-wip-us.apache.org/repos/asf/hive/blob/10f4eadd/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskCommunicator.java ---------------------------------------------------------------------- diff --git a/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskCommunicator.java b/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskCommunicator.java index 5d4ce22..0120bb6 100644 --- a/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskCommunicator.java +++ b/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskCommunicator.java @@ -52,6 +52,8 @@ import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos; import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.FragmentRuntimeInfo; import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.QueryCompleteRequestProto; import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.QueryIdentifierProto; +import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.RegisterDagRequestProto; +import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.RegisterDagResponseProto; import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.SourceStateUpdatedRequestProto; import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.SourceStateUpdatedResponseProto; 
import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.SubmitWorkRequestProto;
@@ -68,7 +70,6 @@ import org.apache.hadoop.hive.llap.tez.Converters;
 import org.apache.hadoop.hive.llap.tez.LlapProtocolClientProxy;
 import org.apache.hadoop.hive.llap.tezplugins.helpers.SourceStateTracker;
 import org.apache.hadoop.io.BooleanWritable;
-import org.apache.hadoop.io.DataOutputBuffer;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.ipc.ProtocolSignature;
 import org.apache.hadoop.ipc.RPC;
@@ -306,6 +307,48 @@ public class LlapTaskCommunicator extends TezTaskCommunicatorImpl {
     void setError(CtxType ctx, Throwable t);
   }
 
+  /**
+   * Sends a request to the LLAP daemon on {@code node} to register the current DAG
+   * together with the DAG's credentials.
+   *
+   * @param node the daemon node to register the DAG with
+   * @param callback notified with the registered query identifier on success, or with the
+   *                 failure when the send error is not handled by {@code processSendError}
+   * @return {@code true} if it was possible to attempt the registration; {@code false} when
+   *         no query identifier is currently set (no DAG is running), in which case nothing
+   *         is sent and {@code callback} is never invoked
+   * @throws RuntimeException wrapping the {@link IOException} raised while obtaining the
+   *         serialized credentials
+   */
+  public boolean registerDag(NodeInfo node, final OperationCallback<QueryIdentifierProto, Void> callback) {
+    // Read the field exactly once: the callbacks below run asynchronously, and by then the
+    // field may already refer to another DAG or be null again.
+    final QueryIdentifierProto queryIdentifier = currentQueryIdentifierProto;
+    if (queryIdentifier == null) {
+      return false;
+    }
+    try {
+      RegisterDagRequestProto request = RegisterDagRequestProto.newBuilder()
+          .setQueryIdentifier(queryIdentifier)
+          .setUser(user)
+          .setCredentialsBinary(
+              getCredentials(getContext()
+                  .getCurrentDagInfo().getCredentials())).build();
+      communicator.registerDag(request, node.getHost(), node.getRpcPort(),
+          new LlapProtocolClientProxy.ExecuteRequestCallback<RegisterDagResponseProto>() {
+            @Override
+            public void setResponse(RegisterDagResponseProto response) {
+              callback.setDone(null, queryIdentifier);
+            }
+
+            @Override
+            public void indicateError(Throwable t) {
+              // Pass the throwable along so the cause is not lost when
+              // processSendError() handles the error and the callback is skipped.
+              LOG.info("Error registering dag with"
+                  + " appId=" + queryIdentifier.getApplicationIdString()
+                  + " dagId=" + queryIdentifier.getDagIndex()
+                  + " to node " + node.getHost(), t);
+              if (!processSendError(t)) {
+                callback.setError(null, t);
+              }
+            }
+          });
+    } catch (IOException e) {
+      throw new RuntimeException(e);
+    }
+    return true;
+  }
+
public <T> void startUpdateGuaranteed(TezTaskAttemptID attemptId, NodeInfo assignedNode, boolean newState, final OperationCallback<Boolean, T> callback, final T ctx) { LlapNodeId nodeId = entityTracker.getNodeIdForTaskAttempt(attemptId); @@ -795,14 +838,10 @@ public class LlapTaskCommunicator extends TezTaskCommunicatorImpl { Preconditions.checkState(currentQueryIdentifierProto.getDagIndex() == taskSpec.getTaskAttemptID().getTaskID().getVertexID().getDAGId().getId()); - ByteBuffer credentialsBinary = credentialMap.get(currentQueryIdentifierProto); - if (credentialsBinary == null) { - credentialsBinary = serializeCredentials(getContext().getCurrentDagInfo().getCredentials()); - credentialMap.putIfAbsent(currentQueryIdentifierProto, credentialsBinary.duplicate()); - } else { - credentialsBinary = credentialsBinary.duplicate(); - } - builder.setCredentialsBinary(ByteString.copyFrom(credentialsBinary)); + + + builder.setCredentialsBinary( + getCredentials(getContext().getCurrentDagInfo().getCredentials())); builder.setWorkSpec(VertexOrBinary.newBuilder().setVertex(Converters.constructSignableVertexSpec( taskSpec, currentQueryIdentifierProto, getTokenIdentifier(), user, hiveQueryId)).build()); // Don't call builder.setWorkSpecSignature() - Tez doesn't sign fragments @@ -814,16 +853,17 @@ public class LlapTaskCommunicator extends TezTaskCommunicatorImpl { return builder.build(); } - private ByteBuffer serializeCredentials(Credentials credentials) throws IOException { - Credentials containerCredentials = new Credentials(); - containerCredentials.addAll(credentials); - DataOutputBuffer containerTokens_dob = new DataOutputBuffer(); - containerCredentials.writeTokenStorageToStream(containerTokens_dob); - return ByteBuffer.wrap(containerTokens_dob.getData(), 0, containerTokens_dob.getLength()); + private ByteString getCredentials(Credentials credentials) throws IOException { + ByteBuffer credentialsBinary = credentialMap.get(currentQueryIdentifierProto); + if 
(credentialsBinary == null) { + credentialsBinary = LlapTezUtils.serializeCredentials(credentials); + credentialMap.putIfAbsent(currentQueryIdentifierProto, credentialsBinary.duplicate()); + } else { + credentialsBinary = credentialsBinary.duplicate(); + } + return ByteString.copyFrom(credentialsBinary); } - - protected class LlapTaskUmbilicalProtocolImpl implements LlapTaskUmbilicalProtocol { private final TezTaskUmbilicalProtocol tezUmbilical; http://git-wip-us.apache.org/repos/asf/hive/blob/10f4eadd/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskSchedulerService.java ---------------------------------------------------------------------- diff --git a/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskSchedulerService.java b/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskSchedulerService.java index 7e8299d..79bca60 100644 --- a/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskSchedulerService.java +++ b/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskSchedulerService.java @@ -16,7 +16,6 @@ package org.apache.hadoop.hive.llap.tezplugins; import com.google.common.io.ByteArrayDataOutput; -import org.apache.hadoop.hive.registry.ServiceInstanceSet; import org.apache.hadoop.io.Text; import org.apache.hadoop.yarn.api.records.ApplicationId; @@ -71,6 +70,7 @@ import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.conf.HiveConf.ConfVars; import org.apache.hadoop.hive.llap.metrics.LlapMetricsSystem; import org.apache.hadoop.hive.llap.metrics.MetricsUtils; +import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.QueryIdentifierProto; import org.apache.hadoop.hive.llap.plugin.rpc.LlapPluginProtocolProtos.UpdateQueryRequestProto; import org.apache.hadoop.hive.llap.registry.LlapServiceInstance; import org.apache.hadoop.hive.llap.registry.LlapServiceInstanceSet; @@ -81,12 +81,8 @@ import org.apache.hadoop.hive.llap.tezplugins.endpoint.LlapPluginServerImpl; import 
org.apache.hadoop.hive.llap.tezplugins.helpers.MonotonicClock; import org.apache.hadoop.hive.llap.tezplugins.metrics.LlapTaskSchedulerMetrics; import org.apache.hadoop.hive.llap.tezplugins.scheduler.LoggingFutureCallback; -import org.apache.hadoop.hive.registry.ServiceInstanceStateChangeListener; -import org.apache.hadoop.hive.registry.impl.TezAmRegistryImpl; -import org.apache.hadoop.io.Text; import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.security.token.Token; -import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.Container; import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.ContainerState; @@ -121,7 +117,6 @@ import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions; import com.google.common.collect.Maps; import com.google.common.collect.Sets; -import com.google.common.io.ByteArrayDataOutput; import com.google.common.io.ByteStreams; import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.ListenableFuture; @@ -133,6 +128,7 @@ public class LlapTaskSchedulerService extends TaskScheduler { private static final Logger LOG = LoggerFactory.getLogger(LlapTaskSchedulerService.class); private static final Logger WM_LOG = LoggerFactory.getLogger("GuaranteedTasks"); private static final TaskStartComparator TASK_INFO_COMPARATOR = new TaskStartComparator(); + private final static Comparator<Priority> PRIORITY_COMPARATOR = new Comparator<Priority>() { @Override public int compare(Priority o1, Priority o2) { @@ -154,6 +150,31 @@ public class LlapTaskSchedulerService extends TaskScheduler { } } + private final class RegisterDagCallback implements OperationCallback<QueryIdentifierProto, Void> { + private final LlapServiceInstance llapServiceInstance; + private final NodeInfo nodeInfo; + RegisterDagCallback(NodeInfo nodeInfo, LlapServiceInstance llapServiceInstance) { + this.nodeInfo 
= nodeInfo; + this.llapServiceInstance = llapServiceInstance; + } + @Override + public void setDone(Void v, QueryIdentifierProto result) { + LOG.info("Dag with" + + " appId=" + result.getApplicationIdString() + + " dagId=" + result.getDagIndex() + + " registered successfully for node " + nodeInfo.getHost()); + addNode(nodeInfo, llapServiceInstance); + } + + @Override + public void setError(Void v, Throwable t) { + LOG.warn("Error registering dag for node " + nodeInfo.getHost(), t); + // In case we fail to register the dag we add the node anyway + // We will try to register the dag when we schedule the first container + addNode(nodeInfo, llapServiceInstance); + } + } + // TODO: this is an ugly hack; see the same in LlapTaskCommunicator for discussion. // This only lives for the duration of the service init. static LlapTaskSchedulerService instance = null; @@ -761,7 +782,7 @@ public class LlapTaskSchedulerService extends TaskScheduler { registry.registerStateChangeListener(new NodeStateChangeListener()); activeInstances = registry.getInstances(); for (LlapServiceInstance inst : activeInstances.getAll()) { - addNode(new NodeInfo(inst, nodeBlacklistConf, clock, + registerAndAddNode(new NodeInfo(inst, nodeBlacklistConf, clock, numSchedulableTasksPerNode, metrics), inst); } if (amRegistry != null) { @@ -788,7 +809,7 @@ public class LlapTaskSchedulerService extends TaskScheduler { public void onCreate(LlapServiceInstance serviceInstance, int ephSeqVersion) { LOG.info("Added node with identity: {} as a result of registry callback", serviceInstance.getWorkerIdentity()); - addNode(new NodeInfo(serviceInstance, nodeBlacklistConf, clock, + registerAndAddNode(new NodeInfo(serviceInstance, nodeBlacklistConf, clock, numSchedulableTasksPerNode, metrics), serviceInstance); } @@ -1510,6 +1531,18 @@ public class LlapTaskSchedulerService extends TaskScheduler { return new SelectHostResult(randomNode); } + private void registerAndAddNode(NodeInfo node, LlapServiceInstance 
serviceInstance) { + if (communicator != null) { + boolean registered = communicator + .registerDag(node, new RegisterDagCallback(node, serviceInstance)); + if (!registered) { + addNode(node, serviceInstance); + } + } else { + addNode(node, serviceInstance); + } + } + private void addNode(NodeInfo node, LlapServiceInstance serviceInstance) { // we have just added a new node. Signal timeout monitor to reset timer if (activeInstances.size() != 0 && timeoutFutureRef.get() != null) { http://git-wip-us.apache.org/repos/asf/hive/blob/10f4eadd/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTezUtils.java ---------------------------------------------------------------------- diff --git a/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTezUtils.java b/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTezUtils.java index e4af660..f1bc28d 100644 --- a/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTezUtils.java +++ b/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTezUtils.java @@ -15,13 +15,18 @@ package org.apache.hadoop.hive.llap.tezplugins; import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.io.DataOutputBuffer; import org.apache.hadoop.mapred.JobConf; +import org.apache.hadoop.security.Credentials; import org.apache.tez.dag.records.TezTaskAttemptID; import org.apache.tez.mapreduce.hadoop.MRInputHelpers; import org.apache.tez.mapreduce.input.MRInput; import org.apache.tez.mapreduce.input.MRInputLegacy; import org.apache.tez.mapreduce.input.MultiMRInput; +import java.io.IOException; +import java.nio.ByteBuffer; + @InterfaceAudience.Private public class LlapTezUtils { public static boolean isSourceOfInterest(String inputClassName) { @@ -48,4 +53,13 @@ public class LlapTezUtils { } return s; } + + public static ByteBuffer serializeCredentials(Credentials credentials) throws + IOException { + Credentials containerCredentials = new Credentials(); + 
containerCredentials.addAll(credentials); + DataOutputBuffer containerTokensDob = new DataOutputBuffer(); + containerCredentials.writeTokenStorageToStream(containerTokensDob); + return ByteBuffer.wrap(containerTokensDob.getData(), 0, containerTokensDob.getLength()); + } }