HIVE-11263. LLAP: TaskExecutorService state is not cleaned up. (Siddharth Seth)


Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/af7bf575
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/af7bf575
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/af7bf575

Branch: refs/heads/llap
Commit: af7bf57541cb70f203b5006458c3d9bd0d84a3af
Parents: f47810c
Author: Siddharth Seth <ss...@apache.org>
Authored: Fri Jul 17 08:46:54 2015 -0700
Committer: Siddharth Seth <ss...@apache.org>
Committed: Fri Jul 17 08:46:54 2015 -0700

----------------------------------------------------------------------
 .../llap/daemon/impl/TaskExecutorService.java   | 41 ++++++----
 .../daemon/impl/TestTaskExecutorService.java    | 86 +++++++++++++++++++-
 2 files changed, 110 insertions(+), 17 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hive/blob/af7bf575/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskExecutorService.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskExecutorService.java b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskExecutorService.java
index e6cf151..5099a5c 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskExecutorService.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskExecutorService.java
@@ -295,8 +295,32 @@ public class TaskExecutorService extends AbstractService implements Scheduler<Ta
   @Override
   public void schedule(TaskRunnerCallable task) throws 
RejectedExecutionException {
     TaskWrapper taskWrapper = new TaskWrapper(task, this);
-    knownTasks.put(taskWrapper.getRequestId(), taskWrapper);
 
+    TaskWrapper evictedTask;
+    synchronized (lock) {
+      // If the queue does not have capacity, it does not throw a Rejection. 
Instead it will
+      // return the task with the lowest priority, which could be the task 
which is currently being processed.
+      evictedTask = waitQueue.offer(taskWrapper);
+      if (evictedTask != taskWrapper) {
+        knownTasks.put(taskWrapper.getRequestId(), taskWrapper);
+        taskWrapper.setIsInWaitQueue(true);
+        if (isInfoEnabled) {
+          LOG.info("{} added to wait queue. Current wait queue size={}", 
task.getRequestId(),
+              waitQueue.size());
+        }
+      } else {
+        if (isInfoEnabled) {
+          LOG.info("wait queue full, size={}. {} not added", waitQueue.size(), 
task.getRequestId());
+        }
+        evictedTask.getTaskRunnerCallable().killTask();
+        throw new RejectedExecutionException("Wait queue full");
+      }
+    }
+
+    // At this point, the task has been added into the queue. It may have 
caused an eviction for
+    // some other task.
+
+    // This registration has to be done after knownTasks has been populated.
     // Register for state change notifications so that the waitQueue can be 
re-ordered correctly
     // if the fragment moves in or out of the finishable state.
     boolean canFinish = taskWrapper.getTaskRunnerCallable().canFinish();
@@ -304,24 +328,11 @@ public class TaskExecutorService extends AbstractService implements Scheduler<Ta
     // and registrations are mutually exclusive.
     taskWrapper.maybeRegisterForFinishedStateNotifications(canFinish);
 
-    TaskWrapper evictedTask;
-    try {
-      synchronized (lock) {
-        evictedTask = waitQueue.offer(taskWrapper);
-        taskWrapper.setIsInWaitQueue(true);
-      }
-    } catch (RejectedExecutionException e) {
-      knownTasks.remove(taskWrapper.getRequestId());
-      taskWrapper.maybeUnregisterForFinishedStateNotifications();
-      throw e;
-    }
-    if (isInfoEnabled) {
-      LOG.info("{} added to wait queue. Current wait queue size={}", 
task.getRequestId(), waitQueue.size());
-    }
     if (isDebugEnabled) {
       LOG.debug("Wait Queue: {}", waitQueue);
     }
     if (evictedTask != null) {
+      knownTasks.remove(evictedTask.getRequestId());
       evictedTask.maybeUnregisterForFinishedStateNotifications();
       evictedTask.setIsInWaitQueue(false);
       evictedTask.getTaskRunnerCallable().killTask();

http://git-wip-us.apache.org/repos/asf/hive/blob/af7bf575/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/TestTaskExecutorService.java
----------------------------------------------------------------------
diff --git a/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/TestTaskExecutorService.java b/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/TestTaskExecutorService.java
index eff4abe..25f7a81 100644
--- a/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/TestTaskExecutorService.java
+++ b/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/TestTaskExecutorService.java
@@ -20,12 +20,14 @@ package org.apache.hadoop.hive.llap.daemon.impl;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
 import static org.mockito.Mockito.mock;
 
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.PriorityBlockingQueue;
+import java.util.concurrent.RejectedExecutionException;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.locks.Condition;
@@ -42,7 +44,6 @@ import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.FragmentS
 import 
org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.SubmitWorkRequestProto;
 import org.apache.hadoop.hive.llap.metrics.LlapDaemonExecutorMetrics;
 import org.apache.hadoop.security.Credentials;
-import org.apache.hadoop.util.StringUtils;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.tez.dag.records.TezDAGID;
 import org.apache.tez.dag.records.TezTaskAttemptID;
@@ -259,6 +260,87 @@ public class TestTaskExecutorService {
     }
   }
 
+  @Test(timeout = 10000)
+  public void testWaitQueuePreemption() throws InterruptedException {
+    MockRequest r1 = createMockRequest(1, 1, 100, true, 20000l);
+    MockRequest r2 = createMockRequest(2, 1, 200, false, 20000l);
+    MockRequest r3 = createMockRequest(3, 1, 300, false, 20000l);
+    MockRequest r4 = createMockRequest(4, 1, 400, false, 20000l);
+    MockRequest r5 = createMockRequest(5, 1, 500, true, 20000l);
+
+    TaskExecutorServiceForTest taskExecutorService =
+        new TaskExecutorServiceForTest(1, 2, false, true);
+    taskExecutorService.init(conf);
+    taskExecutorService.start();
+
+    try {
+      taskExecutorService.schedule(r1);
+      r1.awaitStart();
+      try {
+        taskExecutorService.schedule(r2);
+      } catch (RejectedExecutionException e) {
+        fail("Unexpected rejection with space available in queue");
+      }
+      try {
+        taskExecutorService.schedule(r3);
+      } catch (RejectedExecutionException e) {
+        fail("Unexpected rejection with space available in queue");
+      }
+
+      try {
+        taskExecutorService.schedule(r4);
+        fail("Expecting a Rejection for non finishable task with a full 
queue");
+      } catch (RejectedExecutionException e) {
+      }
+
+      try {
+        taskExecutorService.schedule(r5);
+      } catch (RejectedExecutionException e) {
+        fail("Unexpected rejection for a finishable task");
+      }
+
+      // Ensure the correct task was preempted.
+      assertEquals(true, r3.wasPreempted());
+
+      TaskExecutorServiceForTest.InternalCompletionListenerForTest icl1 =
+          
taskExecutorService.getInternalCompletionListenerForTest(r1.getRequestId());
+
+      // Currently 3 known tasks. 1, 2, 5
+      assertEquals(3, taskExecutorService.knownTasks.size());
+      
assertTrue(taskExecutorService.knownTasks.containsKey(r1.getRequestId()));
+      
assertTrue(taskExecutorService.knownTasks.containsKey(r2.getRequestId()));
+      
assertTrue(taskExecutorService.knownTasks.containsKey(r5.getRequestId()));
+
+      r1.complete();
+      icl1.awaitCompletion();
+
+      // Two known tasks left. r2 and r5. (r1 complete, r3 evicted, r4 
rejected)
+      assertEquals(2, taskExecutorService.knownTasks.size());
+      
assertTrue(taskExecutorService.knownTasks.containsKey(r2.getRequestId()));
+      
assertTrue(taskExecutorService.knownTasks.containsKey(r5.getRequestId()));
+
+      r5.awaitStart();
+      TaskExecutorServiceForTest.InternalCompletionListenerForTest icl5 =
+          
taskExecutorService.getInternalCompletionListenerForTest(r5.getRequestId());
+      r5.complete();
+      icl5.awaitCompletion();
+
+      // 1 Pending task which is not finishable
+      assertEquals(1, taskExecutorService.knownTasks.size());
+      
assertTrue(taskExecutorService.knownTasks.containsKey(r2.getRequestId()));
+
+      r2.awaitStart();
+      TaskExecutorServiceForTest.InternalCompletionListenerForTest icl2 =
+          
taskExecutorService.getInternalCompletionListenerForTest(r2.getRequestId());
+      r2.complete();
+      icl2.awaitCompletion();
+      // 0 Pending task which is not finishable
+      assertEquals(0, taskExecutorService.knownTasks.size());
+    } finally {
+      taskExecutorService.shutDown(false);
+    }
+  }
+
 
   // ----------- Helper classes and methods go after this point. Tests above 
this -----------
 
@@ -446,6 +528,7 @@ public class TestTaskExecutorService {
 
     private ConcurrentMap<String, InternalCompletionListenerForTest> 
completionListeners = new ConcurrentHashMap<>();
 
+    @Override
     InternalCompletionListener createInternalCompletionListener(TaskWrapper 
taskWrapper) {
       InternalCompletionListenerForTest icl = new 
InternalCompletionListenerForTest(taskWrapper);
       completionListeners.put(taskWrapper.getRequestId(), icl);
@@ -456,7 +539,6 @@ public class TestTaskExecutorService {
       return completionListeners.get(requestId);
     }
 
-
     private class InternalCompletionListenerForTest extends 
TaskExecutorService.InternalCompletionListener {
 
       private final Lock lock = new ReentrantLock();

Reply via email to