HIVE-11263. LLAP: TaskExecutorService state is not cleaned up. (Siddharth Seth)
Project: http://git-wip-us.apache.org/repos/asf/hive/repo Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/af7bf575 Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/af7bf575 Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/af7bf575 Branch: refs/heads/llap Commit: af7bf57541cb70f203b5006458c3d9bd0d84a3af Parents: f47810c Author: Siddharth Seth <ss...@apache.org> Authored: Fri Jul 17 08:46:54 2015 -0700 Committer: Siddharth Seth <ss...@apache.org> Committed: Fri Jul 17 08:46:54 2015 -0700 ---------------------------------------------------------------------- .../llap/daemon/impl/TaskExecutorService.java | 41 ++++++---- .../daemon/impl/TestTaskExecutorService.java | 86 +++++++++++++++++++- 2 files changed, 110 insertions(+), 17 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hive/blob/af7bf575/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskExecutorService.java ---------------------------------------------------------------------- diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskExecutorService.java b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskExecutorService.java index e6cf151..5099a5c 100644 --- a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskExecutorService.java +++ b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskExecutorService.java @@ -295,8 +295,32 @@ public class TaskExecutorService extends AbstractService implements Scheduler<Ta @Override public void schedule(TaskRunnerCallable task) throws RejectedExecutionException { TaskWrapper taskWrapper = new TaskWrapper(task, this); - knownTasks.put(taskWrapper.getRequestId(), taskWrapper); + TaskWrapper evictedTask; + synchronized (lock) { + // If the queue does not have capacity, it does not throw a Rejection. 
Instead it will + // return the task with the lowest priority, which could be the task currently being scheduled. + evictedTask = waitQueue.offer(taskWrapper); + if (evictedTask != taskWrapper) { + knownTasks.put(taskWrapper.getRequestId(), taskWrapper); + taskWrapper.setIsInWaitQueue(true); + if (isInfoEnabled) { + LOG.info("{} added to wait queue. Current wait queue size={}", task.getRequestId(), + waitQueue.size()); + } + } else { + if (isInfoEnabled) { + LOG.info("wait queue full, size={}. {} not added", waitQueue.size(), task.getRequestId()); + } + evictedTask.getTaskRunnerCallable().killTask(); + throw new RejectedExecutionException("Wait queue full"); + } + } + + // At this point, the task has been added into the queue. It may have caused an eviction for + // some other task. + + // This registration has to be done after knownTasks has been populated. // Register for state change notifications so that the waitQueue can be re-ordered correctly // if the fragment moves in or out of the finishable state. boolean canFinish = taskWrapper.getTaskRunnerCallable().canFinish(); @@ -304,24 +328,11 @@ public class TaskExecutorService extends AbstractService implements Scheduler<Ta // and registrations are mutually exclusive. taskWrapper.maybeRegisterForFinishedStateNotifications(canFinish); - TaskWrapper evictedTask; - try { - synchronized (lock) { - evictedTask = waitQueue.offer(taskWrapper); - taskWrapper.setIsInWaitQueue(true); - } - } catch (RejectedExecutionException e) { - knownTasks.remove(taskWrapper.getRequestId()); - taskWrapper.maybeUnregisterForFinishedStateNotifications(); - throw e; - } - if (isInfoEnabled) { - LOG.info("{} added to wait queue.
Current wait queue size={}", task.getRequestId(), waitQueue.size()); - } if (isDebugEnabled) { LOG.debug("Wait Queue: {}", waitQueue); } if (evictedTask != null) { + knownTasks.remove(evictedTask.getRequestId()); evictedTask.maybeUnregisterForFinishedStateNotifications(); evictedTask.setIsInWaitQueue(false); evictedTask.getTaskRunnerCallable().killTask(); http://git-wip-us.apache.org/repos/asf/hive/blob/af7bf575/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/TestTaskExecutorService.java ---------------------------------------------------------------------- diff --git a/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/TestTaskExecutorService.java b/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/TestTaskExecutorService.java index eff4abe..25f7a81 100644 --- a/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/TestTaskExecutorService.java +++ b/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/TestTaskExecutorService.java @@ -20,12 +20,14 @@ package org.apache.hadoop.hive.llap.daemon.impl; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNull; import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; import static org.mockito.Mockito.mock; import java.util.concurrent.BlockingQueue; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.PriorityBlockingQueue; +import java.util.concurrent.RejectedExecutionException; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.locks.Condition; @@ -42,7 +44,6 @@ import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.FragmentS import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.SubmitWorkRequestProto; import org.apache.hadoop.hive.llap.metrics.LlapDaemonExecutorMetrics; import org.apache.hadoop.security.Credentials; -import 
org.apache.hadoop.util.StringUtils; import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.tez.dag.records.TezDAGID; import org.apache.tez.dag.records.TezTaskAttemptID; @@ -259,6 +260,87 @@ public class TestTaskExecutorService { } } + @Test(timeout = 10000) + public void testWaitQueuePreemption() throws InterruptedException { + MockRequest r1 = createMockRequest(1, 1, 100, true, 20000l); + MockRequest r2 = createMockRequest(2, 1, 200, false, 20000l); + MockRequest r3 = createMockRequest(3, 1, 300, false, 20000l); + MockRequest r4 = createMockRequest(4, 1, 400, false, 20000l); + MockRequest r5 = createMockRequest(5, 1, 500, true, 20000l); + + TaskExecutorServiceForTest taskExecutorService = + new TaskExecutorServiceForTest(1, 2, false, true); + taskExecutorService.init(conf); + taskExecutorService.start(); + + try { + taskExecutorService.schedule(r1); + r1.awaitStart(); + try { + taskExecutorService.schedule(r2); + } catch (RejectedExecutionException e) { + fail("Unexpected rejection with space available in queue"); + } + try { + taskExecutorService.schedule(r3); + } catch (RejectedExecutionException e) { + fail("Unexpected rejection with space available in queue"); + } + + try { + taskExecutorService.schedule(r4); + fail("Expecting a Rejection for non finishable task with a full queue"); + } catch (RejectedExecutionException e) { + } + + try { + taskExecutorService.schedule(r5); + } catch (RejectedExecutionException e) { + fail("Unexpected rejection for a finishable task"); + } + + // Ensure the correct task was preempted. + assertEquals(true, r3.wasPreempted()); + + TaskExecutorServiceForTest.InternalCompletionListenerForTest icl1 = + taskExecutorService.getInternalCompletionListenerForTest(r1.getRequestId()); + + // Currently 3 known tasks. 
1, 2, 5 + assertEquals(3, taskExecutorService.knownTasks.size()); + assertTrue(taskExecutorService.knownTasks.containsKey(r1.getRequestId())); + assertTrue(taskExecutorService.knownTasks.containsKey(r2.getRequestId())); + assertTrue(taskExecutorService.knownTasks.containsKey(r5.getRequestId())); + + r1.complete(); + icl1.awaitCompletion(); + + // Two known tasks left. r2 and r5. (r1 complete, r3 evicted, r4 rejected) + assertEquals(2, taskExecutorService.knownTasks.size()); + assertTrue(taskExecutorService.knownTasks.containsKey(r2.getRequestId())); + assertTrue(taskExecutorService.knownTasks.containsKey(r5.getRequestId())); + + r5.awaitStart(); + TaskExecutorServiceForTest.InternalCompletionListenerForTest icl5 = + taskExecutorService.getInternalCompletionListenerForTest(r5.getRequestId()); + r5.complete(); + icl5.awaitCompletion(); + + // One known task left (r2), which is not finishable + assertEquals(1, taskExecutorService.knownTasks.size()); + assertTrue(taskExecutorService.knownTasks.containsKey(r2.getRequestId())); + + r2.awaitStart(); + TaskExecutorServiceForTest.InternalCompletionListenerForTest icl2 = + taskExecutorService.getInternalCompletionListenerForTest(r2.getRequestId()); + r2.complete(); + icl2.awaitCompletion(); + // No known tasks remain + assertEquals(0, taskExecutorService.knownTasks.size()); + } finally { + taskExecutorService.shutDown(false); + } + } + // ----------- Helper classes and methods go after this point.
Tests above this ----------- @@ -446,6 +528,7 @@ public class TestTaskExecutorService { private ConcurrentMap<String, InternalCompletionListenerForTest> completionListeners = new ConcurrentHashMap<>(); + @Override InternalCompletionListener createInternalCompletionListener(TaskWrapper taskWrapper) { InternalCompletionListenerForTest icl = new InternalCompletionListenerForTest(taskWrapper); completionListeners.put(taskWrapper.getRequestId(), icl); @@ -456,7 +539,6 @@ public class TestTaskExecutorService { return completionListeners.get(requestId); } - private class InternalCompletionListenerForTest extends TaskExecutorService.InternalCompletionListener { private final Lock lock = new ReentrantLock();