HIVE-16097. minor fixes to metrics and logs in LlapTaskScheduler. (Siddharth Seth, reviewed by Sergey Shelukhin)


Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/b8944fef
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/b8944fef
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/b8944fef

Branch: refs/heads/master
Commit: b8944fef4eef81ece620d03f7b378f99e11e69bb
Parents: a6c1ff7
Author: Siddharth Seth <[email protected]>
Authored: Mon Mar 6 18:05:28 2017 -0800
Committer: Siddharth Seth <[email protected]>
Committed: Mon Mar 6 18:05:28 2017 -0800

----------------------------------------------------------------------
 .../hive/llap/daemon/impl/LlapTaskReporter.java |  4 +-
 .../tezplugins/LlapTaskSchedulerService.java    | 95 ++++++++++++++------
 2 files changed, 71 insertions(+), 28 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hive/blob/b8944fef/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapTaskReporter.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapTaskReporter.java b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapTaskReporter.java
index 2fe1017..3d59702 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapTaskReporter.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapTaskReporter.java
@@ -217,7 +217,7 @@ public class LlapTaskReporter implements TaskReporterInterface {
 
         if (response.shouldDie) {
           // AM sent a shouldDie=true
-          LOG.info("Asked to die via task heartbeat");
+          LOG.info("Asked to die via task heartbeat: {}", task.getTaskAttemptID());
           return false;
         } else {
           if (response.numEvents < maxEventsToGet) {
@@ -297,7 +297,7 @@ public class LlapTaskReporter implements TaskReporterInterface {
       }
 
       if (response.shouldDie()) {
-        LOG.info("Received should die response from AM");
+        LOG.info("Received should die response from AM: {}", task.getTaskAttemptID());
         askedToDie.set(true);
         return new ResponseWrapper(true, 1);
       }

http://git-wip-us.apache.org/repos/asf/hive/blob/b8944fef/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskSchedulerService.java
----------------------------------------------------------------------
diff --git a/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskSchedulerService.java b/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskSchedulerService.java
index cfcf0f0..8fb0966 100644
--- a/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskSchedulerService.java
+++ b/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskSchedulerService.java
@@ -182,6 +182,8 @@ public class LlapTaskSchedulerService extends TaskScheduler {
   private final SchedulerTimeoutMonitor timeoutMonitor;
   private ScheduledFuture<?> timeoutFuture;
 
+  private final AtomicInteger assignedTaskCounter = new AtomicInteger(0);
+
   private final LlapRegistryService registry = new LlapRegistryService(false);
 
   private volatile ListenableFuture<Void> nodeEnablerFuture;
@@ -362,9 +364,11 @@ public class LlapTaskSchedulerService extends TaskScheduler {
     @Override
     public void onRemove(ServiceInstance serviceInstance) {
       NodeReport nodeReport = constructNodeReport(serviceInstance, false);
+      LOG.info("Sending out nodeReport for onRemove: {}", nodeReport);
       getContext().nodesUpdated(Collections.singletonList(nodeReport));
       instanceToNodeMap.remove(serviceInstance.getWorkerIdentity());
-      LOG.info("Removed node with identity: {}", serviceInstance.getWorkerIdentity());
+      LOG.info("Removed node with identity: {} due to RegistryNotification. currentActiveInstances={}",
+          serviceInstance.getWorkerIdentity(), activeInstances.size());
       if (metrics != null) {
         metrics.setClusterNodeCount(activeInstances.size());
       }
@@ -650,6 +654,7 @@ public class LlapTaskSchedulerService extends TaskScheduler {
       NodeInfo nodeInfo = taskInfo.assignedNode;
       assert nodeInfo != null;
 
+      //  endReason shows up as OTHER for CONTAINER_TIME_OUT
       LOG.info("Processing de-allocate request for task={}, state={}, endReason={}", taskInfo.task,
           taskInfo.getState(), endReason);
       // Re-enable the node if preempted
@@ -680,6 +685,8 @@ public class LlapTaskSchedulerService extends TaskScheduler {
 
         } else { // Task Failed
           nodeInfo.registerUnsuccessfulTaskEnd(false);
+          // TODO Include EXTERNAL_PREEMPTION in this list?
+          // TODO HIVE-16134. Differentiate between EXTERNAL_PREEMPTION_WAITQUEU vs EXTERNAL_PREEMPTION_FINISHABLE?
           if (endReason != null && EnumSet
               .of(TaskAttemptEndReason.EXECUTOR_BUSY, TaskAttemptEndReason.COMMUNICATION_ERROR)
               .contains(endReason)) {
@@ -762,12 +769,11 @@ public class LlapTaskSchedulerService extends TaskScheduler {
               if (nodeInfo != null) {
                 if  (nodeInfo.canAcceptTask()) {
                   // Successfully scheduled.
-                  LOG.info(
-                      "Assigning {} when looking for {}."
-                          + " local=true FirstRequestedHost={}, #prefLocations={}", nodeInfo
-                          .toShortString(), host, (prefHostCount == 0) +
-                          (requestedHosts.length > 1 ? ", #prefLocations=" + requestedHosts.length :
-                              ""));
+                  LOG.info("Assigning {} when looking for {}." +
+                          " local=true FirstRequestedHost={}, #prefLocations={}",
+                      nodeInfo.toShortString(), host,
+                      (prefHostCount == 0),
+                      requestedHosts.length);
                   return new SelectHostResult(nodeInfo);
                 } else {
                   // The node cannot accept a task at the moment.
@@ -918,12 +924,19 @@ public class LlapTaskSchedulerService extends TaskScheduler {
     NodeReport nodeReport = constructNodeReport(serviceInstance, true);
     getContext().nodesUpdated(Collections.singletonList(nodeReport));
 
+    // When the same node goes away and comes back... the old entry will be lost - which means
+    // we don't know how many fragments we have actually scheduled on this node.
+
+    // Replacing it is the right thing to do though, since we expect the AM to kill all the fragments running on the node, via timeouts.
+    // De-allocate messages coming in from the old node are sent to the NodeInfo instance for the old node.
+
     instanceToNodeMap.put(node.getNodeIdentity(), node);
     if (metrics != null) {
       metrics.setClusterNodeCount(activeInstances.size());
     }
     // Trigger scheduling since a new node became available.
-    LOG.info("Adding new node: {}", node);
+    LOG.info("Adding new node: {}. TotalNodeCount={}. activeInstances.size={}",
+        node, instanceToNodeMap.size(), activeInstances.size());
     trySchedulingPendingTasks();
   }
 
@@ -933,6 +946,9 @@ public class LlapTaskSchedulerService extends TaskScheduler {
       LOG.info("Attempting to re-enable node: " + nodeInfo.toShortString());
       if (activeInstances.getInstance(nodeInfo.getNodeIdentity()) != null) {
         nodeInfo.enableNode();
+        if (metrics != null) {
+          metrics.setDisabledNodeCount(disabledNodesQueue.size());
+        }
       } else {
         if (LOG.isInfoEnabled()) {
           LOG.info(
@@ -951,13 +967,11 @@ public class LlapTaskSchedulerService extends TaskScheduler {
    * @param nodeInfo  the node to be re-enabled
    */
   private void queueNodeForReEnablement(final NodeInfo nodeInfo) {
-    nodeInfo.enableNode();
     if ( disabledNodesQueue.remove(nodeInfo)) {
+      LOG.info("Queueing node for re-enablement: {}", nodeInfo.toShortString());
+      nodeInfo.resetExpireInformation();
       disabledNodesQueue.add(nodeInfo);
     }
-    if (metrics != null) {
-      metrics.setDisabledNodeCount(disabledNodesQueue.size());
-    }
   }
 
   private void disableNode(NodeInfo nodeInfo, boolean isCommFailure) {
@@ -1163,9 +1177,9 @@ public class LlapTaskSchedulerService extends TaskScheduler {
               // Preempt on specific host
               boolean shouldPreempt = true;
               for (String host : potentialHosts) {
-                // Preempt only if there are not pending preemptions on the same host
+                // Preempt only if there are no pending preemptions on the same host
                 // When the premption registers, the request at the highest priority will be given the slot,
-                // even if the initial request was for some other task.
+                // even if the initial preemption was caused by some other task.
                 // TODO Maybe register which task the preemption was for, to avoid a bad non-local allocation.
                 MutableInt pendingHostPreemptions = pendingPreemptionsPerHost.get(host);
                 if (pendingHostPreemptions != null && pendingHostPreemptions.intValue() > 0) {
@@ -1178,7 +1192,7 @@ public class LlapTaskSchedulerService extends TaskScheduler {
               }
               if (shouldPreempt) {
                 if (LOG.isDebugEnabled()) {
-                  LOG.debug("Preempting for {} on potential hosts={}. TotalPendingPreemptions={}",
+                  LOG.debug("Attempting to preempt for {} on potential hosts={}. TotalPendingPreemptions={}",
                       taskInfo.task, Arrays.toString(potentialHosts), pendingPreemptions.get());
                 }
                 preemptTasks(entry.getKey().getPriority(), 1, potentialHosts);
@@ -1194,7 +1208,11 @@ public class LlapTaskSchedulerService extends TaskScheduler {
               LOG.debug("Attempting to preempt on any host for task={}, pendingPreemptions={}",
                   taskInfo.task, pendingPreemptions.get());
               if (pendingPreemptions.get() == 0) {
-                LOG.info("Preempting for task={} on any available host", taskInfo.task);
+                if (LOG.isDebugEnabled()) {
+                  LOG.debug(
+                      "Attempting to preempt for task={}, priority={} on any available host",
+                      taskInfo.task, taskInfo.priority);
+                }
                 preemptTasks(entry.getKey().getPriority(), 1, null);
               } else {
                 if (LOG.isDebugEnabled()) {
@@ -1263,7 +1281,10 @@ public class LlapTaskSchedulerService extends TaskScheduler {
               nodeInfo.getServiceAddress());
       writeLock.lock(); // While updating local structures
       try {
-        LOG.info("Assigned task={} on node={}, to container={}",
+        // The canAccept part of this log message does not account for this allocation.
+        assignedTaskCounter.incrementAndGet();
+        LOG.info("Assigned #{}, task={} on node={}, to container={}",
+            assignedTaskCounter.get(),
             taskInfo, nodeInfo.toShortString(), container.getId());
         dagStats.registerTaskAllocated(taskInfo.requestedHosts, taskInfo.requestedRacks,
             nodeInfo.getHost());
@@ -1321,7 +1342,8 @@ public class LlapTaskSchedulerService extends TaskScheduler {
           }
         } else {
           // No tasks qualify as preemptable
-          LOG.debug("No tasks qualify as killable to schedule tasks at priority {}", forPriority);
+          LOG.debug("No tasks qualify as killable to schedule tasks at priority {}. Current priority={}",
+              forPriority, entryAtPriority.getKey());
           break;
         }
       }
@@ -1572,6 +1594,8 @@ public class LlapTaskSchedulerService extends TaskScheduler {
     private final LlapTaskSchedulerMetrics metrics;
     private final Resource resourcePerExecutor;
 
+    private final String shortStringBase;
+
     /**
      * Create a NodeInfo bound to a service instance
      *  @param serviceInstance         the associated serviceInstance
@@ -1612,6 +1636,7 @@ public class LlapTaskSchedulerService extends TaskScheduler {
       if (metrics != null) {
         metrics.incrSchedulableTasksCount(numSchedulableTasks);
       }
+      shortStringBase = setupShortStringBase();
 
     }
 
@@ -1635,12 +1660,16 @@ public class LlapTaskSchedulerService extends TaskScheduler {
       return resourcePerExecutor;
     }
 
-    void enableNode() {
+    void resetExpireInformation() {
       expireTimeMillis = -1;
-      disabled = false;
       hadCommFailure = false;
     }
 
+    void enableNode() {
+      resetExpireInformation();
+      disabled = false;
+    }
+
     void disableNode(boolean commFailure) {
       long duration = blacklistConf.minDelay;
       long currentTime = clock.getTime();
@@ -1662,7 +1691,7 @@ public class LlapTaskSchedulerService extends TaskScheduler {
       }
       if (LOG.isInfoEnabled()) {
         LOG.info("Disabling instance {} for {} milli-seconds. commFailure={}",
-            serviceInstance,
+            toShortString(),
             delayTime, commFailure);
       }
       expireTimeMillis = currentTime + delayTime;
@@ -1716,13 +1745,17 @@ public class LlapTaskSchedulerService extends TaskScheduler {
       return hadCommFailure;
     }
 
+    boolean _canAccepInternal() {
+      return !hadCommFailure && !disabled
+          &&(numSchedulableTasks == -1 || ((numSchedulableTasks - numScheduledTasks) > 0));
+    }
+
     int canAcceptCounter = 0;
     /* Returning true does not guarantee that the task will run, considering other queries
     may be running in the system. Also depends upon the capacity usage configuration
      */
     boolean canAcceptTask() {
-      boolean result = !hadCommFailure && !disabled
-          &&(numSchedulableTasks == -1 || ((numSchedulableTasks - numScheduledTasks) > 0));
+      boolean result = _canAccepInternal();
       if (LOG.isTraceEnabled()) {
         LOG.trace(constructCanAcceptLogResult(result));
       }
@@ -1763,6 +1796,10 @@ public class LlapTaskSchedulerService extends TaskScheduler {
       }
     }
 
+    private String setupShortStringBase() {
+      return "{" + serviceInstance.getHost() + ":" + serviceInstance.getRpcPort() + ", id=" + getNodeIdentity();
+    }
+
     @Override
     public String toString() {
       return "NodeInfo{" + "instance=" + serviceInstance
@@ -1777,11 +1814,17 @@ public class LlapTaskSchedulerService extends TaskScheduler {
     }
 
     private String toShortString() {
-      return "{" + serviceInstance.getHost() + ":" +
-          serviceInstance.getRpcPort() + ", id=" + getNodeIdentity() +
-          ", stc=" + numSchedulableTasks + "}";
+      StringBuilder sb = new StringBuilder();
+      sb.append(", canAcceptTask=").append(_canAccepInternal());
+      sb.append(", st=").append(numScheduledTasks);
+      sb.append(", ac=").append((numSchedulableTasks - numScheduledTasks));
+      sb.append(", commF=").append(hadCommFailure);
+      sb.append(", disabled=").append(disabled);
+      sb.append("}");
+      return shortStringBase + sb.toString();
     }
 
+
   }
 
   @VisibleForTesting

Reply via email to