Repository: hive
Updated Branches:
  refs/heads/master 01cef9230 -> 10f4eadd5


http://git-wip-us.apache.org/repos/asf/hive/blob/10f4eadd/llap-common/src/protobuf/LlapDaemonProtocol.proto
----------------------------------------------------------------------
diff --git a/llap-common/src/protobuf/LlapDaemonProtocol.proto 
b/llap-common/src/protobuf/LlapDaemonProtocol.proto
index d70dd41..3aeacb2 100644
--- a/llap-common/src/protobuf/LlapDaemonProtocol.proto
+++ b/llap-common/src/protobuf/LlapDaemonProtocol.proto
@@ -134,6 +134,14 @@ message SubmitWorkRequestProto {
   optional bool is_guaranteed = 12 [default = false];
 }
 
+message RegisterDagRequestProto {
+  optional string user = 1;
+  required QueryIdentifierProto query_identifier = 2;
+  optional bytes credentials_binary = 3;
+}
+
+message RegisterDagResponseProto {
+}
 
 enum SubmissionStateProto {
   ACCEPTED = 1;
@@ -204,6 +212,7 @@ message PurgeCacheResponseProto {
 }
 
 service LlapDaemonProtocol {
+  rpc registerDag(RegisterDagRequestProto) returns (RegisterDagResponseProto);
   rpc submitWork(SubmitWorkRequestProto) returns (SubmitWorkResponseProto);
   rpc sourceStateUpdated(SourceStateUpdatedRequestProto) returns 
(SourceStateUpdatedResponseProto);
   rpc queryComplete(QueryCompleteRequestProto) returns 
(QueryCompleteResponseProto);

http://git-wip-us.apache.org/repos/asf/hive/blob/10f4eadd/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/ContainerRunner.java
----------------------------------------------------------------------
diff --git 
a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/ContainerRunner.java 
b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/ContainerRunner.java
index 035960e..582f518 100644
--- 
a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/ContainerRunner.java
+++ 
b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/ContainerRunner.java
@@ -16,6 +16,7 @@ package org.apache.hadoop.hive.llap.daemon;
 
 import java.io.IOException;
 
+import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos;
 import 
org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.QueryCompleteRequestProto;
 import 
org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.QueryCompleteResponseProto;
 import 
org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.SourceStateUpdatedRequestProto;
@@ -29,6 +30,10 @@ import 
org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.UpdateFra
 
 public interface ContainerRunner {
 
+  LlapDaemonProtocolProtos.RegisterDagResponseProto registerDag(
+      LlapDaemonProtocolProtos.RegisterDagRequestProto request)
+      throws IOException;
+
   SubmitWorkResponseProto submitWork(SubmitWorkRequestProto request) throws 
IOException;
 
   SourceStateUpdatedResponseProto sourceStateUpdated(

http://git-wip-us.apache.org/repos/asf/hive/blob/10f4eadd/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/ContainerRunnerImpl.java
----------------------------------------------------------------------
diff --git 
a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/ContainerRunnerImpl.java
 
b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/ContainerRunnerImpl.java
index ef5922e..7a3ca2f 100644
--- 
a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/ContainerRunnerImpl.java
+++ 
b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/ContainerRunnerImpl.java
@@ -27,10 +27,9 @@ import java.util.concurrent.atomic.AtomicReference;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hive.common.UgiFactory;
-import org.apache.hadoop.hive.conf.HiveConf;
-import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
 import org.apache.hadoop.hive.llap.DaemonId;
 import org.apache.hadoop.hive.llap.LlapNodeId;
+import org.apache.hadoop.hive.llap.LlapUtil;
 import org.apache.hadoop.hive.llap.NotTezEventHelper;
 import org.apache.hadoop.hive.llap.counters.FragmentCountersMap;
 import org.apache.hadoop.hive.llap.counters.LlapWmCounters;
@@ -55,6 +54,8 @@ import 
org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.SourceSta
 import 
org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.SubmissionStateProto;
 import 
org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.SubmitWorkRequestProto;
 import 
org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.SubmitWorkResponseProto;
+import 
org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.RegisterDagRequestProto;
+import 
org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.RegisterDagResponseProto;
 import 
org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.TerminateFragmentRequestProto;
 import 
org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.TerminateFragmentResponseProto;
 import 
org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.UpdateFragmentRequestProto;
@@ -65,7 +66,6 @@ import org.apache.hadoop.hive.llap.security.LlapSignerImpl;
 import org.apache.hadoop.hive.llap.tez.Converters;
 import org.apache.hadoop.hive.llap.tezplugins.LlapTezUtils;
 import org.apache.hadoop.hive.ql.exec.tez.WorkloadManager;
-import org.apache.hadoop.io.DataInputBuffer;
 import org.apache.hadoop.security.Credentials;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.security.token.Token;
@@ -119,11 +119,11 @@ public class ContainerRunnerImpl extends CompositeService 
implements ContainerRu
   private final UgiFactory fsUgiFactory;
   private final SocketFactory socketFactory;
 
-  public ContainerRunnerImpl(Configuration conf, int numExecutors, int 
waitQueueSize,
-      boolean enablePreemption, String[] localDirsBase, 
AtomicReference<Integer> localShufflePort,
+  public ContainerRunnerImpl(Configuration conf, int numExecutors, 
AtomicReference<Integer> localShufflePort,
       AtomicReference<InetSocketAddress> localAddress,
       long totalMemoryAvailableBytes, LlapDaemonExecutorMetrics metrics,
-      AMReporter amReporter, ClassLoader classLoader, DaemonId daemonId, 
UgiFactory fsUgiFactory,
+      AMReporter amReporter, QueryTracker queryTracker, 
Scheduler<TaskRunnerCallable> executorService,
+      DaemonId daemonId, UgiFactory fsUgiFactory,
       SocketFactory socketFactory) {
     super("ContainerRunnerImpl");
     Preconditions.checkState(numExecutors > 0,
@@ -138,15 +138,10 @@ public class ContainerRunnerImpl extends CompositeService 
implements ContainerRu
 
     this.clusterId = daemonId.getClusterString();
     this.daemonId = daemonId;
-    this.queryTracker = new QueryTracker(conf, localDirsBase, clusterId);
-    addIfService(queryTracker);
-    String waitQueueSchedulerClassName = HiveConf.getVar(
-        conf, ConfVars.LLAP_DAEMON_WAIT_QUEUE_COMPARATOR_CLASS_NAME);
-    this.executorService = new TaskExecutorService(numExecutors, waitQueueSize,
-        waitQueueSchedulerClassName, enablePreemption, classLoader, metrics, 
null);
+    this.queryTracker = queryTracker;
+    this.executorService = executorService;
     completionListener = (SchedulerFragmentCompletingListener) executorService;
 
-    addIfService(executorService);
 
     // Distribute the available memory between the tasks.
     this.memoryPerExecutor = (long)(totalMemoryAvailableBytes / (float) 
numExecutors);
@@ -187,6 +182,26 @@ public class ContainerRunnerImpl extends CompositeService 
implements ContainerRu
   }
 
   @Override
+  public RegisterDagResponseProto registerDag(RegisterDagRequestProto request)
+      throws IOException {
+    QueryIdentifierProto identifier = request.getQueryIdentifier();
+    Credentials credentials;
+    if (request.hasCredentialsBinary()) {
+      credentials = LlapUtil.credentialsFromByteArray(
+          request.getCredentialsBinary().toByteArray());
+    } else {
+      credentials = new Credentials();
+    }
+    queryTracker.registerDag(identifier.getApplicationIdString(),
+        identifier.getDagIndex(), request.getUser(), credentials);
+    if (LOG.isInfoEnabled()) {
+      LOG.info("Application with  id={}, dagId={} registered",
+          identifier.getApplicationIdString(), identifier.getDagIndex());
+    }
+    return RegisterDagResponseProto.newBuilder().build();
+  }
+
+  @Override
   public SubmitWorkResponseProto submitWork(SubmitWorkRequestProto request) 
throws IOException {
     LlapTokenInfo tokenInfo = null;
     try {
@@ -242,11 +257,8 @@ public class ContainerRunnerImpl extends CompositeService 
implements ContainerRu
       QueryIdentifier queryIdentifier = new QueryIdentifier(
           qIdProto.getApplicationIdString(), dagIdentifier);
 
-      Credentials credentials = new Credentials();
-      DataInputBuffer dib = new DataInputBuffer();
-      byte[] tokenBytes = request.getCredentialsBinary().toByteArray();
-      dib.reset(tokenBytes, tokenBytes.length);
-      credentials.readTokenStorageStream(dib);
+      Credentials credentials = LlapUtil.credentialsFromByteArray(
+          request.getCredentialsBinary().toByteArray());
 
       Token<JobTokenIdentifier> jobToken = 
TokenCache.getSessionToken(credentials);
 

http://git-wip-us.apache.org/repos/asf/hive/blob/10f4eadd/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapDaemon.java
----------------------------------------------------------------------
diff --git 
a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapDaemon.java 
b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapDaemon.java
index 52990c5..940be0e 100644
--- 
a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapDaemon.java
+++ 
b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapDaemon.java
@@ -45,6 +45,7 @@ import org.apache.hadoop.hive.llap.LlapUtil;
 import org.apache.hadoop.hive.llap.configuration.LlapDaemonConfiguration;
 import org.apache.hadoop.hive.llap.daemon.ContainerRunner;
 import org.apache.hadoop.hive.llap.daemon.QueryFailedHandler;
+import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos;
 import 
org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.QueryCompleteRequestProto;
 import 
org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.QueryCompleteResponseProto;
 import 
org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.SourceStateUpdatedRequestProto;
@@ -291,9 +292,22 @@ public class LlapDaemon extends CompositeService 
implements ContainerRunner, Lla
     } catch (IOException e) {
       throw new RuntimeException(e);
     }
-    this.containerRunner = new ContainerRunnerImpl(daemonConf, numExecutors, 
waitQueueSize,
-        enablePreemption, localDirs, this.shufflePort, srvAddress, 
executorMemoryPerInstance, metrics,
-        amReporter, executorClassLoader, daemonId, fsUgiFactory, 
socketFactory);
+
+    QueryTracker queryTracker = new QueryTracker(daemonConf, localDirs,
+        daemonId.getClusterString());
+
+    String waitQueueSchedulerClassName = HiveConf.getVar(
+        daemonConf, ConfVars.LLAP_DAEMON_WAIT_QUEUE_COMPARATOR_CLASS_NAME);
+
+    Scheduler<TaskRunnerCallable> executorService = new 
TaskExecutorService(numExecutors, waitQueueSize,
+        waitQueueSchedulerClassName, enablePreemption, executorClassLoader, 
metrics, null);
+
+    addIfService(queryTracker);
+    addIfService(executorService);
+
+    this.containerRunner = new ContainerRunnerImpl(daemonConf, numExecutors,
+        this.shufflePort, srvAddress, executorMemoryPerInstance, metrics,
+        amReporter, queryTracker, executorService, daemonId, fsUgiFactory, 
socketFactory);
     addIfService(containerRunner);
 
     // Not adding the registry as a service, since we need to control when it 
is initialized - conf used to pickup properties.
@@ -552,6 +566,13 @@ public class LlapDaemon extends CompositeService 
implements ContainerRunner, Lla
   }
 
   @Override
+  public LlapDaemonProtocolProtos.RegisterDagResponseProto registerDag(
+      LlapDaemonProtocolProtos.RegisterDagRequestProto request)
+      throws IOException {
+    return containerRunner.registerDag(request);
+  }
+
+  @Override
   public SubmitWorkResponseProto submitWork(
       SubmitWorkRequestProto request) throws IOException {
     numSubmissions.incrementAndGet();

http://git-wip-us.apache.org/repos/asf/hive/blob/10f4eadd/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapProtocolServerImpl.java
----------------------------------------------------------------------
diff --git 
a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapProtocolServerImpl.java
 
b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapProtocolServerImpl.java
index d856b25..c3a95c8 100644
--- 
a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapProtocolServerImpl.java
+++ 
b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapProtocolServerImpl.java
@@ -98,6 +98,18 @@ public class LlapProtocolServerImpl extends AbstractService
   }
 
   @Override
+  public LlapDaemonProtocolProtos.RegisterDagResponseProto registerDag(
+      RpcController controller,
+      LlapDaemonProtocolProtos.RegisterDagRequestProto request)
+      throws ServiceException {
+    try {
+      return containerRunner.registerDag(request);
+    } catch (IOException e) {
+      throw new ServiceException(e);
+    }
+  }
+
+  @Override
   public SubmitWorkResponseProto submitWork(RpcController controller,
       SubmitWorkRequestProto request) throws ServiceException {
     try {

http://git-wip-us.apache.org/repos/asf/hive/blob/10f4eadd/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/QueryTracker.java
----------------------------------------------------------------------
diff --git 
a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/QueryTracker.java
 
b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/QueryTracker.java
index ab84dcc..932d0ad 100644
--- 
a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/QueryTracker.java
+++ 
b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/QueryTracker.java
@@ -24,6 +24,7 @@ import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.llap.LlapNodeId;
 import org.apache.hadoop.hive.llap.log.Log4jQueryCompleteMarker;
 import org.apache.hadoop.hive.llap.log.LogHelpers;
+import org.apache.hadoop.security.Credentials;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.security.token.Token;
 import org.apache.hadoop.util.StringUtils;
@@ -41,6 +42,7 @@ import 
org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.SourceSta
 import org.apache.hadoop.hive.llap.shufflehandler.ShuffleHandler;
 import org.apache.hadoop.hive.ql.exec.ObjectCacheFactory;
 import org.apache.tez.common.security.JobTokenIdentifier;
+import org.apache.tez.common.security.TokenCache;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.slf4j.Marker;
@@ -202,6 +204,21 @@ public class QueryTracker extends AbstractService {
     }
   }
 
+  public void registerDag(String applicationId, int dagId, String user,
+      Credentials credentials) {
+    Token<JobTokenIdentifier> jobToken = 
TokenCache.getSessionToken(credentials);
+
+    QueryIdentifier queryIdentifier = new QueryIdentifier(applicationId, 
dagId);
+    ReadWriteLock dagLock = getDagLock(queryIdentifier);
+    dagLock.readLock().lock();
+    try {
+      ShuffleHandler.get()
+          .registerDag(applicationId, dagId, jobToken, user, null);
+    } finally {
+      dagLock.readLock().unlock();
+    }
+  }
+
   /**
    * Indicate to the tracker that a fragment is complete. This is from 
internal execution within the daemon
    * @param fragmentInfo

http://git-wip-us.apache.org/repos/asf/hive/blob/10f4eadd/llap-server/src/java/org/apache/hadoop/hive/llap/shufflehandler/ShuffleHandler.java
----------------------------------------------------------------------
diff --git 
a/llap-server/src/java/org/apache/hadoop/hive/llap/shufflehandler/ShuffleHandler.java
 
b/llap-server/src/java/org/apache/hadoop/hive/llap/shufflehandler/ShuffleHandler.java
index 18a37a2..aff2c2e 100644
--- 
a/llap-server/src/java/org/apache/hadoop/hive/llap/shufflehandler/ShuffleHandler.java
+++ 
b/llap-server/src/java/org/apache/hadoop/hive/llap/shufflehandler/ShuffleHandler.java
@@ -49,6 +49,7 @@ import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.regex.Pattern;
 
+import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Charsets;
 import com.google.common.base.Preconditions;
 import com.google.common.cache.CacheBuilder;
@@ -169,6 +170,7 @@ public class ShuffleHandler implements 
AttemptRegistrationListener {
 
   /* List of registered applications */
   private final ConcurrentMap<String, Integer> registeredApps = new 
ConcurrentHashMap<>();
+  private final ConcurrentMap<String, Integer> registeredDirectories = new 
ConcurrentHashMap<>();
   /* Maps application identifiers (jobIds) to the associated user for the app 
*/
   private final ConcurrentMap<String,String> userRsrc;
   private JobTokenSecretManager secretManager;
@@ -433,18 +435,21 @@ public class ShuffleHandler implements 
AttemptRegistrationListener {
    * Register an application and its associated credentials and user 
information.
    *
    * This method and unregisterDag must be synchronized externally to prevent 
races in shuffle token registration/unregistration
+   * This method may be called several times, but the registeredDirectories 
can only be set once, namely
+   * in the first call in which appDirs is not null.
    *
    * @param applicationIdString
    * @param dagIdentifier
    * @param appToken
    * @param user
+   * @param appDirs
    */
   public void registerDag(String applicationIdString, int dagIdentifier,
                           Token<JobTokenIdentifier> appToken,
                           String user, String[] appDirs) {
     Integer registeredDagIdentifier = 
registeredApps.putIfAbsent(applicationIdString, dagIdentifier);
     // App never seen, or previous dag has been unregistered.
-    if (registeredDagIdentifier == null) {
+    if (registeredDagIdentifier == null && appToken != null ) {
       recordJobShuffleInfo(applicationIdString, user, appToken);
     }
     // Register the new dag identifier, if that's not the one currently 
registered.
@@ -454,6 +459,14 @@ public class ShuffleHandler implements 
AttemptRegistrationListener {
       // Don't need to recordShuffleInfo since the out of sync unregister will 
not remove the
       // credentials
     }
+
+    if (appDirs == null) {
+      return;
+    }
+    registeredDagIdentifier  = registeredDirectories.put(applicationIdString, 
dagIdentifier);
+    if (registeredDagIdentifier != null && 
!registeredDagIdentifier.equals(dagIdentifier)) {
+      registeredDirectories.put(applicationIdString, dagIdentifier);
+    }
     // First time registration, or new register comes in before the previous 
unregister.
     if (registeredDagIdentifier == null || 
!registeredDagIdentifier.equals(dagIdentifier)) {
       if (dirWatcher != null) {
@@ -487,6 +500,7 @@ public class ShuffleHandler implements 
AttemptRegistrationListener {
     // be synchronized, hence the following check is sufficient.
     if (currentDagIdentifier != null && 
currentDagIdentifier.equals(dagIdentifier)) {
       registeredApps.remove(applicationIdString);
+      registeredDirectories.remove(applicationIdString);
       removeJobShuffleInfo(applicationIdString);
     }
     // Unregister for the dirWatcher for the specific dagIdentifier in either 
case.
@@ -514,6 +528,16 @@ public class ShuffleHandler implements 
AttemptRegistrationListener {
     }
   }
 
+  @VisibleForTesting
+  public Map getRegisteredApps() {
+    return new HashMap<>(registeredApps);
+  }
+
+  @VisibleForTesting
+  public Map getRegisteredDirectories() {
+    return new HashMap<>(registeredDirectories);
+  }
+
   protected Shuffle getShuffle(Configuration conf) {
     return shuffle;
   }

http://git-wip-us.apache.org/repos/asf/hive/blob/10f4eadd/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/LlapDaemonTestUtils.java
----------------------------------------------------------------------
diff --git 
a/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/LlapDaemonTestUtils.java
 
b/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/LlapDaemonTestUtils.java
new file mode 100644
index 0000000..21f732b
--- /dev/null
+++ 
b/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/LlapDaemonTestUtils.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.llap.daemon;
+
+import com.google.protobuf.ByteString;
+import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos;
+import 
org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.SubmitWorkRequestProto;
+import org.apache.hadoop.hive.llap.tezplugins.LlapTezUtils;
+import org.apache.hadoop.security.Credentials;
+import java.io.IOException;
+
+/**
+ * Utils class for testing Llap Daemon.
+ */
+public class LlapDaemonTestUtils {
+  private LlapDaemonTestUtils() {}
+
+  public static SubmitWorkRequestProto buildSubmitProtoRequest(int 
fragmentNumber,
+      String appId, int dagId, int vId, String dagName,
+      int dagStartTime, int attemptStartTime, int numSelfAndUpstreamTasks, int 
numSelfAndUpstreamComplete,
+      int withinDagPriority, Credentials credentials) throws IOException {
+    return SubmitWorkRequestProto
+        .newBuilder()
+        .setAttemptNumber(0)
+        .setFragmentNumber(fragmentNumber)
+        .setWorkSpec(
+            LlapDaemonProtocolProtos.VertexOrBinary.newBuilder().setVertex(
+                LlapDaemonProtocolProtos.SignableVertexSpec
+                    .newBuilder()
+                    .setQueryIdentifier(
+                        
LlapDaemonProtocolProtos.QueryIdentifierProto.newBuilder()
+                            .setApplicationIdString(appId)
+                            .setAppAttemptNumber(0)
+                            .setDagIndex(dagId)
+                            .build())
+                    .setVertexIndex(vId)
+                    .setDagName(dagName)
+                    .setHiveQueryId(dagName)
+                    .setVertexName("MockVertex")
+                    .setUser("MockUser")
+                    .setTokenIdentifier("MockToken_1")
+                    .setProcessorDescriptor(
+                        
LlapDaemonProtocolProtos.EntityDescriptorProto.newBuilder()
+                            .setClassName("MockProcessor").build())
+                    .build()).build())
+        .setAmHost("localhost")
+        .setAmPort(12345)
+        
.setCredentialsBinary(ByteString.copyFrom(LlapTezUtils.serializeCredentials(credentials)))
+        .setContainerIdString("MockContainer_1")
+        .setFragmentRuntimeInfo(LlapDaemonProtocolProtos
+            .FragmentRuntimeInfo
+            .newBuilder()
+            .setDagStartTime(dagStartTime)
+            .setFirstAttemptStartTime(attemptStartTime)
+            .setNumSelfAndUpstreamTasks(numSelfAndUpstreamTasks)
+            .setNumSelfAndUpstreamCompletedTasks(numSelfAndUpstreamComplete)
+            .setWithinDagPriority(withinDagPriority)
+            .build())
+        .build();
+  }
+}

http://git-wip-us.apache.org/repos/asf/hive/blob/10f4eadd/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/TestContainerRunnerImpl.java
----------------------------------------------------------------------
diff --git 
a/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/TestContainerRunnerImpl.java
 
b/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/TestContainerRunnerImpl.java
new file mode 100644
index 0000000..ea4f50b
--- /dev/null
+++ 
b/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/TestContainerRunnerImpl.java
@@ -0,0 +1,180 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.llap.daemon.impl;
+
+import com.google.common.primitives.Ints;
+import com.google.protobuf.ByteString;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.llap.DaemonId;
+import org.apache.hadoop.hive.llap.configuration.LlapDaemonConfiguration;
+import org.apache.hadoop.hive.llap.daemon.LlapDaemonTestUtils;
+import 
org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.RegisterDagRequestProto;
+import 
org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.SubmitWorkRequestProto;
+import 
org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.QueryIdentifierProto;
+import org.apache.hadoop.hive.llap.metrics.LlapDaemonExecutorMetrics;
+import org.apache.hadoop.hive.llap.metrics.LlapMetricsSystem;
+import org.apache.hadoop.hive.llap.metrics.MetricsUtils;
+import org.apache.hadoop.hive.llap.security.LlapTokenIdentifier;
+import org.apache.hadoop.hive.llap.security.LlapUgiFactoryFactory;
+import org.apache.hadoop.hive.llap.shufflehandler.ShuffleHandler;
+import org.apache.hadoop.hive.llap.tezplugins.LlapTezUtils;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.net.NetUtils;
+import org.apache.hadoop.security.Credentials;
+import org.apache.hadoop.security.token.Token;
+import org.apache.tez.common.security.TokenCache;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import javax.net.SocketFactory;
+import java.io.File;
+import java.net.InetSocketAddress;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicReference;
+
+import static org.mockito.Mockito.mock;
+
+/**
+ * Test ContainerRunnerImpl.
+ */
+public class TestContainerRunnerImpl {
+  ContainerRunnerImpl containerRunner;
+  LlapDaemonConfiguration daemonConf = new LlapDaemonConfiguration();
+  private final int numExecutors = 1;
+  private final int waitQueueSize = HiveConf.getIntVar(
+      daemonConf, 
HiveConf.ConfVars.LLAP_DAEMON_TASK_SCHEDULER_WAIT_QUEUE_SIZE);
+  private final boolean enablePreemption = false;
+  private final int numLocalDirs = 1;
+  private final String[] localDirs = new String[numLocalDirs];
+  private final File testWorkDir = new File("target", 
"container-runner-tests");
+  private final AtomicReference<Integer> shufflePort = new AtomicReference<>();
+  private final AtomicReference<InetSocketAddress> srvAddress = new 
AtomicReference<>();
+  private final int executorMemoryPerInstance = 1024;
+  private LlapDaemonExecutorMetrics metrics;
+  private AMReporter amReporter;
+  private final String testUser = "testUser";
+  private final String appId = "application_1540489363818_0021";
+  private final int dagId = 1234;
+  private final int vId = 12345;
+  private final String hostname = "test.cluster";
+  private final DaemonId daemonId = new DaemonId(testUser,
+      "ContainerTests", hostname,
+      appId, System.currentTimeMillis());
+  private final SocketFactory socketFactory = 
NetUtils.getDefaultSocketFactory(daemonConf);
+  private QueryTracker queryTracker;
+  private TaskExecutorService executorService;
+  private InetSocketAddress serverSocket;
+
+
+  @Before
+  public void setup() throws Exception {
+
+    String[] strIntervals = HiveConf.getTrimmedStringsVar(daemonConf,
+        HiveConf.ConfVars.LLAP_DAEMON_TASK_PREEMPTION_METRICS_INTERVALS);
+    List<Integer> intervalList = new ArrayList<>();
+    if (strIntervals != null) {
+      for (String strInterval : strIntervals) {
+        intervalList.add(Integer.valueOf(strInterval));
+      }
+    }
+
+    amReporter = mock(AMReporter.class);
+    serverSocket = new InetSocketAddress("localhost", 0);
+    srvAddress.set(serverSocket);
+
+    this.metrics = LlapDaemonExecutorMetrics
+        .create("ContinerRunerTests", MetricsUtils.getUUID(), numExecutors,
+            Ints.toArray(intervalList));
+
+    for (int i = 0; i < numLocalDirs; i++) {
+      File f = new File(testWorkDir, "localDir");
+      f.mkdirs();
+      localDirs[i] = f.getAbsolutePath();
+    }
+    String waitQueueSchedulerClassName = HiveConf.getVar(
+        daemonConf, 
HiveConf.ConfVars.LLAP_DAEMON_WAIT_QUEUE_COMPARATOR_CLASS_NAME);
+
+    queryTracker = new QueryTracker(daemonConf, localDirs, 
daemonId.getClusterString());
+    executorService = new TaskExecutorService(numExecutors, waitQueueSize,
+        waitQueueSchedulerClassName, enablePreemption, 
Thread.currentThread().getContextClassLoader(), metrics, null);
+
+    shufflePort.set(HiveConf.getIntVar(
+        daemonConf, HiveConf.ConfVars.LLAP_DAEMON_RPC_PORT));
+    containerRunner = new ContainerRunnerImpl(daemonConf, numExecutors,
+        this.shufflePort, srvAddress, executorMemoryPerInstance, metrics,
+        amReporter, queryTracker, executorService, daemonId, 
LlapUgiFactoryFactory
+        .createFsUgiFactory(daemonConf), socketFactory);
+
+    ShuffleHandler.initializeAndStart(daemonConf);
+
+    executorService.init(daemonConf);
+    executorService.start();
+    queryTracker.init(daemonConf);
+    queryTracker.start();
+    containerRunner.init(daemonConf);
+    containerRunner.start();
+  }
+
+  @After
+  public void cleanup() throws Exception {
+    containerRunner.serviceStop();
+    queryTracker.serviceStop();
+    executorService.serviceStop();
+    executorService.serviceStop();
+    LlapMetricsSystem.shutdown();
+  }
+
+  @Test(timeout = 10000)
+  public void testRegisterDag() throws Exception {
+    Credentials credentials = new Credentials();
+    Token<LlapTokenIdentifier> sessionToken = new Token<>(
+        "identifier".getBytes(), "testPassword".getBytes(), new Text("kind"), 
new Text("service"));
+    TokenCache.setSessionToken(sessionToken, credentials);
+
+    RegisterDagRequestProto request = RegisterDagRequestProto.newBuilder()
+        .setUser(testUser)
+        
.setCredentialsBinary(ByteString.copyFrom(LlapTezUtils.serializeCredentials(credentials)))
+        .setQueryIdentifier(
+            QueryIdentifierProto.newBuilder()
+                .setApplicationIdString(appId)
+                .setDagIndex(dagId)
+                .build())
+        .build();
+    containerRunner.registerDag(request);
+    Assert.assertEquals(ShuffleHandler.get().getRegisteredApps().size(), 1);
+    Assert.assertEquals(ShuffleHandler.get().getRegisteredApps().get(appId), 
dagId);
+    
Assert.assertEquals(ShuffleHandler.get().getRegisteredDirectories().size(), 0);
+
+    containerRunner.registerDag(request);
+    Assert.assertEquals(ShuffleHandler.get().getRegisteredApps().size(), 1);
+    Assert.assertEquals(ShuffleHandler.get().getRegisteredApps().get(appId), 
dagId);
+    
Assert.assertEquals(ShuffleHandler.get().getRegisteredDirectories().size(), 0);
+
+    SubmitWorkRequestProto sRequest =
+        LlapDaemonTestUtils.buildSubmitProtoRequest(1, appId,
+            dagId, vId, "dagName", 0, 0,
+            0, 0, 1,
+            credentials);
+
+    containerRunner.submitWork(sRequest);
+    Assert.assertEquals(ShuffleHandler.get().getRegisteredApps().size(), 1);
+    Assert.assertEquals(ShuffleHandler.get().getRegisteredApps().get(appId), 
dagId);
+    
Assert.assertEquals(ShuffleHandler.get().getRegisteredDirectories().size(), 1);
+    
Assert.assertEquals(ShuffleHandler.get().getRegisteredDirectories().get(appId), 
dagId);
+  }
+}

http://git-wip-us.apache.org/repos/asf/hive/blob/10f4eadd/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/comparator/TestFirstInFirstOutComparator.java
----------------------------------------------------------------------
diff --git 
a/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/comparator/TestFirstInFirstOutComparator.java
 
b/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/comparator/TestFirstInFirstOutComparator.java
index d3aa539..dffbb57 100644
--- 
a/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/comparator/TestFirstInFirstOutComparator.java
+++ 
b/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/comparator/TestFirstInFirstOutComparator.java
@@ -22,30 +22,29 @@ import static 
org.apache.hadoop.hive.llap.daemon.impl.TaskExecutorTestHelpers.cr
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNull;
 
+import org.apache.hadoop.hive.llap.daemon.LlapDaemonTestUtils;
 import org.apache.hadoop.hive.llap.daemon.impl.EvictingPriorityBlockingQueue;
 import org.apache.hadoop.hive.llap.daemon.impl.TaskExecutorService.TaskWrapper;
-import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos;
-import 
org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.EntityDescriptorProto;
-import 
org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.QueryIdentifierProto;
-import 
org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.SignableVertexSpec;
 import 
org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.SubmitWorkRequestProto;
-import 
org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.VertexOrBinary;
+import org.apache.hadoop.security.Credentials;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.tez.dag.records.TezDAGID;
 import org.apache.tez.dag.records.TezVertexID;
 import org.junit.Test;
 
+import java.io.IOException;
+
 public class TestFirstInFirstOutComparator {
 
   private SubmitWorkRequestProto createRequest(int fragmentNumber, int 
numSelfAndUpstreamTasks, int dagStartTime,
-      int attemptStartTime) {
+      int attemptStartTime) throws IOException {
     // Same priority for all tasks.
     return createRequest(fragmentNumber, numSelfAndUpstreamTasks, 0, 
dagStartTime, attemptStartTime, 1);
   }
 
   private SubmitWorkRequestProto createRequest(int fragmentNumber, int 
numSelfAndUpstreamTasks,
       int numSelfAndUpstreamComplete, int dagStartTime,
-      int attemptStartTime, int withinDagPriority) {
+      int attemptStartTime, int withinDagPriority) throws IOException {
     return createRequest(fragmentNumber, numSelfAndUpstreamTasks, 
numSelfAndUpstreamComplete,
         dagStartTime, attemptStartTime, withinDagPriority, "MockDag");
   }
@@ -54,50 +53,18 @@ public class TestFirstInFirstOutComparator {
   private SubmitWorkRequestProto createRequest(int fragmentNumber, int 
numSelfAndUpstreamTasks,
       int numSelfAndUpstreamComplete, int dagStartTime,
       int attemptStartTime, int withinDagPriority,
-      String dagName) {
+      String dagName) throws IOException {
     ApplicationId appId = ApplicationId.newInstance(9999, 72);
     TezDAGID dagId = TezDAGID.getInstance(appId, 1);
     TezVertexID vId = TezVertexID.getInstance(dagId, 35);
-    return SubmitWorkRequestProto
-        .newBuilder()
-        .setAttemptNumber(0)
-        .setFragmentNumber(fragmentNumber)
-        .setWorkSpec(
-            VertexOrBinary.newBuilder().setVertex(
-                SignableVertexSpec
-                    .newBuilder()
-                    .setQueryIdentifier(
-                        QueryIdentifierProto.newBuilder()
-                            .setApplicationIdString(appId.toString())
-                            .setAppAttemptNumber(0)
-                            .setDagIndex(dagId.getId())
-                            .build())
-                    .setVertexIndex(vId.getId())
-                    .setDagName(dagName)
-                    .setHiveQueryId(dagName)
-                    .setVertexName("MockVertex")
-                    .setUser("MockUser")
-                    .setTokenIdentifier("MockToken_1")
-                    .setProcessorDescriptor(
-                        
EntityDescriptorProto.newBuilder().setClassName("MockProcessor").build())
-                    .build()).build())
-        .setAmHost("localhost")
-        .setAmPort(12345)
-        .setContainerIdString("MockContainer_1")
-        .setFragmentRuntimeInfo(LlapDaemonProtocolProtos
-            .FragmentRuntimeInfo
-            .newBuilder()
-            .setDagStartTime(dagStartTime)
-            .setFirstAttemptStartTime(attemptStartTime)
-            .setNumSelfAndUpstreamTasks(numSelfAndUpstreamTasks)
-            .setNumSelfAndUpstreamCompletedTasks(numSelfAndUpstreamComplete)
-            .setWithinDagPriority(withinDagPriority)
-            .build())
-        .build();
+    return LlapDaemonTestUtils.buildSubmitProtoRequest(fragmentNumber, 
appId.toString(),
+        dagId.getId(), vId.getId(), dagName, dagStartTime, attemptStartTime,
+        numSelfAndUpstreamTasks, numSelfAndUpstreamComplete, withinDagPriority,
+        new Credentials());
   }
 
   @Test (timeout = 60000)
-  public void testWaitQueueComparator() throws InterruptedException {
+  public void testWaitQueueComparator() throws InterruptedException, 
IOException {
     TaskWrapper r1 = createTaskWrapper(createRequest(1, 2, 5, 100), false, 
100000);
     TaskWrapper r2 = createTaskWrapper(createRequest(2, 4, 4, 200), false, 
100000);
     TaskWrapper r3 = createTaskWrapper(createRequest(3, 6, 3, 300), false, 
1000000);
@@ -273,7 +240,7 @@ public class TestFirstInFirstOutComparator {
   }
   
   @Test(timeout = 60000)
-  public void testWaitQueueComparatorWithinDagPriority() throws 
InterruptedException {
+  public void testWaitQueueComparatorWithinDagPriority() throws 
InterruptedException, IOException {
     TaskWrapper r1 = createTaskWrapper(createRequest(1, 1, 0, 100, 100, 10), 
false, 100000);
     TaskWrapper r2 = createTaskWrapper(createRequest(2, 1, 0, 100, 100, 1), 
false, 100000);
     TaskWrapper r3 = createTaskWrapper(createRequest(3, 1, 0, 100, 100, 5), 
false, 100000);
@@ -291,7 +258,7 @@ public class TestFirstInFirstOutComparator {
   }
 
   @Test(timeout = 60000)
-  public void testWaitQueueComparatorWithinSameDagPriority() throws 
InterruptedException {
+  public void testWaitQueueComparatorWithinSameDagPriority() throws 
InterruptedException, IOException {
     TaskWrapper r1 = createTaskWrapper(createRequest(1, 1, 0, 10, 100, 10), 
true, 100000);
     TaskWrapper r2 = createTaskWrapper(createRequest(2, 1, 0, 10, 100, 10), 
true, 100000);
     TaskWrapper r3 = createTaskWrapper(createRequest(3, 1, 0, 10, 100, 10), 
true, 100000);
@@ -309,7 +276,7 @@ public class TestFirstInFirstOutComparator {
   }
 
   @Test(timeout = 60000)
-  public void testWaitQueueComparatorParallelism() throws InterruptedException 
{
+  public void testWaitQueueComparatorParallelism() throws 
InterruptedException, IOException {
     TaskWrapper r1 = createTaskWrapper(createRequest(1, 10, 3, 100, 100, 1, 
"q1"), false, 100000);
     TaskWrapper r2 = createTaskWrapper(createRequest(2, 10, 7, 100, 100, 1, 
"q2"), false, 100000);
     TaskWrapper r3 = createTaskWrapper(createRequest(3, 10, 5, 100, 100, 1, 
"q3"), false, 100000);

http://git-wip-us.apache.org/repos/asf/hive/blob/10f4eadd/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskCommunicator.java
----------------------------------------------------------------------
diff --git 
a/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskCommunicator.java
 
b/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskCommunicator.java
index 5d4ce22..0120bb6 100644
--- 
a/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskCommunicator.java
+++ 
b/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskCommunicator.java
@@ -52,6 +52,8 @@ import 
org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos;
 import 
org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.FragmentRuntimeInfo;
 import 
org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.QueryCompleteRequestProto;
 import 
org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.QueryIdentifierProto;
+import 
org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.RegisterDagRequestProto;
+import 
org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.RegisterDagResponseProto;
 import 
org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.SourceStateUpdatedRequestProto;
 import 
org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.SourceStateUpdatedResponseProto;
 import 
org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.SubmitWorkRequestProto;
@@ -68,7 +70,6 @@ import org.apache.hadoop.hive.llap.tez.Converters;
 import org.apache.hadoop.hive.llap.tez.LlapProtocolClientProxy;
 import org.apache.hadoop.hive.llap.tezplugins.helpers.SourceStateTracker;
 import org.apache.hadoop.io.BooleanWritable;
-import org.apache.hadoop.io.DataOutputBuffer;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.ipc.ProtocolSignature;
 import org.apache.hadoop.ipc.RPC;
@@ -306,6 +307,48 @@ public class LlapTaskCommunicator extends 
TezTaskCommunicatorImpl {
     void setError(CtxType ctx, Throwable t);
   }
 
+  /**
+   * @param node
+   * @param callback
+   * @return true if it was possible to attempt the registration. Sometimes it's
+   * not possible because a dag is not running
+   */
+  public boolean registerDag(NodeInfo node, final 
OperationCallback<QueryIdentifierProto, Void> callback) {
+    RegisterDagRequestProto.Builder builder = 
RegisterDagRequestProto.newBuilder();
+    if (currentQueryIdentifierProto == null) {
+      return false;
+    }
+    try {
+      RegisterDagRequestProto request = builder
+          .setQueryIdentifier(currentQueryIdentifierProto)
+          .setUser(user)
+          .setCredentialsBinary(
+              getCredentials(getContext()
+                  .getCurrentDagInfo().getCredentials())).build();
+      communicator.registerDag(request, node.getHost(), node.getRpcPort(),
+          new 
LlapProtocolClientProxy.ExecuteRequestCallback<RegisterDagResponseProto>() {
+            @Override
+            public void setResponse(RegisterDagResponseProto response) {
+              callback.setDone(null, currentQueryIdentifierProto);
+            }
+
+            @Override
+            public void indicateError(Throwable t) {
+              LOG.info("Error registering dag with"
+                  + " appId=" + 
currentQueryIdentifierProto.getApplicationIdString()
+                  + " dagId=" + currentQueryIdentifierProto.getDagIndex()
+                  + " to node " + node.getHost());
+              if (!processSendError(t)) {
+                callback.setError(null, t);
+              }
+            }
+          });
+    } catch (IOException e) {
+      throw new RuntimeException(e);
+    }
+    return true;
+  }
+
   public <T> void startUpdateGuaranteed(TezTaskAttemptID attemptId, NodeInfo 
assignedNode,
       boolean newState, final OperationCallback<Boolean, T> callback, final T 
ctx) {
     LlapNodeId nodeId = entityTracker.getNodeIdForTaskAttempt(attemptId);
@@ -795,14 +838,10 @@ public class LlapTaskCommunicator extends 
TezTaskCommunicatorImpl {
 
     Preconditions.checkState(currentQueryIdentifierProto.getDagIndex() ==
         
taskSpec.getTaskAttemptID().getTaskID().getVertexID().getDAGId().getId());
-    ByteBuffer credentialsBinary = 
credentialMap.get(currentQueryIdentifierProto);
-    if (credentialsBinary == null) {
-      credentialsBinary = 
serializeCredentials(getContext().getCurrentDagInfo().getCredentials());
-      credentialMap.putIfAbsent(currentQueryIdentifierProto, 
credentialsBinary.duplicate());
-    } else {
-      credentialsBinary = credentialsBinary.duplicate();
-    }
-    builder.setCredentialsBinary(ByteString.copyFrom(credentialsBinary));
+
+
+    builder.setCredentialsBinary(
+        getCredentials(getContext().getCurrentDagInfo().getCredentials()));
     
builder.setWorkSpec(VertexOrBinary.newBuilder().setVertex(Converters.constructSignableVertexSpec(
         taskSpec, currentQueryIdentifierProto, getTokenIdentifier(), user, 
hiveQueryId)).build());
     // Don't call builder.setWorkSpecSignature() - Tez doesn't sign fragments
@@ -814,16 +853,17 @@ public class LlapTaskCommunicator extends 
TezTaskCommunicatorImpl {
     return builder.build();
   }
 
-  private ByteBuffer serializeCredentials(Credentials credentials) throws 
IOException {
-    Credentials containerCredentials = new Credentials();
-    containerCredentials.addAll(credentials);
-    DataOutputBuffer containerTokens_dob = new DataOutputBuffer();
-    containerCredentials.writeTokenStorageToStream(containerTokens_dob);
-    return ByteBuffer.wrap(containerTokens_dob.getData(), 0, 
containerTokens_dob.getLength());
+  private ByteString getCredentials(Credentials credentials) throws 
IOException {
+    ByteBuffer credentialsBinary = 
credentialMap.get(currentQueryIdentifierProto);
+    if (credentialsBinary == null) {
+      credentialsBinary = LlapTezUtils.serializeCredentials(credentials);
+      credentialMap.putIfAbsent(currentQueryIdentifierProto, 
credentialsBinary.duplicate());
+    } else {
+      credentialsBinary = credentialsBinary.duplicate();
+    }
+    return ByteString.copyFrom(credentialsBinary);
   }
 
-
-
   protected class LlapTaskUmbilicalProtocolImpl implements 
LlapTaskUmbilicalProtocol {
 
     private final TezTaskUmbilicalProtocol tezUmbilical;

http://git-wip-us.apache.org/repos/asf/hive/blob/10f4eadd/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskSchedulerService.java
----------------------------------------------------------------------
diff --git 
a/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskSchedulerService.java
 
b/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskSchedulerService.java
index 7e8299d..79bca60 100644
--- 
a/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskSchedulerService.java
+++ 
b/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskSchedulerService.java
@@ -16,7 +16,6 @@ package org.apache.hadoop.hive.llap.tezplugins;
 
 import com.google.common.io.ByteArrayDataOutput;
 
-import org.apache.hadoop.hive.registry.ServiceInstanceSet;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
 
@@ -71,6 +70,7 @@ import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
 import org.apache.hadoop.hive.llap.metrics.LlapMetricsSystem;
 import org.apache.hadoop.hive.llap.metrics.MetricsUtils;
+import 
org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.QueryIdentifierProto;
 import 
org.apache.hadoop.hive.llap.plugin.rpc.LlapPluginProtocolProtos.UpdateQueryRequestProto;
 import org.apache.hadoop.hive.llap.registry.LlapServiceInstance;
 import org.apache.hadoop.hive.llap.registry.LlapServiceInstanceSet;
@@ -81,12 +81,8 @@ import 
org.apache.hadoop.hive.llap.tezplugins.endpoint.LlapPluginServerImpl;
 import org.apache.hadoop.hive.llap.tezplugins.helpers.MonotonicClock;
 import org.apache.hadoop.hive.llap.tezplugins.metrics.LlapTaskSchedulerMetrics;
 import org.apache.hadoop.hive.llap.tezplugins.scheduler.LoggingFutureCallback;
-import org.apache.hadoop.hive.registry.ServiceInstanceStateChangeListener;
-import org.apache.hadoop.hive.registry.impl.TezAmRegistryImpl;
-import org.apache.hadoop.io.Text;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.security.token.Token;
-import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.Container;
 import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.hadoop.yarn.api.records.ContainerState;
@@ -121,7 +117,6 @@ import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
 import com.google.common.collect.Maps;
 import com.google.common.collect.Sets;
-import com.google.common.io.ByteArrayDataOutput;
 import com.google.common.io.ByteStreams;
 import com.google.common.util.concurrent.Futures;
 import com.google.common.util.concurrent.ListenableFuture;
@@ -133,6 +128,7 @@ public class LlapTaskSchedulerService extends TaskScheduler 
{
   private static final Logger LOG = 
LoggerFactory.getLogger(LlapTaskSchedulerService.class);
   private static final Logger WM_LOG = 
LoggerFactory.getLogger("GuaranteedTasks");
   private static final TaskStartComparator TASK_INFO_COMPARATOR = new 
TaskStartComparator();
+
   private final static Comparator<Priority> PRIORITY_COMPARATOR = new 
Comparator<Priority>() {
     @Override
     public int compare(Priority o1, Priority o2) {
@@ -154,6 +150,31 @@ public class LlapTaskSchedulerService extends 
TaskScheduler {
     }
   }
 
+  private final class RegisterDagCallback implements 
OperationCallback<QueryIdentifierProto, Void> {
+    private final LlapServiceInstance llapServiceInstance;
+    private final NodeInfo nodeInfo;
+    RegisterDagCallback(NodeInfo nodeInfo, LlapServiceInstance 
llapServiceInstance) {
+      this.nodeInfo = nodeInfo;
+      this.llapServiceInstance = llapServiceInstance;
+    }
+    @Override
+    public void setDone(Void v, QueryIdentifierProto result) {
+      LOG.info("Dag with"
+          + " appId=" + result.getApplicationIdString()
+          + " dagId=" + result.getDagIndex()
+          + " registered successfully for node " + nodeInfo.getHost());
+      addNode(nodeInfo, llapServiceInstance);
+    }
+
+    @Override
+    public void setError(Void v, Throwable t) {
+      LOG.warn("Error registering dag for node " + nodeInfo.getHost(), t);
+      // In case we fail to register the dag we add the node anyway
+      // We will try to register the dag when we schedule the first container
+      addNode(nodeInfo, llapServiceInstance);
+    }
+  }
+
   // TODO: this is an ugly hack; see the same in LlapTaskCommunicator for 
discussion.
   //       This only lives for the duration of the service init.
   static LlapTaskSchedulerService instance = null;
@@ -761,7 +782,7 @@ public class LlapTaskSchedulerService extends TaskScheduler 
{
       registry.registerStateChangeListener(new NodeStateChangeListener());
       activeInstances = registry.getInstances();
       for (LlapServiceInstance inst : activeInstances.getAll()) {
-        addNode(new NodeInfo(inst, nodeBlacklistConf, clock,
+        registerAndAddNode(new NodeInfo(inst, nodeBlacklistConf, clock,
             numSchedulableTasksPerNode, metrics), inst);
       }
       if (amRegistry != null) {
@@ -788,7 +809,7 @@ public class LlapTaskSchedulerService extends TaskScheduler 
{
     public void onCreate(LlapServiceInstance serviceInstance, int 
ephSeqVersion) {
       LOG.info("Added node with identity: {} as a result of registry callback",
           serviceInstance.getWorkerIdentity());
-      addNode(new NodeInfo(serviceInstance, nodeBlacklistConf, clock,
+      registerAndAddNode(new NodeInfo(serviceInstance, nodeBlacklistConf, 
clock,
           numSchedulableTasksPerNode, metrics), serviceInstance);
     }
 
@@ -1510,6 +1531,18 @@ public class LlapTaskSchedulerService extends 
TaskScheduler {
     return new SelectHostResult(randomNode);
   }
 
+  private void registerAndAddNode(NodeInfo node, LlapServiceInstance 
serviceInstance) {
+    if (communicator != null) {
+      boolean registered = communicator
+          .registerDag(node, new RegisterDagCallback(node, serviceInstance));
+      if (!registered) {
+        addNode(node, serviceInstance);
+      }
+    } else {
+      addNode(node, serviceInstance);
+    }
+  }
+
   private void addNode(NodeInfo node, LlapServiceInstance serviceInstance) {
     // we have just added a new node. Signal timeout monitor to reset timer
     if (activeInstances.size() != 0 && timeoutFutureRef.get() != null) {

http://git-wip-us.apache.org/repos/asf/hive/blob/10f4eadd/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTezUtils.java
----------------------------------------------------------------------
diff --git 
a/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTezUtils.java 
b/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTezUtils.java
index e4af660..f1bc28d 100644
--- a/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTezUtils.java
+++ b/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTezUtils.java
@@ -15,13 +15,18 @@
 package org.apache.hadoop.hive.llap.tezplugins;
 
 import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.io.DataOutputBuffer;
 import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.security.Credentials;
 import org.apache.tez.dag.records.TezTaskAttemptID;
 import org.apache.tez.mapreduce.hadoop.MRInputHelpers;
 import org.apache.tez.mapreduce.input.MRInput;
 import org.apache.tez.mapreduce.input.MRInputLegacy;
 import org.apache.tez.mapreduce.input.MultiMRInput;
 
+import java.io.IOException;
+import java.nio.ByteBuffer;
+
 @InterfaceAudience.Private
 public class LlapTezUtils {
   public static boolean isSourceOfInterest(String inputClassName) {
@@ -48,4 +53,13 @@ public class LlapTezUtils {
     }
     return s;
   }
+
+  public static ByteBuffer serializeCredentials(Credentials credentials) throws
+      IOException {
+    Credentials containerCredentials = new Credentials();
+    containerCredentials.addAll(credentials);
+    DataOutputBuffer containerTokensDob = new DataOutputBuffer();
+    containerCredentials.writeTokenStorageToStream(containerTokensDob);
+    return ByteBuffer.wrap(containerTokensDob.getData(), 0, 
containerTokensDob.getLength());
+  }
 }

Reply via email to