This is an automated email from the ASF dual-hosted git repository.

pvary pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hive.git


The following commit(s) were added to refs/heads/master by this push:
     new db86f80  HIVE-21981: When LlapDaemon capacity is set to 0 and the 
waitqueue is not empty then the queries are stuck (Peter Vary reviewed by Adam 
Szita)
db86f80 is described below

commit db86f807faf97c74ad8a020ac34848afc2eb4741
Author: Peter Vary <[email protected]>
AuthorDate: Fri Jul 12 14:26:24 2019 +0200

    HIVE-21981: When LlapDaemon capacity is set to 0 and the waitqueue is not 
empty then the queries are stuck (Peter Vary reviewed by Adam Szita)
---
 .../hive/llap/daemon/impl/TaskExecutorService.java | 14 ++++++++
 .../llap/daemon/impl/TestTaskExecutorService.java  | 40 ++++++++++++++++++++++
 2 files changed, 54 insertions(+)

diff --git 
a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskExecutorService.java
 
b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskExecutorService.java
index 2469894..93b59dc 100644
--- 
a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskExecutorService.java
+++ 
b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskExecutorService.java
@@ -222,6 +222,20 @@ public class TaskExecutorService extends AbstractService
     waitQueue.setWaitQueueSize(newWaitQueueSize);
     metrics.setNumExecutors(newNumExecutors);
     metrics.setWaitQueueSize(newWaitQueueSize);
+    // If there is no executor left so the queued tasks can not be finished 
anyway, kill them all.
+    if (newNumExecutors == 0) {
+      synchronized (lock) {
+        TaskWrapper task = waitQueue.peek();
+        while (task != null) {
+          LOG.info("Killing task [" + task + "], since no executor left.");
+          task.getTaskRunnerCallable().killTask();
+          if (waitQueue.remove(task)) {
+            metrics.setExecutorNumQueuedRequests(waitQueue.size());
+          }
+          task = waitQueue.peek();
+        }
+      }
+    }
     LOG.info("TaskExecutorService is setting capacity to: numExecutors=" + 
newNumExecutors
         + ", waitQueueSize=" + newWaitQueueSize);
   }
diff --git 
a/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/TestTaskExecutorService.java
 
b/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/TestTaskExecutorService.java
index 948a678..ce9fce9 100644
--- 
a/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/TestTaskExecutorService.java
+++ 
b/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/TestTaskExecutorService.java
@@ -531,6 +531,46 @@ public class TestTaskExecutorService {
     }
   }
 
+  @Test(timeout = 1000)
+  public void testZeroCapacity() throws InterruptedException {
+    TaskExecutorServiceForTest taskExecutorService =
+        new TaskExecutorServiceForTest(1, 1, 
ShortestJobFirstComparator.class.getName(), true, mockMetrics);
+
+    // Fourth is lower priority as a result of canFinish being set to false.
+    MockRequest r1 = createMockRequest(1, 1, 1, 100, 200, true, 20000L, true);
+    MockRequest r2 = createMockRequest(2, 1, 2, 100, 200, true, 20000L, true);
+
+    taskExecutorService.init(new Configuration());
+    taskExecutorService.start();
+
+    try {
+      Scheduler.SubmissionState submissionState;
+      // Schedule the first 2 tasks (1 to execute, 1 to the queue)
+      submissionState = taskExecutorService.schedule(r1);
+      assertEquals(Scheduler.SubmissionState.ACCEPTED, submissionState);
+
+      submissionState = taskExecutorService.schedule(r2);
+      assertEquals(Scheduler.SubmissionState.ACCEPTED, submissionState);
+
+      awaitStartAndSchedulerRun(r1, taskExecutorService);
+
+      taskExecutorService.setCapacity(0, 0);
+
+      // The queued task should be killed
+      assertTrue(r2.wasPreempted());
+
+      // The already running should be able to finish
+      assertFalse(r1.wasPreempted());
+      r1.complete();
+      r1.awaitEnd();
+      TaskExecutorServiceForTest.InternalCompletionListenerForTest icl =
+          
taskExecutorService.getInternalCompletionListenerForTest(r1.getRequestId());
+      icl.awaitCompletion();
+    } finally {
+      taskExecutorService.shutDown(false);
+    }
+  }
+
   @Test(timeout = 1000, expected = IllegalArgumentException.class)
   public void testSetCapacityHighExecutors() {
     TaskExecutorServiceForTest taskExecutorService =

Reply via email to