Repository: hive
Updated Branches:
  refs/heads/master 8841bbf13 -> b8944fef4


HIVE-16094. Queued containers may time out if they don't get to run for a long time. (Siddharth Seth, reviewed by Prasanth Jayachandran)


Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/a6c1ff71
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/a6c1ff71
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/a6c1ff71

Branch: refs/heads/master
Commit: a6c1ff71b68c0dbe98781d0b8162f328080066b7
Parents: 8841bbf
Author: Siddharth Seth <[email protected]>
Authored: Mon Mar 6 18:03:06 2017 -0800
Committer: Siddharth Seth <[email protected]>
Committed: Mon Mar 6 18:03:06 2017 -0800

----------------------------------------------------------------------
 .../hive/llap/daemon/impl/AMReporter.java       | 93 ++++++++++++--------
 .../llap/daemon/impl/ContainerRunnerImpl.java   |  3 +-
 .../llap/daemon/impl/TaskRunnerCallable.java    |  7 +-
 .../llap/tezplugins/LlapTaskCommunicator.java   |  2 +-
 4 files changed, 64 insertions(+), 41 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hive/blob/a6c1ff71/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/AMReporter.java
----------------------------------------------------------------------
diff --git 
a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/AMReporter.java 
b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/AMReporter.java
index 65f7232..e5bd05e 100644
--- 
a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/AMReporter.java
+++ 
b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/AMReporter.java
@@ -111,7 +111,7 @@ public class AMReporter extends AbstractService {
   private final AtomicBoolean isShutdown = new AtomicBoolean(false);
   // Tracks appMasters to which heartbeats are being sent. This should not be 
used for any other
   // messages like taskKilled, etc.
-  private final Map<LlapNodeId, AMNodeInfo> knownAppMasters = new HashMap<>();
+  private final Map<QueryIdentifier, AMNodeInfo> knownAppMasters = new 
HashMap<>();
   volatile ListenableFuture<Void> queueLookupFuture;
   private final DaemonId daemonId;
 
@@ -198,34 +198,42 @@ public class AMReporter extends AbstractService {
       Token<JobTokenIdentifier> jobToken, QueryIdentifier queryIdentifier,
       TezTaskAttemptID attemptId) {
     if (LOG.isTraceEnabled()) {
-      LOG.trace("Registering for heartbeat: " + amLocation + ":" + port + " 
for queryIdentifier=" + queryIdentifier);
+      LOG.trace(
+          "Registering for heartbeat: {}, queryIdentifier={}, attemptId={}",
+          (amLocation + ":" + port), queryIdentifier, attemptId);
     }
     AMNodeInfo amNodeInfo;
+
+    // Since we don't have an explicit AM end signal yet - we're going to 
create
+    // and discard AMNodeInfo instances per query.
     synchronized (knownAppMasters) {
       LlapNodeId amNodeId = LlapNodeId.getInstance(amLocation, port);
-      amNodeInfo = knownAppMasters.get(amNodeId);
+      amNodeInfo = knownAppMasters.get(queryIdentifier);
       if (amNodeInfo == null) {
         amNodeInfo = new AMNodeInfo(amNodeId, user, jobToken, queryIdentifier,
             retryPolicy, retryTimeout, socketFactory, conf);
-        knownAppMasters.put(amNodeId, amNodeInfo);
+        knownAppMasters.put(queryIdentifier, amNodeInfo);
         // Add to the queue only the first time this is registered, and on
         // subsequent instances when it's taken off the queue.
         amNodeInfo.setNextHeartbeatTime(System.currentTimeMillis() + 
heartbeatInterval);
         pendingHeartbeatQueeu.add(amNodeInfo);
+        // AMNodeInfo will only be cleared when a queryComplete is received 
for this query, or
+        // when we detect a failure on the AM side (failure to heartbeat).
+        // A single queueLookupCallable is added here. We have to make sure 
one instance stays
+        // in the queue till the query completes.
       }
-      amNodeInfo.setCurrentQueryIdentifier(queryIdentifier);
       amNodeInfo.addTaskAttempt(attemptId);
     }
   }
 
-  public void unregisterTask(String amLocation, int port, TezTaskAttemptID ta) 
{
+  public void unregisterTask(String amLocation, int port, QueryIdentifier 
queryIdentifier, TezTaskAttemptID ta) {
+
     if (LOG.isTraceEnabled()) {
-      LOG.trace("Un-registering for heartbeat: " + amLocation + ":" + port);
+      LOG.trace("Un-registering for heartbeat: {}, attempt={}", (amLocation + 
":" + port), ta);
     }
     AMNodeInfo amNodeInfo;
-    LlapNodeId amNodeId = LlapNodeId.getInstance(amLocation, port);
     synchronized (knownAppMasters) {
-      amNodeInfo = knownAppMasters.get(amNodeId);
+      amNodeInfo = knownAppMasters.get(queryIdentifier);
       if (amNodeInfo == null) {
         LOG.info(("Ignoring duplicate unregisterRequest for am at: " + 
amLocation + ":" + port));
       } else {
@@ -241,7 +249,7 @@ public class AMReporter extends AbstractService {
     LlapNodeId amNodeId = LlapNodeId.getInstance(amLocation, port);
     AMNodeInfo amNodeInfo;
     synchronized (knownAppMasters) {
-      amNodeInfo = knownAppMasters.get(amNodeId);
+      amNodeInfo = knownAppMasters.get(queryIdentifier);
       if (amNodeInfo == null) {
         amNodeInfo = new AMNodeInfo(amNodeId, user, jobToken, queryIdentifier, 
retryPolicy, retryTimeout, socketFactory,
           conf);
@@ -266,10 +274,17 @@ public class AMReporter extends AbstractService {
     });
   }
 
-  public void queryComplete(LlapNodeId llapNodeId) {
-    if (llapNodeId != null) {
+  public void queryComplete(QueryIdentifier queryIdentifier) {
+    if (queryIdentifier != null) {
       synchronized (knownAppMasters) {
-        AMNodeInfo amNodeInfo = knownAppMasters.remove(llapNodeId);
+        AMNodeInfo amNodeInfo = knownAppMasters.remove(queryIdentifier);
+
+        // The AM can be used for multiple queries. This is an indication that 
a single query is complete.
+        // We don't have a good mechanism to know when an app ends. Removing 
this right now ensures
+        // that a new one gets created for the next query on the same AM.
+        if (amNodeInfo != null) {
+          amNodeInfo.setIsDone(true);
+        }
         // TODO: not stopping umbilical explicitly as some taskKill requests 
may get scheduled during queryComplete
         // which will be using the umbilical. HIVE-16021 should fix this, 
until then leave umbilical open and wait for
         // it to be closed after max idle timeout (10s default)
@@ -287,22 +302,26 @@ public class AMReporter extends AbstractService {
       while (!isShutdown.get() && !Thread.currentThread().isInterrupted()) {
         try {
           final AMNodeInfo amNodeInfo = pendingHeartbeatQueeu.take();
-          if (amNodeInfo.hasAmFailed()) {
+          if (amNodeInfo.hasAmFailed() || amNodeInfo.isDone()) {
             synchronized (knownAppMasters) {
               if (LOG.isDebugEnabled()) {
                 LOG.debug(
-                    "Removing am {} with last associated dag {} from heartbeat 
with taskCount={}, amFailed={}",
-                    amNodeInfo.amNodeId, 
amNodeInfo.getCurrentQueryIdentifier(), amNodeInfo.getTaskCount(),
-                    amNodeInfo.hasAmFailed(), amNodeInfo);
+                    "Removing am {} with last associated dag {} from heartbeat 
with taskCount={}, amFailed={}, isDone={}",
+                    amNodeInfo.amNodeId, amNodeInfo.getQueryIdentifier(), 
amNodeInfo.getTaskCount(),
+                    amNodeInfo.hasAmFailed(), amNodeInfo.isDone());
               }
-              knownAppMasters.remove(amNodeInfo.amNodeId);
+              knownAppMasters.remove(amNodeInfo.getQueryIdentifier());
             }
           } else {
+            // Always re-schedule the next callable - irrespective of task 
count,
+            // in case new tasks come in later.
+            long next = System.currentTimeMillis() + heartbeatInterval;
+            amNodeInfo.setNextHeartbeatTime(next);
+            pendingHeartbeatQueeu.add(amNodeInfo);
+
+            // Send an actual heartbeat only if the task count is > 0
             if (amNodeInfo.getTaskCount() > 0) {
               // Add back to the queue for the next heartbeat, and schedule 
the actual heartbeat
-              long next = System.currentTimeMillis() + heartbeatInterval;
-              amNodeInfo.setNextHeartbeatTime(next);
-              pendingHeartbeatQueeu.add(amNodeInfo);
               ListenableFuture<Void> future = executor.submit(new 
AMHeartbeatCallable(amNodeInfo));
               Futures.addCallback(future, new FutureCallback<Void>() {
                 @Override
@@ -312,9 +331,9 @@ public class AMReporter extends AbstractService {
 
                 @Override
                 public void onFailure(Throwable t) {
-                  QueryIdentifier currentQueryIdentifier = 
amNodeInfo.getCurrentQueryIdentifier();
+                  QueryIdentifier currentQueryIdentifier = 
amNodeInfo.getQueryIdentifier();
                   amNodeInfo.setAmFailed(true);
-                  LOG.warn("Heartbeat failed to AM {}. Killing all other tasks 
for the query={}",
+                  LOG.warn("Heartbeat failed to AM {}. Marking query as 
failed. query={}",
                     amNodeInfo.amNodeId, currentQueryIdentifier, t);
                   queryFailedHandler.queryFailed(currentQueryIdentifier);
                 }
@@ -374,9 +393,6 @@ public class AMReporter extends AbstractService {
       }
       List<TezTaskAttemptID> tasks = amNodeInfo.getTasksSnapshot();
       if (tasks.isEmpty()) {
-        if (LOG.isDebugEnabled()) {
-          LOG.debug("Skipping node heartbeat to AM: " + amNodeInfo + ", since 
ref count is 0");
-        }
         return null;
       }
       try {
@@ -388,7 +404,7 @@ public class AMReporter extends AbstractService {
         amNodeInfo.getUmbilical().nodeHeartbeat(new Text(nodeId.getHostname()),
             new Text(daemonId.getUniqueNodeIdInCluster()), nodeId.getPort(), 
aw);
       } catch (IOException e) {
-        QueryIdentifier currentQueryIdentifier = 
amNodeInfo.getCurrentQueryIdentifier();
+        QueryIdentifier currentQueryIdentifier = 
amNodeInfo.getQueryIdentifier();
         amNodeInfo.setAmFailed(true);
         LOG.warn("Failed to communicated with AM at {}. Killing remaining 
fragments for query {}",
             amNodeInfo.amNodeId, currentQueryIdentifier, e);
@@ -416,9 +432,10 @@ public class AMReporter extends AbstractService {
     private final long timeout;
     private final SocketFactory socketFactory;
     private final AtomicBoolean amFailed = new AtomicBoolean(false);
-    private QueryIdentifier currentQueryIdentifier;
+    private final QueryIdentifier queryIdentifier;
     private LlapTaskUmbilicalProtocol umbilical;
     private long nextHeartbeatTime;
+    private final AtomicBoolean isDone = new AtomicBoolean(false);
 
 
     public AMNodeInfo(LlapNodeId amNodeId, String user,
@@ -430,7 +447,7 @@ public class AMReporter extends AbstractService {
                       Configuration conf) {
       this.user = user;
       this.jobToken = jobToken;
-      this.currentQueryIdentifier = currentQueryIdentifier;
+      this.queryIdentifier = currentQueryIdentifier;
       this.retryPolicy = retryPolicy;
       this.timeout = timeout;
       this.socketFactory = socketFactory;
@@ -491,6 +508,14 @@ public class AMReporter extends AbstractService {
       return amFailed.get();
     }
 
+    void setIsDone(boolean val) {
+      isDone.set(val);
+    }
+
+    boolean isDone() {
+      return isDone.get();
+    }
+
     List<TezTaskAttemptID> getTasksSnapshot() {
       List<TezTaskAttemptID> result = new ArrayList<>();
       synchronized (tasks) {
@@ -499,12 +524,8 @@ public class AMReporter extends AbstractService {
       return result;
     }
 
-    public synchronized QueryIdentifier getCurrentQueryIdentifier() {
-      return currentQueryIdentifier;
-    }
-
-    public synchronized void setCurrentQueryIdentifier(QueryIdentifier 
queryIdentifier) {
-      this.currentQueryIdentifier = queryIdentifier;
+    public QueryIdentifier getQueryIdentifier() {
+      return queryIdentifier;
     }
 
     synchronized void setNextHeartbeatTime(long nextTime) {
@@ -530,7 +551,7 @@ public class AMReporter extends AbstractService {
 
     @Override
     public String toString() {
-      return "AMInfo: " + amNodeId + ", taskCount=" + getTaskCount();
+      return "AMInfo: " + amNodeId + ", taskCount=" + getTaskCount() + ", 
queryIdentifier=" + queryIdentifier;
     }
 
     private int getTaskCount() {

http://git-wip-us.apache.org/repos/asf/hive/blob/a6c1ff71/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/ContainerRunnerImpl.java
----------------------------------------------------------------------
diff --git 
a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/ContainerRunnerImpl.java
 
b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/ContainerRunnerImpl.java
index af8f5b0..ca476ec 100644
--- 
a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/ContainerRunnerImpl.java
+++ 
b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/ContainerRunnerImpl.java
@@ -401,8 +401,7 @@ public class ContainerRunnerImpl extends CompositeService 
implements ContainerRu
           fragmentInfo.getFragmentIdentifierString());
         
executorService.killFragment(fragmentInfo.getFragmentIdentifierString());
       }
-      LlapNodeId amNodeId = queryInfo.getAmNodeId();
-      amReporter.queryComplete(amNodeId);
+      amReporter.queryComplete(queryIdentifier);
     }
     return QueryCompleteResponseProto.getDefaultInstance();
   }

http://git-wip-us.apache.org/repos/asf/hive/blob/a6c1ff71/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskRunnerCallable.java
----------------------------------------------------------------------
diff --git 
a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskRunnerCallable.java
 
b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskRunnerCallable.java
index 25dc569..f24a647 100644
--- 
a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskRunnerCallable.java
+++ 
b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskRunnerCallable.java
@@ -188,7 +188,8 @@ public class TaskRunnerCallable extends 
CallableWithNdc<TaskRunner2Result> {
 
       // Unregister from the AMReporter, since the task is now running.
       TezTaskAttemptID ta = taskSpec.getTaskAttemptID();
-      this.amReporter.unregisterTask(request.getAmHost(), request.getAmPort(), 
ta);
+      this.amReporter.unregisterTask(request.getAmHost(), request.getAmPort(),
+          fragmentInfo.getQueryInfo().getQueryIdentifier(), ta);
 
       synchronized (this) {
         if (!shouldRunTask) {
@@ -355,7 +356,9 @@ public class TaskRunnerCallable extends 
CallableWithNdc<TaskRunner2Result> {
             // If the task hasn't started - inform about fragment completion 
immediately. It's possible for
             // the callable to never run.
             fragmentCompletionHanler.fragmentComplete(fragmentInfo);
-            this.amReporter.unregisterTask(request.getAmHost(), 
request.getAmPort(), ta);
+            this.amReporter
+                .unregisterTask(request.getAmHost(), request.getAmPort(),
+                    fragmentInfo.getQueryInfo().getQueryIdentifier(), ta);
           }
         }
       } else {

http://git-wip-us.apache.org/repos/asf/hive/blob/a6c1ff71/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskCommunicator.java
----------------------------------------------------------------------
diff --git 
a/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskCommunicator.java
 
b/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskCommunicator.java
index c700913..8bb6cab 100644
--- 
a/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskCommunicator.java
+++ 
b/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskCommunicator.java
@@ -761,10 +761,10 @@ public class LlapTaskCommunicator extends 
TezTaskCommunicatorImpl {
     @Override
     public void nodeHeartbeat(
         Text hostname, Text uniqueId, int port, TezAttemptArray aw) throws 
IOException {
-      nodePinged(hostname.toString(), uniqueId.toString(), port, aw);
       if (LOG.isDebugEnabled()) {
         LOG.debug("Received heartbeat from [" + hostname + ":" + port +" (" + 
uniqueId +")]");
       }
+      nodePinged(hostname.toString(), uniqueId.toString(), port, aw);
     }
 
     @Override

Reply via email to