Repository: hive
Updated Branches:
  refs/heads/branch-2.1 f3a05bd1e -> cb2330d94


HIVE-13599. LLAP: Incorrect handling of the preemption queue on finishable 
state updates. (Siddharth Seth, reviewed by Prasanth Jayachandran)
(cherry picked from commit 5776025c0a18f15f28dfccee24c08a6e951f8e2a)


Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/cb2330d9
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/cb2330d9
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/cb2330d9

Branch: refs/heads/branch-2.1
Commit: cb2330d94583d459c8041bfa257df3ab3e15daec
Parents: f3a05bd
Author: Siddharth Seth <ss...@apache.org>
Authored: Mon Jun 6 20:36:03 2016 -0700
Committer: Siddharth Seth <ss...@apache.org>
Committed: Mon Jun 6 20:37:35 2016 -0700

----------------------------------------------------------------------
 .../llap/daemon/impl/TaskExecutorService.java   |  80 +++++++++----
 .../daemon/impl/TestTaskExecutorService.java    | 116 ++++++++++++++++++-
 2 files changed, 167 insertions(+), 29 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hive/blob/cb2330d9/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskExecutorService.java
----------------------------------------------------------------------
diff --git 
a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskExecutorService.java
 
b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskExecutorService.java
index 1e302e8..7744611 100644
--- 
a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskExecutorService.java
+++ 
b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskExecutorService.java
@@ -94,7 +94,9 @@ public class TaskExecutorService extends AbstractService 
implements Scheduler<Ta
   private final ListeningExecutorService waitQueueExecutorService;
   // Thread pool for callbacks on completion of execution of a work unit.
   private final ListeningExecutorService executionCompletionExecutorService;
-  private final BlockingQueue<TaskWrapper> preemptionQueue;
+
+  @VisibleForTesting
+  final BlockingQueue<TaskWrapper> preemptionQueue;
   private final boolean enablePreemption;
   private final ThreadPoolExecutor threadPoolExecutor;
   private final AtomicInteger numSlotsAvailable;
@@ -183,6 +185,7 @@ public class TaskExecutorService extends AbstractService 
implements Scheduler<Ta
 
   @Override
   public Set<String> getExecutorsStatus() {
+    // TODO Change this method to make the output easier to parse 
programmatically
     Set<String> result = new HashSet<>();
     StringBuilder value = new StringBuilder();
     for (Map.Entry<String, TaskWrapper> e : knownTasks.entrySet()) {
@@ -449,11 +452,7 @@ public class TaskExecutorService extends AbstractService 
implements Scheduler<Ta
           if (isDebugEnabled) {
             LOG.debug("Removing {} from preemptionQueue", fragmentId);
           }
-          taskWrapper.setIsInPreemptableQueue(false);
-          preemptionQueue.remove(taskWrapper);
-          if (metrics != null) {
-            metrics.setExecutorNumPreemptableRequests(preemptionQueue.size());
-          }
+          removeFromPreemptionQueue(taskWrapper);
         }
         taskWrapper.getTaskRunnerCallable().killTask();
       } else {
@@ -463,7 +462,8 @@ public class TaskExecutorService extends AbstractService 
implements Scheduler<Ta
     }
   }
 
