Repository: hive Updated Branches: refs/heads/master 95796e172 -> 82d346865
HIVE-16104 : LLAP: preemption may be too aggressive if the pre-empted task doesn't die immediately (Sergey Shelukhin, reviewed by Siddharth Seth) Project: http://git-wip-us.apache.org/repos/asf/hive/repo Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/82d34686 Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/82d34686 Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/82d34686 Branch: refs/heads/master Commit: 82d346865860c7aacc174b9555d4126960d3f0b1 Parents: 95796e1 Author: Sergey Shelukhin <[email protected]> Authored: Mon Mar 13 17:41:27 2017 -0700 Committer: Sergey Shelukhin <[email protected]> Committed: Mon Mar 13 17:48:11 2017 -0700 ---------------------------------------------------------------------- llap-server/pom.xml | 6 + .../llap/daemon/impl/ContainerRunnerImpl.java | 2 +- .../llap/daemon/impl/TaskExecutorService.java | 179 ++++++++++--------- .../llap/daemon/impl/TaskRunnerCallable.java | 2 +- .../daemon/impl/TaskExecutorTestHelpers.java | 39 +++- .../daemon/impl/TestTaskExecutorService.java | 98 ++++++++-- 6 files changed, 224 insertions(+), 102 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hive/blob/82d34686/llap-server/pom.xml ---------------------------------------------------------------------- diff --git a/llap-server/pom.xml b/llap-server/pom.xml index fc392fb..630e243 100644 --- a/llap-server/pom.xml +++ b/llap-server/pom.xml @@ -253,6 +253,12 @@ <scope>test</scope> </dependency> <dependency> + <groupId>org.apache.hive</groupId> + <artifactId>hive-llap-common</artifactId> + <version>${project.version}</version> + <type>test-jar</type> + </dependency> + <dependency> <groupId>org.apache.hadoop</groupId> <artifactId>hadoop-hdfs</artifactId> <version>${hadoop.version}</version> http://git-wip-us.apache.org/repos/asf/hive/blob/82d34686/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/ContainerRunnerImpl.java ---------------------------------------------------------------------- diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/ContainerRunnerImpl.java b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/ContainerRunnerImpl.java index 2a69d6a..82bbcf3 100644 --- a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/ContainerRunnerImpl.java +++ b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/ContainerRunnerImpl.java @@ -136,7 +136,7 @@ public class ContainerRunnerImpl extends CompositeService implements ContainerRu String waitQueueSchedulerClassName = HiveConf.getVar( conf, ConfVars.LLAP_DAEMON_WAIT_QUEUE_COMPARATOR_CLASS_NAME); this.executorService = new TaskExecutorService(numExecutors, waitQueueSize, - waitQueueSchedulerClassName, enablePreemption, classLoader, metrics); + waitQueueSchedulerClassName, enablePreemption, classLoader, metrics, null); completionListener = (SchedulerFragmentCompletingListener) executorService; addIfService(executorService); http://git-wip-us.apache.org/repos/asf/hive/blob/82d34686/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskExecutorService.java ---------------------------------------------------------------------- diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskExecutorService.java b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskExecutorService.java index c1f6c96..9eaa7d7 100644 --- a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskExecutorService.java +++ b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskExecutorService.java @@ -88,6 +88,9 @@ public class TaskExecutorService extends AbstractService private static final boolean isDebugEnabled = LOG.isDebugEnabled(); private static final String TASK_EXECUTOR_THREAD_NAME_FORMAT = "Task-Executor-%d"; private static final String WAIT_QUEUE_SCHEDULER_THREAD_NAME_FORMAT = "Wait-Queue-Scheduler-%d"; + private static final long PREEMPTION_KILL_GRACE_MS = 500; // 500ms + private static final int PREEMPTION_KILL_GRACE_SLEEP_MS = 50; // 50ms + private final AtomicBoolean isShutdown = new AtomicBoolean(false); @@ -106,7 +109,7 @@ public class TaskExecutorService extends AbstractService private final ThreadPoolExecutor threadPoolExecutor; private final AtomicInteger numSlotsAvailable; private final int maxParallelExecutors; - private final Clock clock = new MonotonicClock(); + private final Clock clock; // Tracks running fragments, and completing fragments. // Completing since we have a race in the AM being notified and the task actually @@ -123,7 +126,7 @@ public class TaskExecutorService extends AbstractService public TaskExecutorService(int numExecutors, int waitQueueSize, String waitQueueComparatorClassName, boolean enablePreemption, - ClassLoader classLoader, final LlapDaemonExecutorMetrics metrics) { + ClassLoader classLoader, final LlapDaemonExecutorMetrics metrics, Clock clock) { super(TaskExecutorService.class.getSimpleName()); LOG.info("TaskExecutorService is being setup with parameters: " + "numExecutors=" + numExecutors @@ -135,6 +138,7 @@ public class TaskExecutorService extends AbstractService waitQueueComparatorClassName); this.maxParallelExecutors = numExecutors; this.waitQueue = new EvictingPriorityBlockingQueue<>(waitQueueComparator, waitQueueSize); + this.clock = clock == null ? new MonotonicClock() : clock; this.threadPoolExecutor = new ThreadPoolExecutor(numExecutors, // core pool size numExecutors, // max pool size 1, TimeUnit.MINUTES, new SynchronousQueue<Runnable>(), // direct hand-off @@ -252,20 +256,18 @@ public class TaskExecutorService extends AbstractService * Worker that takes tasks from wait queue and schedule it for execution. */ private final class WaitQueueWorker implements Runnable { - TaskWrapper task; + private TaskWrapper task; @Override public void run() { - try { - - + Long lastKillTimeMs = null; while (!isShutdown.get()) { RejectedExecutionException rejectedException = null; synchronized (lock) { - // Since schedule() can be called from multiple threads, we peek the wait queue, - // try scheduling the task and then remove the task if scheduling is successful. - // This will make sure the task's place in the wait queue is held until it gets scheduled. + // Since schedule() can be called from multiple threads, we peek the wait queue, try + // scheduling the task and then remove the task if scheduling is successful. This + // will make sure the task's place in the wait queue is held until it gets scheduled. task = waitQueue.peek(); if (task == null) { if (!isShutdown.get()) { @@ -273,22 +275,17 @@ public class TaskExecutorService extends AbstractService } continue; } - // if the task cannot finish and if no slots are available then don't schedule it. - boolean shouldWait = false; + // If the task cannot finish and if no slots are available then don't schedule it. + // Also don't wait if we have a task and we just killed something to schedule it. + boolean shouldWait = numSlotsAvailable.get() == 0 && lastKillTimeMs == null; if (task.getTaskRunnerCallable().canFinish()) { if (isDebugEnabled) { - LOG.debug( - "Attempting to schedule task {}, canFinish={}. Current state: preemptionQueueSize={}, numSlotsAvailable={}, waitQueueSize={}", + LOG.debug("Attempting to schedule task {}, canFinish={}. Current state: " + + "preemptionQueueSize={}, numSlotsAvailable={}, waitQueueSize={}", task.getRequestId(), task.getTaskRunnerCallable().canFinish(), preemptionQueue.size(), numSlotsAvailable.get(), waitQueue.size()); } - if (numSlotsAvailable.get() == 0 && (enablePreemption == false || preemptionQueue.isEmpty())) { - shouldWait = true; - } - } else { - if (numSlotsAvailable.get() == 0) { - shouldWait = true; - } + shouldWait = shouldWait && (enablePreemption == false || preemptionQueue.isEmpty()); } if (shouldWait) { if (!isShutdown.get()) { @@ -299,36 +296,43 @@ public class TaskExecutorService extends AbstractService continue; } try { - trySchedule(task); - // wait queue could have been re-ordered in the mean time because of concurrent task + tryScheduleUnderLock(task); + // Wait queue could have been re-ordered in the mean time because of concurrent task // submission. So remove the specific task instead of the head task. if (waitQueue.remove(task)) { if (metrics != null) { metrics.setExecutorNumQueuedRequests(waitQueue.size()); } } + lastKillTimeMs = null; // We have filled the spot we may have killed for (if any). } catch (RejectedExecutionException e) { rejectedException = e; } - } + } // synchronized (lock) // Handle the rejection outside of the lock - if (rejectedException !=null) { - handleScheduleAttemptedRejection(task); - } - - synchronized (lock) { - while (waitQueue.isEmpty()) { - if (!isShutdown.get()) { - lock.wait(); + if (rejectedException != null) { + if (lastKillTimeMs != null + && (clock.getTime() - lastKillTimeMs) < PREEMPTION_KILL_GRACE_MS) { + // We killed something, but still got rejected. Wait a bit to give a chance to our + // previous victim to actually die. + synchronized (lock) { + lock.wait(PREEMPTION_KILL_GRACE_SLEEP_MS); + } + } else { + if (isDebugEnabled && lastKillTimeMs != null) { + LOG.debug("Grace period ended for the previous kill; preemtping more tasks"); + } + if (handleScheduleAttemptedRejection(task)) { + lastKillTimeMs = clock.getTime(); // We killed something. } } } } - } catch (InterruptedException e) { if (isShutdown.get()) { - LOG.info(WAIT_QUEUE_SCHEDULER_THREAD_NAME_FORMAT + " thread has been interrupted after shutdown."); + LOG.info(WAIT_QUEUE_SCHEDULER_THREAD_NAME_FORMAT + + " thread has been interrupted after shutdown."); } else { LOG.warn(WAIT_QUEUE_SCHEDULER_THREAD_NAME_FORMAT + " interrupted without shutdown", e); throw new RuntimeException(e); @@ -459,7 +463,7 @@ public class TaskExecutorService extends AbstractService } } synchronized (lock) { - lock.notify(); + lock.notifyAll(); } if (metrics != null) { @@ -504,7 +508,7 @@ public class TaskExecutorService extends AbstractService } else { LOG.info("Ignoring killFragment request for {} since it isn't known", fragmentId); } - lock.notify(); + lock.notifyAll(); } } @@ -537,72 +541,73 @@ public class TaskExecutorService extends AbstractService } @VisibleForTesting - void trySchedule(final TaskWrapper taskWrapper) throws RejectedExecutionException { - - synchronized (lock) { - boolean canFinish = taskWrapper.getTaskRunnerCallable().canFinish(); - LOG.info("Attempting to execute {}", taskWrapper); - ListenableFuture<TaskRunner2Result> future = executorService.submit( - taskWrapper.getTaskRunnerCallable()); - runningFragmentCount.incrementAndGet(); - taskWrapper.setIsInWaitQueue(false); - FutureCallback<TaskRunner2Result> wrappedCallback = createInternalCompletionListener( - taskWrapper); - // Callback on a separate thread so that when a task completes, the thread in the main queue - // is actually available for execution and will not potentially result in a RejectedExecution - Futures.addCallback(future, wrappedCallback, executionCompletionExecutorService); - - if (isDebugEnabled) { - LOG.debug("{} scheduled for execution. canFinish={}", - taskWrapper.getRequestId(), canFinish); - } + /** Assumes the epic lock is already taken. */ + void tryScheduleUnderLock(final TaskWrapper taskWrapper) throws RejectedExecutionException { + if (isInfoEnabled) { + LOG.info("Attempting to execute {}", taskWrapper); + } + ListenableFuture<TaskRunner2Result> future = executorService.submit( + taskWrapper.getTaskRunnerCallable()); + runningFragmentCount.incrementAndGet(); + taskWrapper.setIsInWaitQueue(false); + FutureCallback<TaskRunner2Result> wrappedCallback = createInternalCompletionListener( + taskWrapper); + // Callback on a separate thread so that when a task completes, the thread in the main queue + // is actually available for execution and will not potentially result in a RejectedExecution + Futures.addCallback(future, wrappedCallback, executionCompletionExecutorService); + + boolean canFinish = taskWrapper.getTaskRunnerCallable().canFinish(); + if (isDebugEnabled) { + LOG.debug("{} scheduled for execution. canFinish={}", taskWrapper.getRequestId(), canFinish); + } - // only tasks that cannot finish immediately are pre-emptable. In other words, if all inputs - // to the tasks are not ready yet, the task is eligible for pre-emptable. - if (enablePreemption) { - if (!canFinish) { - if (isInfoEnabled) { - LOG.info("{} is not finishable. Adding it to pre-emption queue", - taskWrapper.getRequestId()); - } - addToPreemptionQueue(taskWrapper); - } + // only tasks that cannot finish immediately are pre-emptable. In other words, if all inputs + // to the tasks are not ready yet, the task is eligible for pre-emptable. + if (enablePreemption) { + if (!canFinish) { + if (isInfoEnabled) { + LOG.info("{} is not finishable. Adding it to pre-emption queue", + taskWrapper.getRequestId()); } + addToPreemptionQueue(taskWrapper); } - numSlotsAvailable.decrementAndGet(); - if (metrics != null) { - metrics.setNumExecutorsAvailable(numSlotsAvailable.get()); - } + } + numSlotsAvailable.decrementAndGet(); + if (metrics != null) { + metrics.setNumExecutorsAvailable(numSlotsAvailable.get()); + } } - private void handleScheduleAttemptedRejection(TaskWrapper taskWrapper) { + private boolean handleScheduleAttemptedRejection(TaskWrapper taskWrapper) { if (enablePreemption && taskWrapper.getTaskRunnerCallable().canFinish() && !preemptionQueue.isEmpty()) { - if (isDebugEnabled) { LOG.debug("Preemption Queue: " + preemptionQueue); } - TaskWrapper pRequest = removeAndGetNextFromPreemptionQueue(); - - // Avoid preempting tasks which are finishable - callback still to be processed. - if (pRequest != null) { + while (true) { // Try to preempt until we have something. + TaskWrapper pRequest = removeAndGetNextFromPreemptionQueue(); + if (pRequest == null) { + return false; // Woe us. + } if (pRequest.getTaskRunnerCallable().canFinish()) { - LOG.info( - "Removed {} from preemption queue, but not preempting since it's now finishable", + LOG.info("Removed {} from preemption queue, but not preempting since it's now finishable", pRequest.getRequestId()); - } else { - if (isInfoEnabled) { - LOG.info("Invoking kill task for {} due to pre-emption to run {}", - pRequest.getRequestId(), taskWrapper.getRequestId()); - } - // The task will either be killed or is already in the process of completing, which will - // trigger the next scheduling run, or result in available slots being higher than 0, - // which will cause the scheduler loop to continue. - pRequest.getTaskRunnerCallable().killTask(); + continue; // Try something else. } + if (isInfoEnabled) { + LOG.info("Invoking kill task for {} due to pre-emption to run {}", + pRequest.getRequestId(), taskWrapper.getRequestId()); + } + // The task will either be killed or is already in the process of completing, which will + // trigger the next scheduling run, or result in available slots being higher than 0, + // which will cause the scheduler loop to continue. + pRequest.getTaskRunnerCallable().killTask(); + // We've killed something and may want to wait for it to die. + return true; } } + return false; } private void finishableStateUpdated(TaskWrapper taskWrapper, boolean newFinishableState) { @@ -628,7 +633,7 @@ public class TaskExecutorService extends AbstractService taskWrapper.getRequestId(), newFinishableState); addToPreemptionQueue(taskWrapper); } - lock.notify(); + lock.notifyAll(); } } @@ -765,7 +770,7 @@ public class TaskExecutorService extends AbstractService } synchronized (lock) { if (!waitQueue.isEmpty()) { - lock.notify(); + lock.notifyAll(); } } } http://git-wip-us.apache.org/repos/asf/hive/blob/82d34686/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskRunnerCallable.java ---------------------------------------------------------------------- diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskRunnerCallable.java b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskRunnerCallable.java index 18f0db9..c077d75 100644 --- a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskRunnerCallable.java +++ b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskRunnerCallable.java @@ -329,6 +329,7 @@ public class TaskRunnerCallable extends CallableWithNdc<TaskRunner2Result> { synchronized (this) { TezTaskAttemptID ta = taskSpec.getTaskAttemptID(); LOG.info("Kill task requested for id={}, taskRunnerSetup={}", ta, taskRunner != null); + shouldRunTask = false; if (taskRunner != null) { killtimerWatch.start(); LOG.info("Issuing kill to task {}", taskSpec.getTaskAttemptID()); @@ -346,7 +347,6 @@ public class TaskRunnerCallable extends CallableWithNdc<TaskRunner2Result> { LOG.info("Kill request for task {} did not complete because the task is already complete", ta); } - shouldRunTask = false; } else { // If the task hasn't started, and it is killed - report back to the AM that the task has been killed. LOG.debug("Reporting taskKilled for non-started fragment {}", getRequestId()); http://git-wip-us.apache.org/repos/asf/hive/blob/82d34686/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/TaskExecutorTestHelpers.java ---------------------------------------------------------------------- diff --git a/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/TaskExecutorTestHelpers.java b/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/TaskExecutorTestHelpers.java index 259e383..6287ae8 100644 --- a/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/TaskExecutorTestHelpers.java +++ b/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/TaskExecutorTestHelpers.java @@ -178,6 +178,8 @@ public class TaskExecutorTestHelpers { private final Condition sleepCondition = lock.newCondition(); private boolean shouldSleep = true; private final Condition finishedCondition = lock.newCondition(); + private final Object killDelay = new Object(); + private boolean isOkToFinish = true; public MockRequest(SubmitWorkRequestProto requestProto, QueryFragmentInfo fragmentInfo, boolean canFinish, long workTime, TezEvent initialEvent) { @@ -207,17 +209,19 @@ public class TaskExecutorTestHelpers { lock.lock(); try { if (shouldSleep) { + logInfo(super.getRequestId() + " is sleeping for " + workTime, null); sleepCondition.await(workTime, TimeUnit.MILLISECONDS); } } catch (InterruptedException e) { wasInterrupted.set(true); - return new TaskRunner2Result(EndReason.KILL_REQUESTED, null, null, false); + return handleKill(); } finally { lock.unlock(); } if (wasKilled.get()) { - return new TaskRunner2Result(EndReason.KILL_REQUESTED, null, null, false); + return handleKill(); } else { + logInfo(super.getRequestId() + " succeeded", null); return new TaskRunner2Result(EndReason.SUCCESS, null, null, false); } } finally { @@ -231,6 +235,33 @@ public class TaskExecutorTestHelpers { } } + private TaskRunner2Result handleKill() { + boolean hasLogged = false; + while (true) { + synchronized (killDelay) { + if (isOkToFinish) break; + if (!hasLogged) { + logInfo("Waiting after the kill: " + getRequestId()); + hasLogged = true; + } + try { + killDelay.wait(100); + } catch (InterruptedException e) { + } + } + } + logInfo("Finished with the kill: " + getRequestId()); + return new TaskRunner2Result(EndReason.KILL_REQUESTED, null, null, false); + } + + public void unblockKill() { + synchronized (killDelay) { + logInfo("Unblocking the kill: " + getRequestId()); + isOkToFinish = true; + killDelay.notifyAll(); + } + } + @Override public void killTask() { lock.lock(); @@ -292,6 +323,10 @@ public class TaskExecutorTestHelpers { public boolean canFinish() { return canFinish; } + + public void setSleepAfterKill() { + isOkToFinish = false; + } } private static void logInfo(String message, Throwable t) { http://git-wip-us.apache.org/repos/asf/hive/blob/82d34686/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/TestTaskExecutorService.java ---------------------------------------------------------------------- diff --git a/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/TestTaskExecutorService.java b/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/TestTaskExecutorService.java index bf7d1d8..df563f4 100644 --- a/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/TestTaskExecutorService.java +++ b/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/TestTaskExecutorService.java @@ -20,11 +20,15 @@ package org.apache.hadoop.hive.llap.daemon.impl; import static org.apache.hadoop.hive.llap.daemon.impl.TaskExecutorTestHelpers.createMockRequest; import static org.apache.hadoop.hive.llap.daemon.impl.TaskExecutorTestHelpers.createSubmitWorkRequestProto; import static org.apache.hadoop.hive.llap.daemon.impl.TaskExecutorTestHelpers.createTaskWrapper; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertFalse; -import static org.junit.Assert.assertNotNull; -import static org.junit.Assert.assertNull; -import static org.junit.Assert.assertTrue; +import static org.junit.Assert.*; + +import org.apache.hadoop.yarn.util.SystemClock; + +import org.apache.hadoop.hive.llap.testhelpers.ControlledClock; + +import java.util.concurrent.TimeUnit; + +import org.apache.hadoop.yarn.util.Clock; import java.util.HashMap; import java.util.Map; @@ -303,6 +307,58 @@ public class TestTaskExecutorService { } } + @Test(timeout = 10000) + public void testDontKillMultiple() throws InterruptedException { + MockRequest victim1 = createMockRequest(1, 1, 100, 100, false, 20000l); + MockRequest victim2 = createMockRequest(2, 1, 100, 100, false, 20000l); + runPreemptionGraceTest(victim1, victim2, 200); + assertNotEquals(victim1.wasPreempted(), victim2.wasPreempted()); // One and only one. + } + + @Test(timeout = 10000) + public void testDoKillMultiple() throws InterruptedException { + MockRequest victim1 = createMockRequest(1, 1, 100, 100, false, 20000l); + MockRequest victim2 = createMockRequest(2, 1, 100, 100, false, 20000l); + runPreemptionGraceTest(victim1, victim2, 1000); + assertTrue(victim1.wasPreempted()); + assertTrue(victim2.wasPreempted()); + } + + private void runPreemptionGraceTest( + MockRequest victim1, MockRequest victim2, int time) throws InterruptedException { + MockRequest preemptor = createMockRequest(3, 1, 100, 100, true, 20000l); + victim1.setSleepAfterKill(); + victim2.setSleepAfterKill(); + + ControlledClock clock = new ControlledClock(new SystemClock()); + clock.setTime(0); + TaskExecutorServiceForTest taskExecutorService = new TaskExecutorServiceForTest( + 2, 3, ShortestJobFirstComparator.class.getName(), true, clock); + taskExecutorService.init(new Configuration()); + taskExecutorService.start(); + + try { + taskExecutorService.schedule(victim1); + awaitStartAndSchedulerRun(victim1, taskExecutorService); + taskExecutorService.schedule(victim2); + awaitStartAndSchedulerRun(victim2, taskExecutorService); + taskExecutorService.schedule(preemptor); + taskExecutorService.waitForScheduleRuns(5); // Wait for scheduling to run a few times. + clock.setTime(time); + taskExecutorService.waitForScheduleRuns(5); // Wait for scheduling to run a few times. + victim1.unblockKill(); + victim2.unblockKill(); + preemptor.complete(); + preemptor.awaitEnd(); + TaskExecutorServiceForTest.InternalCompletionListenerForTest icl3 = + taskExecutorService.getInternalCompletionListenerForTest(preemptor.getRequestId()); + icl3.awaitCompletion(); + } finally { + taskExecutorService.shutDown(false); + } + } + + private void awaitStartAndSchedulerRun(MockRequest mockRequest, TaskExecutorServiceForTest taskExecutorServiceForTest) throws @@ -319,23 +375,43 @@ public class TestTaskExecutorService { private final Lock tryScheduleLock = new ReentrantLock(); private final Condition tryScheduleCondition = tryScheduleLock.newCondition(); private boolean isInTrySchedule = false; + private int scheduleAttempts = 0; + + public TaskExecutorServiceForTest(int numExecutors, int waitQueueSize, + String waitQueueComparatorClassName, boolean enablePreemption) { + this(numExecutors, waitQueueSize, waitQueueComparatorClassName, enablePreemption, null); + } - public TaskExecutorServiceForTest(int numExecutors, int waitQueueSize, String waitQueueComparatorClassName, - boolean enablePreemption) { + public TaskExecutorServiceForTest(int numExecutors, int waitQueueSize, + String waitQueueComparatorClassName, boolean enablePreemption, Clock clock) { super(numExecutors, waitQueueSize, waitQueueComparatorClassName, enablePreemption, - Thread.currentThread().getContextClassLoader(), null); + Thread.currentThread().getContextClassLoader(), null, clock); } - private ConcurrentMap<String, InternalCompletionListenerForTest> completionListeners = new ConcurrentHashMap<>(); + private ConcurrentMap<String, InternalCompletionListenerForTest> completionListeners = + new ConcurrentHashMap<>(); @Override - void trySchedule(final TaskWrapper taskWrapper) throws RejectedExecutionException { + void tryScheduleUnderLock(final TaskWrapper taskWrapper) throws RejectedExecutionException { tryScheduleLock.lock(); try { isInTrySchedule = true; - super.trySchedule(taskWrapper); + super.tryScheduleUnderLock(taskWrapper); + } finally { isInTrySchedule = false; + ++scheduleAttempts; tryScheduleCondition.signal(); + tryScheduleLock.unlock(); + } + } + + public void waitForScheduleRuns(int n) throws InterruptedException { + tryScheduleLock.lock(); + try { + int targetRuns = scheduleAttempts + n; + while (scheduleAttempts < targetRuns) { + tryScheduleCondition.await(100, TimeUnit.MILLISECONDS); + } } finally { tryScheduleLock.unlock(); }
