This is an automated email from the ASF dual-hosted git repository.
pvary pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hive.git
The following commit(s) were added to refs/heads/master by this push:
new db86f80 HIVE-21981: When LlapDaemon capacity is set to 0 and the
waitqueue is not empty then the queries are stuck (Peter Vary reviewed by Adam
Szita)
db86f80 is described below
commit db86f807faf97c74ad8a020ac34848afc2eb4741
Author: Peter Vary <[email protected]>
AuthorDate: Fri Jul 12 14:26:24 2019 +0200
HIVE-21981: When LlapDaemon capacity is set to 0 and the waitqueue is not
empty then the queries are stuck (Peter Vary reviewed by Adam Szita)
---
.../hive/llap/daemon/impl/TaskExecutorService.java | 14 ++++++++
.../llap/daemon/impl/TestTaskExecutorService.java | 40 ++++++++++++++++++++++
2 files changed, 54 insertions(+)
diff --git
a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskExecutorService.java
b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskExecutorService.java
index 2469894..93b59dc 100644
---
a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskExecutorService.java
+++
b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskExecutorService.java
@@ -222,6 +222,20 @@ public class TaskExecutorService extends AbstractService
waitQueue.setWaitQueueSize(newWaitQueueSize);
metrics.setNumExecutors(newNumExecutors);
metrics.setWaitQueueSize(newWaitQueueSize);
+ // If there is no executor left so the queued tasks can not be finished
anyway, kill them all.
+ if (newNumExecutors == 0) {
+ synchronized (lock) {
+ TaskWrapper task = waitQueue.peek();
+ while (task != null) {
+ LOG.info("Killing task [" + task + "], since no executor left.");
+ task.getTaskRunnerCallable().killTask();
+ if (waitQueue.remove(task)) {
+ metrics.setExecutorNumQueuedRequests(waitQueue.size());
+ }
+ task = waitQueue.peek();
+ }
+ }
+ }
LOG.info("TaskExecutorService is setting capacity to: numExecutors=" +
newNumExecutors
+ ", waitQueueSize=" + newWaitQueueSize);
}
diff --git
a/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/TestTaskExecutorService.java
b/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/TestTaskExecutorService.java
index 948a678..ce9fce9 100644
---
a/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/TestTaskExecutorService.java
+++
b/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/TestTaskExecutorService.java
@@ -531,6 +531,46 @@ public class TestTaskExecutorService {
}
}
+ @Test(timeout = 1000)
+ public void testZeroCapacity() throws InterruptedException {
+ TaskExecutorServiceForTest taskExecutorService =
+ new TaskExecutorServiceForTest(1, 1,
ShortestJobFirstComparator.class.getName(), true, mockMetrics);
+
+ // Fourth is lower priority as a result of canFinish being set to false.
+ MockRequest r1 = createMockRequest(1, 1, 1, 100, 200, true, 20000L, true);
+ MockRequest r2 = createMockRequest(2, 1, 2, 100, 200, true, 20000L, true);
+
+ taskExecutorService.init(new Configuration());
+ taskExecutorService.start();
+
+ try {
+ Scheduler.SubmissionState submissionState;
+ // Schedule the first 2 tasks (1 to execute, 1 to the queue)
+ submissionState = taskExecutorService.schedule(r1);
+ assertEquals(Scheduler.SubmissionState.ACCEPTED, submissionState);
+
+ submissionState = taskExecutorService.schedule(r2);
+ assertEquals(Scheduler.SubmissionState.ACCEPTED, submissionState);
+
+ awaitStartAndSchedulerRun(r1, taskExecutorService);
+
+ taskExecutorService.setCapacity(0, 0);
+
+ // The queued task should be killed
+ assertTrue(r2.wasPreempted());
+
+ // The already running should be able to finish
+ assertFalse(r1.wasPreempted());
+ r1.complete();
+ r1.awaitEnd();
+ TaskExecutorServiceForTest.InternalCompletionListenerForTest icl =
+
taskExecutorService.getInternalCompletionListenerForTest(r1.getRequestId());
+ icl.awaitCompletion();
+ } finally {
+ taskExecutorService.shutDown(false);
+ }
+ }
+
@Test(timeout = 1000, expected = IllegalArgumentException.class)
public void testSetCapacityHighExecutors() {
TaskExecutorServiceForTest taskExecutorService =