Repository: hive
Updated Branches:
  refs/heads/master b6d9dfe29 -> 5814c1152


HIVE-16319. LLAP: Better handling of an empty wait queue, should try scheduling checks. (Siddharth Seth, reviewed by Prasanth Jayachandran)


Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/5814c115
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/5814c115
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/5814c115

Branch: refs/heads/master
Commit: 5814c11528e380a8d389e77232823f76029ecc5d
Parents: b6d9dfe
Author: Siddharth Seth <[email protected]>
Authored: Tue Mar 28 17:12:02 2017 -0700
Committer: Siddharth Seth <[email protected]>
Committed: Tue Mar 28 17:12:02 2017 -0700

----------------------------------------------------------------------
 .../hive/llap/daemon/impl/EvictingPriorityBlockingQueue.java | 5 +++++
 .../hadoop/hive/llap/daemon/impl/TaskExecutorService.java | 7 ++++---
 .../hadoop/hive/llap/daemon/impl/TaskRunnerCallable.java | 1 +
 3 files changed, 10 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hive/blob/5814c115/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/EvictingPriorityBlockingQueue.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/EvictingPriorityBlockingQueue.java b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/EvictingPriorityBlockingQueue.java
index a80bb9b..8fe59d4 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/EvictingPriorityBlockingQueue.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/EvictingPriorityBlockingQueue.java
@@ -53,6 +53,11 @@ public class EvictingPriorityBlockingQueue<E> {
       currentSize++;
       return null;
     } else {
+      if (isEmpty()) {
+        // Empty queue. But no capacity available, due to waitQueueSize and additionalElementsAllowed
+        // Return the element.
+        return e;
+      }
       // No capacity. Check if an element needs to be evicted.
       E last = deque.peekLast();
       if (comparator.compare(e, last) < 0) {

http://git-wip-us.apache.org/repos/asf/hive/blob/5814c115/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskExecutorService.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskExecutorService.java b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskExecutorService.java
index 9eaa7d7..4f2e325 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskExecutorService.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskExecutorService.java
@@ -277,7 +277,8 @@ public class TaskExecutorService extends AbstractService
             }
             // If the task cannot finish and if no slots are available then don't schedule it.
             // Also don't wait if we have a task and we just killed something to schedule it.
-            boolean shouldWait = numSlotsAvailable.get() == 0 && lastKillTimeMs == null;
+            // (numSlotsAvailable can go negative, if the callback after the thread completes is delayed)
+            boolean shouldWait = numSlotsAvailable.get() <= 0 && lastKillTimeMs == null;
             if (task.getTaskRunnerCallable().canFinish()) {
               if (isDebugEnabled) {
                 LOG.debug("Attempting to schedule task {}, canFinish={}. Current state: "
@@ -728,8 +729,8 @@ public class TaskExecutorService extends AbstractService
       knownTasks.remove(taskWrapper.getRequestId());
       taskWrapper.setIsInPreemptableQueue(false);
       taskWrapper.maybeUnregisterForFinishedStateNotifications();
-      taskWrapper.getTaskRunnerCallable().getCallback().onSuccess(result);
       updatePreemptionListAndNotify(result.getEndReason());
+      taskWrapper.getTaskRunnerCallable().getCallback().onSuccess(result);
     }
 
     @Override
@@ -742,8 +743,8 @@ public class TaskExecutorService extends AbstractService
       knownTasks.remove(taskWrapper.getRequestId());
       taskWrapper.setIsInPreemptableQueue(false);
       taskWrapper.maybeUnregisterForFinishedStateNotifications();
-      taskWrapper.getTaskRunnerCallable().getCallback().onFailure(t);
       updatePreemptionListAndNotify(null);
+      taskWrapper.getTaskRunnerCallable().getCallback().onFailure(t);
       LOG.error("Failed notification received: Stacktrace: " + ExceptionUtils.getStackTrace(t));
     }
 

http://git-wip-us.apache.org/repos/asf/hive/blob/5814c115/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskRunnerCallable.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskRunnerCallable.java b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskRunnerCallable.java
index c077d75..1669815 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskRunnerCallable.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskRunnerCallable.java
@@ -407,6 +407,7 @@ public class TaskRunnerCallable extends CallableWithNdc<TaskRunner2Result> {
       taskReporter.shutdown();
     }
     if (umbilical != null) {
+      // TODO: Can this be moved out of the main callback path
       RPC.stopProxy(umbilical);
     }
   }