-  private void trySchedule(final TaskWrapper taskWrapper) throws 
RejectedExecutionException {
+  @VisibleForTesting
+  void trySchedule(final TaskWrapper taskWrapper) throws 
RejectedExecutionException {
 
       synchronized (lock) {
         boolean canFinish = taskWrapper.getTaskRunnerCallable().canFinish();
@@ -508,7 +508,7 @@ public class TaskExecutorService extends AbstractService 
implements Scheduler<Ta
         LOG.debug("Preemption Queue: " + preemptionQueue);
       }
 
-      TaskWrapper pRequest = removeAndGetFromPreemptionQueue();
+      TaskWrapper pRequest = removeAndGetNextFromPreemptionQueue();
 
       // Avoid preempting tasks which are finishable - callback still to be 
processed.
       if (pRequest != null) {
@@ -548,18 +548,12 @@ public class TaskExecutorService extends AbstractService 
implements Scheduler<Ta
       if (newFinishableState == true && taskWrapper.isInPreemptionQueue()) {
         LOG.debug("Removing {} from preemption queue because it's state 
changed to {}",
             taskWrapper.getRequestId(), newFinishableState);
-        preemptionQueue.remove(taskWrapper.getTaskRunnerCallable());
-        if (metrics != null) {
-          metrics.setExecutorNumPreemptableRequests(preemptionQueue.size());
-        }
+        removeFromPreemptionQueue(taskWrapper);
       } else if (newFinishableState == false && 
!taskWrapper.isInPreemptionQueue() &&
           !taskWrapper.isInWaitQueue()) {
         LOG.debug("Adding {} to preemption queue since finishable state 
changed to {}",
             taskWrapper.getRequestId(), newFinishableState);
-        preemptionQueue.offer(taskWrapper);
-        if (metrics != null) {
-          metrics.setExecutorNumPreemptableRequests(preemptionQueue.size());
-        }
+        addToPreemptionQueue(taskWrapper);
       }
       lock.notify();
     }
@@ -567,7 +561,12 @@ public class TaskExecutorService extends AbstractService 
implements Scheduler<Ta
 
   private void addToPreemptionQueue(TaskWrapper taskWrapper) {
     synchronized (lock) {
-      preemptionQueue.add(taskWrapper);
+      boolean added = preemptionQueue.offer(taskWrapper);
+      if (!added) {
+        LOG.warn("Failed to add element {} to preemption queue. Terminating", 
taskWrapper);
+        
Thread.getDefaultUncaughtExceptionHandler().uncaughtException(Thread.currentThread(),
+            new IllegalStateException("Preemption queue full. Cannot 
proceed"));
+      }
       taskWrapper.setIsInPreemptableQueue(true);
       if (metrics != null) {
         metrics.setExecutorNumPreemptableRequests(preemptionQueue.size());
@@ -575,10 +574,30 @@ public class TaskExecutorService extends AbstractService 
implements Scheduler<Ta
     }
   }
 
-  private TaskWrapper removeAndGetFromPreemptionQueue() {
+  /**
+   * Remove the specified taskWrapper from the preemption queue
+   * @param taskWrapper the taskWrapper to be removed
+   * @return true if the element existed in the queue and was removed, false 
otherwise
+   */
+  private boolean removeFromPreemptionQueue(TaskWrapper taskWrapper) {
+    synchronized (lock) {
+      return removeFromPreemptionQueueUnlocked(taskWrapper);
+    }
+  }
+
+  private boolean removeFromPreemptionQueueUnlocked(TaskWrapper taskWrapper) {
+    boolean removed = preemptionQueue.remove(taskWrapper);
+    taskWrapper.setIsInPreemptableQueue(false);
+    if (metrics != null) {
+      metrics.setExecutorNumPreemptableRequests(preemptionQueue.size());
+    }
+    return removed;
+  }
+
+  private TaskWrapper removeAndGetNextFromPreemptionQueue() {
     TaskWrapper taskWrapper;
     synchronized (lock) {
-       taskWrapper = preemptionQueue.remove();
+       taskWrapper = preemptionQueue.poll();
       if (taskWrapper != null) {
         taskWrapper.setIsInPreemptableQueue(false);
         if (metrics != null) {
@@ -603,6 +622,24 @@ public class TaskExecutorService extends AbstractService 
implements Scheduler<Ta
       this.taskWrapper = taskWrapper;
     }
 
+    // By the time either success / failed are called, the task itself knows 
that it has terminated,
+    // and will ignore subsequent kill requests if they go out.
+
+    // There's a race between removing the current task from the preemption 
queue and the actual scheduler
+    // attempting to take an element from the preemption queue to make space 
for another task.
+    // If the current element is removed to make space - that is OK, since the 
current task is completing and
+    // will end up making space for execution. Any kill message sent out by 
the scheduler to the task will
+    // be ignored, since the task knows it has completed (otherwise it would 
not be in this callback).
+    //
+    // If the task is removed from the queue as a result of this callback, and 
the scheduler happens to
+    // be in the section where it's looking for a preemptible task - the 
scheduler may end up pulling the
+    // next pre-emptible task and killing it (an extra preemption).
+    // TODO: This potential extra preemption can be avoided by synchronizing 
the entire trySchedule block.
+    // This would essentially synchronize all operations - it would be better 
to see if there's an
+    // approach where multiple locks could be used to avoid single threaded 
operation.
+    // - It checks available and preempts (which could be this task)
+    // - Or this task completes making space, and removing the need for 
preemption
+
     @Override
     public void onSuccess(TaskRunner2Result result) {
       knownTasks.remove(taskWrapper.getRequestId());
@@ -626,15 +663,12 @@ public class TaskExecutorService extends AbstractService 
implements Scheduler<Ta
       // if this task was added to pre-emption list, remove it
       if (enablePreemption) {
         String state = reason == null ? "FAILED" : reason.name();
-        boolean removed = preemptionQueue.remove(taskWrapper);
+        boolean removed = removeFromPreemptionQueueUnlocked(taskWrapper);
         if (removed && isInfoEnabled) {
           TaskRunnerCallable trc = taskWrapper.getTaskRunnerCallable();
           LOG.info(TaskRunnerCallable.getTaskIdentifierString(trc.getRequest(),
               trc.getVertexSpec()) + " request " + state + "! Removed from 
preemption list.");
         }
-        if (metrics != null) {
-          metrics.setExecutorNumPreemptableRequests(preemptionQueue.size());
-        }
       }
 
       numSlotsAvailable.incrementAndGet();

http://git-wip-us.apache.org/repos/asf/hive/blob/cb2330d9/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/TestTaskExecutorService.java
----------------------------------------------------------------------
diff --git 
a/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/TestTaskExecutorService.java
 
b/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/TestTaskExecutorService.java
index 506f611..ac4e5f1 100644
--- 
a/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/TestTaskExecutorService.java
+++ 
b/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/TestTaskExecutorService.java
@@ -21,6 +21,9 @@ import static 
org.apache.hadoop.hive.llap.daemon.impl.TaskExecutorTestHelpers.cr
 import static 
org.apache.hadoop.hive.llap.daemon.impl.TaskExecutorTestHelpers.createSubmitWorkRequestProto;
 import static 
org.apache.hadoop.hive.llap.daemon.impl.TaskExecutorTestHelpers.createTaskWrapper;
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
 
 import java.util.HashMap;
@@ -29,6 +32,7 @@ import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.PriorityBlockingQueue;
+import java.util.concurrent.RejectedExecutionException;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.locks.Condition;
 import java.util.concurrent.locks.Lock;
@@ -76,9 +80,9 @@ public class TestTaskExecutorService {
 
     try {
       taskExecutorService.schedule(r1);
-      r1.awaitStart();
+      awaitStartAndSchedulerRun(r1, taskExecutorService);
       taskExecutorService.schedule(r2);
-      r2.awaitStart();
+      awaitStartAndSchedulerRun(r2, taskExecutorService);
       // Verify r1 was preempted. Also verify that it finished (single 
executor), otherwise
       // r2 could have run anyway.
       r1.awaitEnd();
@@ -104,6 +108,73 @@ public class TestTaskExecutorService {
   }
 
   @Test(timeout = 10000)
+  public void testPreemptionStateOnTaskMoveToFinishableState() throws 
InterruptedException {
+
+    MockRequest r1 = createMockRequest(1, 1, 100, false, 20000l);
+
+    TaskExecutorServiceForTest taskExecutorService =
+        new TaskExecutorServiceForTest(1, 2, 
ShortestJobFirstComparator.class.getName(), true);
+    taskExecutorService.init(new Configuration());
+    taskExecutorService.start();
+
+    try {
+      Scheduler.SubmissionState submissionState = 
taskExecutorService.schedule(r1);
+      assertEquals(Scheduler.SubmissionState.ACCEPTED, submissionState);
+      awaitStartAndSchedulerRun(r1, taskExecutorService);
+
+      TaskWrapper taskWrapper = taskExecutorService.preemptionQueue.peek();
+      assertNotNull(taskWrapper);
+      assertTrue(taskWrapper.isInPreemptionQueue());
+
+      // Now notify the executorService that the task has moved to finishable 
state.
+      taskWrapper.finishableStateUpdated(true);
+      TaskWrapper taskWrapper2 = taskExecutorService.preemptionQueue.peek();
+      assertNull(taskWrapper2);
+      assertFalse(taskWrapper.isInPreemptionQueue());
+
+      r1.complete();
+      r1.awaitEnd();
+    } finally {
+      taskExecutorService.shutDown(false);
+    }
+  }
+
+  @Test(timeout = 10000)
+  public void testPreemptionStateOnTaskMoveToNonFinishableState() throws 
InterruptedException {
+
+    MockRequest r1 = createMockRequest(1, 1, 100, true, 20000l);
+
+    TaskExecutorServiceForTest taskExecutorService =
+        new TaskExecutorServiceForTest(1, 2, 
ShortestJobFirstComparator.class.getName(), true);
+    taskExecutorService.init(new Configuration());
+    taskExecutorService.start();
+
+    try {
+      Scheduler.SubmissionState submissionState = 
taskExecutorService.schedule(r1);
+      assertEquals(Scheduler.SubmissionState.ACCEPTED, submissionState);
+      awaitStartAndSchedulerRun(r1, taskExecutorService);
+
+      TaskWrapper taskWrapper = taskExecutorService.preemptionQueue.peek();
+      assertNull(taskWrapper);
+      assertEquals(1, taskExecutorService.knownTasks.size());
+      taskWrapper = 
taskExecutorService.knownTasks.entrySet().iterator().next().getValue();
+      assertFalse(taskWrapper.isInPreemptionQueue());
+
+      // Now notify the executorService that the task has moved to 
non-finishable state.
+      taskWrapper.finishableStateUpdated(false);
+      TaskWrapper taskWrapper2 = taskExecutorService.preemptionQueue.peek();
+      assertNotNull(taskWrapper2);
+      assertTrue(taskWrapper2.isInPreemptionQueue());
+      assertEquals(taskWrapper, taskWrapper2);
+
+      r1.complete();
+      r1.awaitEnd();
+    } finally {
+      taskExecutorService.shutDown(false);
+    }
+  }
+
+  @Test(timeout = 10000)
   public void testWaitQueuePreemption() throws InterruptedException {
     MockRequest r1 = createMockRequest(1, 1, 100, true, 20000l);
     MockRequest r2 = createMockRequest(2, 1, 200, false, 20000l);
@@ -121,7 +192,7 @@ public class TestTaskExecutorService {
 
       // TODO HIVE-11687. Remove the awaitStart once offer can handle 
(waitQueueSize + numFreeExecutionSlots)
       // This currently serves to allow the task to be removed from the 
waitQueue.
-      r1.awaitStart();
+      awaitStartAndSchedulerRun(r1, taskExecutorService);
       Scheduler.SubmissionState submissionState = 
taskExecutorService.schedule(r2);
       assertEquals(Scheduler.SubmissionState.ACCEPTED, submissionState);
 
@@ -155,7 +226,7 @@ public class TestTaskExecutorService {
       
assertTrue(taskExecutorService.knownTasks.containsKey(r2.getRequestId()));
       
assertTrue(taskExecutorService.knownTasks.containsKey(r5.getRequestId()));
 
-      r5.awaitStart();
+      awaitStartAndSchedulerRun(r5, taskExecutorService);
       TaskExecutorServiceForTest.InternalCompletionListenerForTest icl5 =
           
taskExecutorService.getInternalCompletionListenerForTest(r5.getRequestId());
       r5.complete();
@@ -166,7 +237,7 @@ public class TestTaskExecutorService {
       assertEquals(1, taskExecutorService.knownTasks.size());
       
assertTrue(taskExecutorService.knownTasks.containsKey(r2.getRequestId()));
 
-      r2.awaitStart();
+      awaitStartAndSchedulerRun(r2, taskExecutorService);
       TaskExecutorServiceForTest.InternalCompletionListenerForTest icl2 =
           
taskExecutorService.getInternalCompletionListenerForTest(r2.getRequestId());
       r2.complete();
@@ -180,13 +251,22 @@ public class TestTaskExecutorService {
   }
 
 
-
+  private void awaitStartAndSchedulerRun(MockRequest mockRequest,
+                                         TaskExecutorServiceForTest 
taskExecutorServiceForTest) throws
+      InterruptedException {
+    mockRequest.awaitStart();
+    taskExecutorServiceForTest.awaitTryScheduleIfInProgress();
+  }
 
   private static class TaskExecutorServiceForTest extends TaskExecutorService {
 
     private final Lock iclCreationLock = new ReentrantLock();
     private final Map<String, Condition> iclCreationConditions = new 
HashMap<>();
 
+    private final Lock tryScheduleLock = new ReentrantLock();
+    private final Condition tryScheduleCondition = 
tryScheduleLock.newCondition();
+    private boolean isInTrySchedule = false;
+
     public TaskExecutorServiceForTest(int numExecutors, int waitQueueSize, 
String waitQueueComparatorClassName,
                                       boolean enablePreemption) {
       super(numExecutors, waitQueueSize, waitQueueComparatorClassName, 
enablePreemption,
@@ -196,6 +276,30 @@ public class TestTaskExecutorService {
     private ConcurrentMap<String, InternalCompletionListenerForTest> 
completionListeners = new ConcurrentHashMap<>();
 
     @Override
+    void trySchedule(final TaskWrapper taskWrapper) throws 
RejectedExecutionException {
+      tryScheduleLock.lock();
+      try {
+        isInTrySchedule = true;
+        super.trySchedule(taskWrapper);
+        isInTrySchedule = false;
+        tryScheduleCondition.signal();
+      } finally {
+        tryScheduleLock.unlock();
+      }
+    }
+
+    private void awaitTryScheduleIfInProgress() throws InterruptedException {
+      tryScheduleLock.lock();
+      try {
+        while (isInTrySchedule) {
+          tryScheduleCondition.await();
+        }
+      } finally {
+        tryScheduleLock.unlock();
+      }
+    }
+
+    @Override
     InternalCompletionListener createInternalCompletionListener(TaskWrapper 
taskWrapper) {
       iclCreationLock.lock();
       try {

Reply via email to