Repository: hive
Updated Branches:
  refs/heads/master ebb83b5d1 -> c1381fc19


HIVE-15918. Add some debug messages to identify an issue getting runtimeInfo 
from tez. (Siddharth Seth, reviewed by Prasanth Jayachandran)


Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/684bda00
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/684bda00
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/684bda00

Branch: refs/heads/master
Commit: 684bda005208d637aec1d2bd7237c3c8472f43bd
Parents: ebb83b5
Author: Siddharth Seth <[email protected]>
Authored: Thu Feb 16 09:40:03 2017 -0800
Committer: Siddharth Seth <[email protected]>
Committed: Thu Feb 16 09:40:03 2017 -0800

----------------------------------------------------------------------
 .../llap/tezplugins/LlapTaskCommunicator.java   | 18 +++++++-
 .../tezplugins/LlapTaskSchedulerService.java    | 16 +++++++
 .../tezplugins/helpers/SourceStateTracker.java  | 44 ++++++++++++++++++--
 3 files changed, 72 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hive/blob/684bda00/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskCommunicator.java
----------------------------------------------------------------------
diff --git 
a/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskCommunicator.java
 
b/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskCommunicator.java
index 3c7c5d2..893b7d9 100644
--- 
a/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskCommunicator.java
+++ 
b/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskCommunicator.java
@@ -321,8 +321,22 @@ public class LlapTaskCommunicator extends 
TezTaskCommunicatorImpl {
     nodesForQuery.add(nodeId);
 
     sourceStateTracker.registerTaskForStateUpdates(host, port, 
taskSpec.getInputs());
-    FragmentRuntimeInfo fragmentRuntimeInfo = 
sourceStateTracker.getFragmentRuntimeInfo(
-        taskSpec.getVertexName(), 
taskSpec.getTaskAttemptID().getTaskID().getId(), priority);
+    FragmentRuntimeInfo fragmentRuntimeInfo;
+    try {
+      fragmentRuntimeInfo = sourceStateTracker.getFragmentRuntimeInfo(
+          taskSpec.getVertexName(),
+          taskSpec.getTaskAttemptID().getTaskID().getId(), priority);
+    } catch (Exception e) {
+      LOG.error(
+          "Error while trying to get runtimeFragmentInfo for fragmentId={}, 
containerId={}, currentQI={}, currentQueryId={}",
+          taskSpec.getTaskAttemptID(), containerId, 
currentQueryIdentifierProto,
+          currentHiveQueryId, e);
+      if (e instanceof RuntimeException) {
+        throw (RuntimeException) e;
+      } else {
+        throw new RuntimeException(e);
+      }
+    }
     SubmitWorkRequestProto requestProto;
 
     try {

http://git-wip-us.apache.org/repos/asf/hive/blob/684bda00/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskSchedulerService.java
----------------------------------------------------------------------
diff --git 
a/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskSchedulerService.java
 
b/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskSchedulerService.java
index dc594a2..97191f8 100644
--- 
a/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskSchedulerService.java
+++ 
b/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskSchedulerService.java
@@ -529,6 +529,22 @@ public class LlapTaskSchedulerService extends 
TaskScheduler {
     try {
       dagRunning = false;
       dagStats = new StatsPerDag();
+      int pendingCount = 0;
+      for (Entry<Priority, List<TaskInfo>> entry : pendingTasks.entrySet()) {
+        if (entry.getValue() != null) {
+          pendingCount += entry.getValue().size();
+        }
+      }
+      int runningCount = 0;
+      for (Entry<Integer, TreeSet<TaskInfo>> entry : runningTasks.entrySet()) {
+        if (entry.getValue() != null) {
+          runningCount += entry.getValue().size();
+        }
+      }
+
+      LOG.info(
+          "DAG reset. Current knownTaskCount={}, pendingTaskCount={}, 
runningTaskCount={}",
+          knownTasks.size(), pendingCount, runningCount);
     } finally {
       writeLock.unlock();
     }

http://git-wip-us.apache.org/repos/asf/hive/blob/684bda00/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/helpers/SourceStateTracker.java
----------------------------------------------------------------------
diff --git 
a/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/helpers/SourceStateTracker.java
 
b/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/helpers/SourceStateTracker.java
index 2a66e4d..9589a3a 100644
--- 
a/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/helpers/SourceStateTracker.java
+++ 
b/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/helpers/SourceStateTracker.java
@@ -120,8 +120,8 @@ public class SourceStateTracker {
     SourceInfo sourceInfo = getSourceInfo(sourceName);
     // Update source info if the state is SUCCEEDED
     if (sourceState == VertexState.SUCCEEDED) {
-      sourceInfo.numCompletedTasks = 
taskCommunicatorContext.getVertexCompletedTaskCount(sourceName);
-      sourceInfo.numTasks = 
taskCommunicatorContext.getVertexTotalTaskCount(sourceName);
+      sourceInfo.numCompletedTasks = getVertexCompletedTaskCount(sourceName);
+      sourceInfo.numTasks = getVertexTotalTaskCount(sourceName);
     }
     sourceInfo.lastKnownState = sourceState;
     // Checking state per node for future failure handling scenarios, where an 
update
@@ -172,8 +172,9 @@ public class SourceStateTracker {
       completedTaskCount.add(sourceInfo.numCompletedTasks);
       totalTaskCount.add(sourceInfo.numTasks);
     } else {
-      
completedTaskCount.add(taskCommunicatorContext.getVertexCompletedTaskCount(sourceName));
-      int totalCount 
=taskCommunicatorContext.getVertexTotalTaskCount(sourceName);
+      completedTaskCount.add(getVertexCompletedTaskCount(sourceName));
+      int totalCount = getVertexTotalTaskCount(sourceName);
+
       // Uninitialized vertices will report count as 0.
       totalCount = totalCount == -1 ? 0 : totalCount;
       totalTaskCount.add(totalCount);
@@ -272,6 +273,41 @@ public class SourceStateTracker {
     }
   }
 
+  /** Returns the completed task count reported for vertex {@code vname}. On
+   *  failure, logs the vertex name with the cause and rethrows unchecked. */
+  private int getVertexCompletedTaskCount(String vname) {
+    try {
+      return taskCommunicatorContext.getVertexCompletedTaskCount(vname);
+    } catch (Exception e) {
+      // Trailing exception argument lets the logger record the cause.
+      LOG.error("Failed to get vertex completed task count for sourceName={}",
+          vname, e);
+      if (e instanceof RuntimeException) {
+        throw (RuntimeException) e;
+      } else {
+        throw new RuntimeException(e);
+      }
+    }
+  }
+
+  /** Returns the total task count reported for vertex {@code vname}. On
+   *  failure, logs the vertex name with the cause and rethrows unchecked. */
+  private int getVertexTotalTaskCount(String vname) {
+    try {
+      return taskCommunicatorContext.getVertexTotalTaskCount(vname);
+    } catch (Exception e) {
+      // Trailing exception argument lets the logger record the cause.
+      LOG.error("Failed to get total task count for sourceName={}", vname, e);
+      if (e instanceof RuntimeException) {
+        throw (RuntimeException) e;
+      } else {
+        throw new RuntimeException(e);
+      }
+    }
+  }
+
+
+
 
 
   void sendStateUpdateToNode(LlapNodeId nodeId, String sourceName, VertexState 
state) {

Reply via email to