Repository: hive Updated Branches: refs/heads/master ebb83b5d1 -> c1381fc19
HIVE-15918. Add some debug messages to identify an issue getting runtimeInfo from tez. (Siddharth Seth, reviewed by Prasanth Jayachandran) Project: http://git-wip-us.apache.org/repos/asf/hive/repo Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/684bda00 Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/684bda00 Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/684bda00 Branch: refs/heads/master Commit: 684bda005208d637aec1d2bd7237c3c8472f43bd Parents: ebb83b5 Author: Siddharth Seth <[email protected]> Authored: Thu Feb 16 09:40:03 2017 -0800 Committer: Siddharth Seth <[email protected]> Committed: Thu Feb 16 09:40:03 2017 -0800 ---------------------------------------------------------------------- .../llap/tezplugins/LlapTaskCommunicator.java | 18 +++++++- .../tezplugins/LlapTaskSchedulerService.java | 16 +++++++ .../tezplugins/helpers/SourceStateTracker.java | 44 ++++++++++++++++++-- 3 files changed, 72 insertions(+), 6 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hive/blob/684bda00/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskCommunicator.java ---------------------------------------------------------------------- diff --git a/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskCommunicator.java b/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskCommunicator.java index 3c7c5d2..893b7d9 100644 --- a/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskCommunicator.java +++ b/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskCommunicator.java @@ -321,8 +321,22 @@ public class LlapTaskCommunicator extends TezTaskCommunicatorImpl { nodesForQuery.add(nodeId); sourceStateTracker.registerTaskForStateUpdates(host, port, taskSpec.getInputs()); - FragmentRuntimeInfo fragmentRuntimeInfo = sourceStateTracker.getFragmentRuntimeInfo( - taskSpec.getVertexName(), taskSpec.getTaskAttemptID().getTaskID().getId(), priority); + FragmentRuntimeInfo fragmentRuntimeInfo; + try { + fragmentRuntimeInfo = sourceStateTracker.getFragmentRuntimeInfo( + taskSpec.getVertexName(), + taskSpec.getTaskAttemptID().getTaskID().getId(), priority); + } catch (Exception e) { + LOG.error( + "Error while trying to get runtimeFragmentInfo for fragmentId={}, containerId={}, currentQI={}, currentQueryId={}", + taskSpec.getTaskAttemptID(), containerId, currentQueryIdentifierProto, + currentHiveQueryId, e); + if (e instanceof RuntimeException) { + throw (RuntimeException) e; + } else { + throw new RuntimeException(e); + } + } SubmitWorkRequestProto requestProto; try { http://git-wip-us.apache.org/repos/asf/hive/blob/684bda00/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskSchedulerService.java ---------------------------------------------------------------------- diff --git a/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskSchedulerService.java b/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskSchedulerService.java index dc594a2..97191f8 100644 --- a/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskSchedulerService.java +++ b/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskSchedulerService.java @@ -529,6 +529,22 @@ public class LlapTaskSchedulerService extends TaskScheduler { try { dagRunning = false; dagStats = new StatsPerDag(); + int pendingCount = 0; + for (Entry<Priority, List<TaskInfo>> entry : pendingTasks.entrySet()) { + if (entry.getValue() != null) { + pendingCount += entry.getValue().size(); + } + } + int runningCount = 0; + for (Entry<Integer, TreeSet<TaskInfo>> entry : runningTasks.entrySet()) { + if (entry.getValue() != null) { + runningCount += entry.getValue().size(); + } + } + + LOG.info( + "DAG reset. Current knownTaskCount={}, pendingTaskCount={}, runningTaskCount={}", + knownTasks.size(), pendingCount, runningCount); } finally { writeLock.unlock(); } http://git-wip-us.apache.org/repos/asf/hive/blob/684bda00/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/helpers/SourceStateTracker.java ---------------------------------------------------------------------- diff --git a/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/helpers/SourceStateTracker.java b/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/helpers/SourceStateTracker.java index 2a66e4d..9589a3a 100644 --- a/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/helpers/SourceStateTracker.java +++ b/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/helpers/SourceStateTracker.java @@ -120,8 +120,8 @@ public class SourceStateTracker { SourceInfo sourceInfo = getSourceInfo(sourceName); // Update source info if the state is SUCCEEDED if (sourceState == VertexState.SUCCEEDED) { - sourceInfo.numCompletedTasks = taskCommunicatorContext.getVertexCompletedTaskCount(sourceName); - sourceInfo.numTasks = taskCommunicatorContext.getVertexTotalTaskCount(sourceName); + sourceInfo.numCompletedTasks = getVertexCompletedTaskCount(sourceName); + sourceInfo.numTasks = getVertexTotalTaskCount(sourceName); } sourceInfo.lastKnownState = sourceState; // Checking state per node for future failure handling scenarios, where an update @@ -172,8 +172,9 @@ public class SourceStateTracker { completedTaskCount.add(sourceInfo.numCompletedTasks); totalTaskCount.add(sourceInfo.numTasks); } else { - completedTaskCount.add(taskCommunicatorContext.getVertexCompletedTaskCount(sourceName)); - int totalCount =taskCommunicatorContext.getVertexTotalTaskCount(sourceName); + completedTaskCount.add(getVertexCompletedTaskCount(sourceName)); + int totalCount = getVertexTotalTaskCount(sourceName); + // Uninitialized vertices will report count as 0. totalCount = totalCount == -1 ? 0 : totalCount; totalTaskCount.add(totalCount); @@ -272,6 +273,41 @@ public class SourceStateTracker { } } + private int getVertexCompletedTaskCount(String vname) { + int completedTaskCount; + try { + completedTaskCount = + taskCommunicatorContext.getVertexCompletedTaskCount(vname); + return completedTaskCount; + } catch (Exception e) { + LOG.error("Failed to get vertex completed task count for sourceName={}", + vname); + if (e instanceof RuntimeException) { + throw (RuntimeException) e; + } else { + throw new RuntimeException(e); + } + } + } + + private int getVertexTotalTaskCount(String vname) { + int totalCount; + try { + totalCount = + taskCommunicatorContext.getVertexTotalTaskCount(vname); + return totalCount; + } catch (Exception e) { + LOG.error("Failed to get total task count for sourceName={}", vname); + if (e instanceof RuntimeException) { + throw (RuntimeException)e; + } else { + throw new RuntimeException(e); + } + } + } + + + void sendStateUpdateToNode(LlapNodeId nodeId, String sourceName, VertexState state) {
