Repository: hive Updated Branches: refs/heads/branch-2.1 8adb9136f -> f7fdd4e88
HIVE-14403. LLAP node specific preemption will only preempt once on a node per AM. (Siddharth Seth, reviewed by Gunther Hagleitner) (cherry picked from commit 15efc47f8f54efd6c5c3656975cf45e64ea5d7a0) Project: http://git-wip-us.apache.org/repos/asf/hive/repo Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/f7fdd4e8 Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/f7fdd4e8 Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/f7fdd4e8 Branch: refs/heads/branch-2.1 Commit: f7fdd4e880b00e0b50dde1801db10e454a4c1dce Parents: 8adb913 Author: Siddharth Seth <ss...@apache.org> Authored: Wed Aug 3 09:42:13 2016 -0700 Committer: Siddharth Seth <ss...@apache.org> Committed: Wed Aug 3 09:43:16 2016 -0700 ---------------------------------------------------------------------- .../tezplugins/LlapTaskSchedulerService.java | 115 ++++++-- .../TestLlapTaskSchedulerService.java | 274 ++++++++++++++++++- 2 files changed, 364 insertions(+), 25 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hive/blob/f7fdd4e8/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskSchedulerService.java ---------------------------------------------------------------------- diff --git a/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskSchedulerService.java b/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskSchedulerService.java index ad0cc5b..6fa3107 100644 --- a/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskSchedulerService.java +++ b/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskSchedulerService.java @@ -443,16 +443,19 @@ public class LlapTaskSchedulerService extends TaskScheduler { int vcores = 0; readLock.lock(); try { + int numInstancesFound = 0; for (ServiceInstance inst : activeInstances.getAll().values()) { if (inst.isAlive()) { Resource r = inst.getResource(); - LOG.info("Found instance " + inst); memory += r.getMemory(); vcores 
+= r.getVirtualCores(); - } else { - LOG.info("Ignoring dead instance " + inst); + numInstancesFound++; } } + if (LOG.isDebugEnabled()) { + LOG.debug("GetTotalResources: numInstancesFound={}, totalMem={}, totalVcores={}", + numInstancesFound, memory, vcores); + } } finally { readLock.unlock(); } @@ -529,6 +532,8 @@ public class LlapTaskSchedulerService extends TaskScheduler { Priority priority, Object containerSignature, Object clientCookie) { TaskInfo taskInfo = new TaskInfo(localityDelayConf, clock, task, clientCookie, priority, capability, hosts, racks, clock.getTime()); + LOG.info("Received allocateRequest. task={}, priority={}, capability={}, hosts={}", task, + priority, capability, Arrays.toString(hosts)); writeLock.lock(); try { dagStats.registerTaskRequest(hosts, racks); @@ -546,6 +551,8 @@ public class LlapTaskSchedulerService extends TaskScheduler { // 1:1 edges are used in Hive. TaskInfo taskInfo = new TaskInfo(localityDelayConf, clock, task, clientCookie, priority, capability, null, null, clock.getTime()); + LOG.info("Received allocateRequest. task={}, priority={}, capability={}, containerId={}", task, + priority, capability, containerId); writeLock.lock(); try { dagStats.registerTaskRequest(null, null); @@ -561,6 +568,10 @@ public class LlapTaskSchedulerService extends TaskScheduler { // the task is no longer required, and asks for a de-allocation. 
@Override public boolean deallocateTask(Object task, boolean taskSucceeded, TaskAttemptEndReason endReason, String diagnostics) { + if (LOG.isDebugEnabled()) { + LOG.debug("Processing deallocateTask for task={}, taskSucceeded={}, endReason={}", task, + taskSucceeded, endReason); + } writeLock.lock(); // Updating several local structures TaskInfo taskInfo; try { @@ -591,6 +602,9 @@ public class LlapTaskSchedulerService extends TaskScheduler { NodeInfo nodeInfo = instanceToNodeMap.get(assignedInstance.getWorkerIdentity()); assert nodeInfo != null; + + LOG.info("Processing de-allocate request for task={}, state={}, endReason={}", taskInfo.task, + taskInfo.getState(), endReason); // Re-enable the node if preempted if (taskInfo.getState() == TaskInfo.State.PREEMPTED) { LOG.info("Processing deallocateTask for {} which was preempted, EndReason={}", task, endReason); @@ -686,6 +700,9 @@ public class LlapTaskSchedulerService extends TaskScheduler { */ private SelectHostResult selectHost(TaskInfo request) { String[] requestedHosts = request.requestedHosts; + if (LOG.isDebugEnabled()) { + LOG.debug("selectingHost for task={} on hosts={}", request.task, Arrays.toString(requestedHosts)); + } long schedulerAttemptTime = clock.getTime(); readLock.lock(); // Read-lock. Not updating any stats at the moment. 
try { @@ -695,6 +712,8 @@ public class LlapTaskSchedulerService extends TaskScheduler { } boolean shouldDelayForLocality = request.shouldDelayForLocality(schedulerAttemptTime); + LOG.debug("ShouldDelayForLocality={} for task={} on hosts={}", shouldDelayForLocality, + request.task, Arrays.toString(requestedHosts)); if (requestedHosts != null && requestedHosts.length > 0) { int prefHostCount = -1; boolean requestedHostsWillBecomeAvailable = false; @@ -723,6 +742,7 @@ public class LlapTaskSchedulerService extends TaskScheduler { } else { if (nodeInfo.getEnableTime() > request.getLocalityDelayTimeout() && nodeInfo.isDisabled() && nodeInfo.hadCommFailure()) { + LOG.debug("Host={} will not become available within requested timeout", nodeInfo); // This node will likely be activated after the task timeout expires. } else { // Worth waiting for the timeout. @@ -745,7 +765,7 @@ public class LlapTaskSchedulerService extends TaskScheduler { if (shouldDelayForLocality) { if (requestedHostsWillBecomeAvailable) { if (LOG.isDebugEnabled()) { - LOG.debug("Skipping non-local location allocation for [" + request.task + + LOG.debug("Delaying local allocation for [" + request.task + "] when trying to allocate on [" + Arrays.toString(requestedHosts) + "]" + ". 
ScheduleAttemptTime=" + schedulerAttemptTime + ", taskDelayTimeout=" + request.getLocalityDelayTimeout()); @@ -753,16 +773,19 @@ public class LlapTaskSchedulerService extends TaskScheduler { return SELECT_HOST_RESULT_DELAYED_LOCALITY; } else { if (LOG.isDebugEnabled()) { - LOG.debug("Not skipping non-local location allocation for [" + request.task + + LOG.debug("Skipping local allocation for [" + request.task + "] when trying to allocate on [" + Arrays.toString(requestedHosts) + "] since none of these hosts are part of the known list"); } } } } - /* fall through - miss in locality (random scheduling) */ + /* fall through - miss in locality (random scheduling) or no locality-requested */ Entry<String, NodeInfo>[] all = instanceToNodeMap.entrySet().toArray(new Entry[0]); // Check again + if (LOG.isDebugEnabled()) { + LOG.debug("Attempting random allocation for task={}", request.task); + } if (all.length > 0) { int n = random.nextInt(all.length); // start at random offset and iterate whole list @@ -867,6 +890,9 @@ public class LlapTaskSchedulerService extends TaskScheduler { if (metrics != null) { metrics.incrPendingTasksCount(); } + if (LOG.isInfoEnabled()) { + LOG.info("PendingTasksInfo={}", constructPendingTaskCountsLogMessage()); + } } finally { writeLock.unlock(); } @@ -946,10 +972,14 @@ public class LlapTaskSchedulerService extends TaskScheduler { INADEQUATE_TOTAL_RESOURCES, } + @VisibleForTesting protected void schedulePendingTasks() { writeLock.lock(); try { + if (LOG.isDebugEnabled()) { + LOG.debug("ScheduleRun: {}", constructPendingTaskCountsLogMessage()); + } Iterator<Entry<Priority, List<TaskInfo>>> pendingIterator = pendingTasks.entrySet().iterator(); while (pendingIterator.hasNext()) { @@ -967,9 +997,7 @@ public class LlapTaskSchedulerService extends TaskScheduler { } taskInfo.triedAssigningTask(); ScheduleResult scheduleResult = scheduleTask(taskInfo); - if (LOG.isDebugEnabled()) { - LOG.debug("ScheduleResult for Task: {} = {}", taskInfo, 
scheduleResult); - } + LOG.info("ScheduleResult for Task: {} = {}", taskInfo, scheduleResult); if (scheduleResult == ScheduleResult.SCHEDULED) { taskIter.remove(); } else { @@ -977,6 +1005,7 @@ public class LlapTaskSchedulerService extends TaskScheduler { LOG.info("Inadequate total resources before scheduling pending tasks." + " Signalling scheduler timeout monitor thread to start timer."); startTimeoutMonitor(); + // TODO Nothing else should be done for this task. Move on. } // Try pre-empting a task so that a higher priority task can take it's place. @@ -999,7 +1028,12 @@ public class LlapTaskSchedulerService extends TaskScheduler { potentialHosts = null; } + // At this point we're dealing with all return types, except ScheduleResult.SCHEDULED. if (potentialHosts != null) { + if (LOG.isDebugEnabled()) { + LOG.debug("Attempting to preempt on requested host for task={}, potentialHosts={}", + taskInfo, Arrays.toString(potentialHosts)); + } // Preempt on specific host boolean shouldPreempt = true; for (String host : potentialHosts) { @@ -1010,21 +1044,35 @@ public class LlapTaskSchedulerService extends TaskScheduler { MutableInt pendingHostPreemptions = pendingPreemptionsPerHost.get(host); if (pendingHostPreemptions != null && pendingHostPreemptions.intValue() > 0) { shouldPreempt = false; + LOG.debug( + "Not preempting for task={}. Found an existing preemption request on host={}, pendingPreemptionCount={}", + taskInfo.task, host, pendingHostPreemptions.intValue()); break; } } if (shouldPreempt) { - LOG.info("Attempting to preempt for {}, pendingPreemptions={} on hosts={}", - taskInfo.task, pendingPreemptions.get(), Arrays.toString(potentialHosts)); + LOG.debug("Preempting for {} on potential hosts={}. TotalPendingPreemptions={}", + taskInfo.task, Arrays.toString(potentialHosts), pendingPreemptions.get()); preemptTasks(entry.getKey().getPriority(), 1, potentialHosts); + } else { + LOG.debug( + "Not preempting for {} on potential hosts={}. 
An existing preemption request exists", + taskInfo.task, Arrays.toString(potentialHosts)); } - } else { + } else { // Either DELAYED_RESOURCES or DELAYED_LOCALITY with an unknown requested host. // Request for a preemption if there's none pending. If a single preemption is pending, // and this is the next task to be assigned, it will be assigned once that slot becomes available. + LOG.debug("Attempting to preempt on any host for task={}, pendingPreemptions={}", + taskInfo.task, pendingPreemptions.get()); if (pendingPreemptions.get() == 0) { - LOG.info("Attempting to preempt for {}, pendingPreemptions={} on any host", - taskInfo.task, pendingPreemptions.get()); + LOG.info("Preempting for task={} on any available host", taskInfo.task); preemptTasks(entry.getKey().getPriority(), 1, null); + } else { + if (LOG.isDebugEnabled()) { + LOG.debug( + "Skipping preemption since there are {} pending preemption request. For task={}", + pendingPreemptions.get(), taskInfo); + } } } // Since there was an allocation failure - don't try assigning tasks at the next priority. @@ -1041,6 +1089,9 @@ public class LlapTaskSchedulerService extends TaskScheduler { pendingIterator.remove(); } if (!scheduledAllAtPriority) { + LOG.debug( + "Unable to schedule all requests at priority={}. Skipping subsequent priority levels", + entry.getKey()); // Don't attempt scheduling for additional priorities break; } @@ -1050,6 +1101,22 @@ public class LlapTaskSchedulerService extends TaskScheduler { } } + private String constructPendingTaskCountsLogMessage() { + StringBuilder sb = new StringBuilder(); + int totalCount = 0; + sb.append("numPriorityLevels=").append(pendingTasks.size()).append(". "); + Iterator<Entry<Priority, List<TaskInfo>>> pendingIterator = + pendingTasks.entrySet().iterator(); + while (pendingIterator.hasNext()) { + Entry<Priority, List<TaskInfo>> entry = pendingIterator.next(); + int count = entry.getValue() == null ? 
0 : entry.getValue().size(); + sb.append("[p=").append(entry.getKey().toString()).append(",c=").append(count).append("]"); + totalCount += count; + } + sb.append(". totalPendingTasks=").append(totalCount); + sb.append(". delayedTaskQueueSize=").append(delayedTaskQueue.size()); + return sb.toString(); + } private ScheduleResult scheduleTask(TaskInfo taskInfo) { SelectHostResult selectHostResult = selectHost(taskInfo); @@ -1062,7 +1129,8 @@ public class LlapTaskSchedulerService extends TaskScheduler { nsPair.getServiceInstance().getServicesAddress()); writeLock.lock(); // While updating local structures try { - LOG.info("Assigned task {} to container {}", taskInfo, container.getId()); + LOG.info("Assigned task {} to container {} on node={}", taskInfo, container.getId(), + nsPair.getServiceInstance()); dagStats.registerTaskAllocated(taskInfo.requestedHosts, taskInfo.requestedRacks, nsPair.getServiceInstance().getHost()); taskInfo.setAssignmentInfo(nsPair.getServiceInstance(), container.getId(), clock.getTime()); @@ -1150,7 +1218,7 @@ public class LlapTaskSchedulerService extends TaskScheduler { } MutableInt val = pendingPreemptionsPerHost.get(host); if (val == null) { - val = new MutableInt(1); + val = new MutableInt(0); pendingPreemptionsPerHost.put(host, val); } val.increment(); @@ -1417,23 +1485,21 @@ public class LlapTaskSchedulerService extends TaskScheduler { int pendingQueueuCapacity = 0; String pendingQueueCapacityString = serviceInstance.getProperties() .get(ConfVars.LLAP_DAEMON_TASK_SCHEDULER_WAIT_QUEUE_SIZE.varname); - if (LOG.isDebugEnabled()) { - LOG.debug("Setting up node: " + serviceInstance + ", with available capacity=" + - serviceInstance.getResource().getVirtualCores() + ", pendingQueueCapacity=" + - pendingQueueCapacityString); - } - + LOG.info("Setting up node: {} with available capacity={}, pendingQueueSize={}, memory={}", + serviceInstance, serviceInstance.getResource().getVirtualCores(), + pendingQueueCapacityString, 
serviceInstance.getResource().getMemory()); if (pendingQueueCapacityString != null) { pendingQueueuCapacity = Integer.parseInt(pendingQueueCapacityString); } this.numSchedulableTasks = serviceInstance.getResource().getVirtualCores() + pendingQueueuCapacity; } else { this.numSchedulableTasks = numSchedulableTasksConf; + LOG.info("Setting up node: " + serviceInstance + " with schedulableCapacity=" + this.numSchedulableTasks); } if (metrics != null) { metrics.incrSchedulableTasksCount(numSchedulableTasks); } - LOG.info("Setting up node: " + serviceInstance + " with schedulableCapacity=" + this.numSchedulableTasks); + } ServiceInstance getServiceInstance() { @@ -1466,7 +1532,8 @@ public class LlapTaskSchedulerService extends TaskScheduler { delayTime = blacklistConf.maxDelay; } if (LOG.isInfoEnabled()) { - LOG.info("Disabling instance " + serviceInstance + " for " + delayTime + " milli-seconds"); + LOG.info("Disabling instance {} for {} milli-seconds. commFailure={}", serviceInstance, + delayTime, commFailure); } expireTimeMillis = currentTime + delayTime; numSuccessfulTasksAtLastBlacklist = numSuccessfulTasks; http://git-wip-us.apache.org/repos/asf/hive/blob/f7fdd4e8/llap-tez/src/test/org/apache/hadoop/hive/llap/tezplugins/TestLlapTaskSchedulerService.java ---------------------------------------------------------------------- diff --git a/llap-tez/src/test/org/apache/hadoop/hive/llap/tezplugins/TestLlapTaskSchedulerService.java b/llap-tez/src/test/org/apache/hadoop/hive/llap/tezplugins/TestLlapTaskSchedulerService.java index e4fe79c..afbab95 100644 --- a/llap-tez/src/test/org/apache/hadoop/hive/llap/tezplugins/TestLlapTaskSchedulerService.java +++ b/llap-tez/src/test/org/apache/hadoop/hive/llap/tezplugins/TestLlapTaskSchedulerService.java @@ -464,7 +464,7 @@ public class TestLlapTaskSchedulerService { String [] hostsH1 = new String[] {HOST1}; - TestTaskSchedulerServiceWrapper tsWrapper = new TestTaskSchedulerServiceWrapper(2000, hosts, 1, 1, -1l); + 
TestTaskSchedulerServiceWrapper tsWrapper = new TestTaskSchedulerServiceWrapper(2000, hosts, 2, 0, -1l); // Fill up host1 with p2 tasks. // Leave host2 empty @@ -522,6 +522,278 @@ public class TestLlapTaskSchedulerService { } @Test(timeout = 10000) + public void testPreemptionChoiceTimeOrdering() throws IOException, InterruptedException { + + Priority priority1 = Priority.newInstance(1); + Priority priority2 = Priority.newInstance(2); + + String [] hosts = new String[] {HOST1, HOST2}; + + String [] hostsH1 = new String[] {HOST1}; + + TestTaskSchedulerServiceWrapper tsWrapper = new TestTaskSchedulerServiceWrapper(2000, hosts, 1, 1, -1l); + + try { + Object task1 = "task1"; + Object clientCookie1 = "cookie1"; + Object task2 = "task2"; + Object clientCookie2 = "cookie2"; + Object task3 = "task3"; + Object clientCookie3 = "cookie3"; + + tsWrapper.controlScheduler(true); + + ArgumentCaptor<Object> argumentCaptor = ArgumentCaptor.forClass(Object.class); + ArgumentCaptor<Container> cArgCaptor = ArgumentCaptor.forClass(Container.class); + + // Request task1 + tsWrapper.getClock().setTime(10000l); + tsWrapper.allocateTask(task1, hostsH1, priority2, clientCookie1); + tsWrapper.awaitLocalTaskAllocations(1); + verify(tsWrapper.mockAppCallback, times(1)) + .taskAllocated(argumentCaptor.capture(), any(Object.class), cArgCaptor.capture()); + ContainerId t1Cid = cArgCaptor.getValue().getId(); + + reset(tsWrapper.mockAppCallback); + // Move clock backwards (so that t1 allocation is after t2 allocation) + // Request task2 (task1 already started at previously set time) + tsWrapper.getClock().setTime(tsWrapper.getClock().getTime() - 1000); + tsWrapper.allocateTask(task2, hostsH1, priority2, clientCookie2); + tsWrapper.awaitLocalTaskAllocations(2); + verify(tsWrapper.mockAppCallback, times(1)) + .taskAllocated(argumentCaptor.capture(), any(Object.class), cArgCaptor.capture()); + + + reset(tsWrapper.mockAppCallback); + // Move clock forward, and request a task at p=1 + 
tsWrapper.getClock().setTime(tsWrapper.getClock().getTime() + 2000); + + tsWrapper.allocateTask(task3, hostsH1, priority1, clientCookie3); + while (true) { + tsWrapper.signalSchedulerRun(); + tsWrapper.awaitSchedulerRun(); + if (tsWrapper.ts.dagStats.numPreemptedTasks == 1) { + break; + } + } + // Ensure task1 is preempted based on time (match its allocated containerId) + ArgumentCaptor<ContainerId> cIdArgCaptor = ArgumentCaptor.forClass(ContainerId.class); + verify(tsWrapper.mockAppCallback).preemptContainer(cIdArgCaptor.capture()); + assertEquals(t1Cid, cIdArgCaptor.getValue()); + + } finally { + tsWrapper.shutdown(); + } + + } + + @Test(timeout = 10000) + public void testForcedLocalityMultiplePreemptionsSameHost1() throws IOException, + InterruptedException { + Priority priority1 = Priority.newInstance(1); + Priority priority2 = Priority.newInstance(2); + + String [] hosts = new String[] {HOST1, HOST2}; + + String [] hostsH1 = new String[] {HOST1}; + + TestTaskSchedulerServiceWrapper tsWrapper = new TestTaskSchedulerServiceWrapper(2000, hosts, 1, 1, -1l); + + // Fill up host1 with p2 tasks. + // Leave host2 empty + // Try running p1 task on host1 - should preempt + // Await preemption request. + // Try running another p1 task on host1 - should preempt + // Await preemption request.
+ + try { + + Object task1 = "task1"; + Object clientCookie1 = "cookie1"; + Object task2 = "task2"; + Object clientCookie2 = "cookie2"; + Object task3 = "task3"; + Object clientCookie3 = "cookie3"; + Object task4 = "task4"; + Object clientCookie4 = "cookie4"; + + tsWrapper.controlScheduler(true); + tsWrapper.allocateTask(task1, hostsH1, priority2, clientCookie1); + tsWrapper.allocateTask(task2, hostsH1, priority2, clientCookie2); + + tsWrapper.awaitLocalTaskAllocations(2); + verify(tsWrapper.mockAppCallback, never()).preemptContainer(any(ContainerId.class)); + ArgumentCaptor<Object> argumentCaptor = ArgumentCaptor.forClass(Object.class); + ArgumentCaptor<Container> cArgCaptor = ArgumentCaptor.forClass(Container.class); + verify(tsWrapper.mockAppCallback, times(2)) + .taskAllocated(argumentCaptor.capture(), any(Object.class), cArgCaptor.capture()); + assertEquals(2, argumentCaptor.getAllValues().size()); + assertEquals(task1, argumentCaptor.getAllValues().get(0)); + assertEquals(task2, argumentCaptor.getAllValues().get(1)); + assertEquals(2, cArgCaptor.getAllValues().size()); + ContainerId t1CId = cArgCaptor.getAllValues().get(0).getId(); + + reset(tsWrapper.mockAppCallback); + // At this point, 2 tasks are running - both at priority 2. + // Try running a priority 1 task + tsWrapper.allocateTask(task3, hostsH1, priority1, clientCookie3); + while (true) { + tsWrapper.signalSchedulerRun(); + tsWrapper.awaitSchedulerRun(); + if (tsWrapper.ts.dagStats.numPreemptedTasks == 1) { + break; + } + } + ArgumentCaptor<ContainerId> cIdArgCaptor = ArgumentCaptor.forClass(ContainerId.class); + verify(tsWrapper.mockAppCallback).preemptContainer(cIdArgCaptor.capture()); + + // Determine which task has been preempted. Normally task2 would be preempted based on it starting + // later. However - both may have the same start time, so either could be picked. + Object deallocatedTask1; // De-allocated now + Object deallocatedTask2; // Will be de-allocated later.
+ if (cIdArgCaptor.getValue().equals(t1CId)) { + deallocatedTask1 = task1; + deallocatedTask2 = task2; + } else { + deallocatedTask1 = task2; + deallocatedTask2 = task1; + } + + tsWrapper.deallocateTask(deallocatedTask1, false, TaskAttemptEndReason.INTERNAL_PREEMPTION); + + tsWrapper.awaitLocalTaskAllocations(3); + + verify(tsWrapper.mockAppCallback, times(1)).taskAllocated(eq(task3), + eq(clientCookie3), any(Container.class)); + + reset(tsWrapper.mockAppCallback); + // At this point. one p=2 task and task3(p=1) running. Ask for another p1 task. + tsWrapper.allocateTask(task4, hostsH1, priority1, clientCookie4); + while (true) { + tsWrapper.signalSchedulerRun(); + tsWrapper.awaitSchedulerRun(); + if (tsWrapper.ts.dagStats.numPreemptedTasks == 2) { + break; + } + } + verify(tsWrapper.mockAppCallback).preemptContainer(any(ContainerId.class)); + + tsWrapper.deallocateTask(deallocatedTask2, false, TaskAttemptEndReason.INTERNAL_PREEMPTION); + + tsWrapper.awaitLocalTaskAllocations(4); + + verify(tsWrapper.mockAppCallback, times(1)).taskAllocated(eq(task4), + eq(clientCookie4), any(Container.class)); + + + } finally { + tsWrapper.shutdown(); + } + } + + @Test(timeout = 10000) + public void testForcedLocalityMultiplePreemptionsSameHost2() throws IOException, + InterruptedException { + Priority priority1 = Priority.newInstance(1); + Priority priority2 = Priority.newInstance(2); + + String [] hosts = new String[] {HOST1, HOST2}; + + String [] hostsH1 = new String[] {HOST1}; + + TestTaskSchedulerServiceWrapper tsWrapper = new TestTaskSchedulerServiceWrapper(2000, hosts, 1, 1, -1l); + + // Fill up host1 with p2 tasks. + // Leave host2 empty + // Try running both p1 tasks on host1. + // R: Single preemption triggered, followed by allocation, followed by another preemption. 
+ // + + try { + + Object task1 = "task1"; + Object clientCookie1 = "cookie1"; + Object task2 = "task2"; + Object clientCookie2 = "cookie2"; + Object task3 = "task3"; + Object clientCookie3 = "cookie3"; + Object task4 = "task4"; + Object clientCookie4 = "cookie4"; + + tsWrapper.controlScheduler(true); + tsWrapper.allocateTask(task1, hostsH1, priority2, clientCookie1); + tsWrapper.allocateTask(task2, hostsH1, priority2, clientCookie2); + + tsWrapper.awaitLocalTaskAllocations(2); + verify(tsWrapper.mockAppCallback, never()).preemptContainer(any(ContainerId.class)); + ArgumentCaptor<Object> argumentCaptor = ArgumentCaptor.forClass(Object.class); + ArgumentCaptor<Container> cArgCaptor = ArgumentCaptor.forClass(Container.class); + verify(tsWrapper.mockAppCallback, times(2)) + .taskAllocated(argumentCaptor.capture(), any(Object.class), cArgCaptor.capture()); + assertEquals(2, argumentCaptor.getAllValues().size()); + assertEquals(task1, argumentCaptor.getAllValues().get(0)); + assertEquals(task2, argumentCaptor.getAllValues().get(1)); + assertEquals(2, cArgCaptor.getAllValues().size()); + ContainerId t1CId = cArgCaptor.getAllValues().get(0).getId(); + + reset(tsWrapper.mockAppCallback); + // At this point, 2 tasks are running - both at priority 2. + // Try running two priority 1 tasks + tsWrapper.allocateTask(task3, hostsH1, priority1, clientCookie3); + tsWrapper.allocateTask(task4, hostsH1, priority1, clientCookie4); + while (true) { + tsWrapper.signalSchedulerRun(); + tsWrapper.awaitSchedulerRun(); + if (tsWrapper.ts.dagStats.numPreemptedTasks == 1) { + break; + } + } + ArgumentCaptor<ContainerId> cIdArgCaptor = ArgumentCaptor.forClass(ContainerId.class); + verify(tsWrapper.mockAppCallback).preemptContainer(cIdArgCaptor.capture()); + + // Determine which task has been preempted. Normally task2 would be preempted based on it starting + // later. However - both may have the same start time, so either could be picked.
+ Object deallocatedTask1; // De-allocated now + Object deallocatedTask2; // Will be de-allocated later. + if (cIdArgCaptor.getValue().equals(t1CId)) { + deallocatedTask1 = task1; + deallocatedTask2 = task2; + } else { + deallocatedTask1 = task2; + deallocatedTask2 = task1; + } + + tsWrapper.deallocateTask(deallocatedTask1, false, TaskAttemptEndReason.INTERNAL_PREEMPTION); + + tsWrapper.awaitLocalTaskAllocations(3); + + verify(tsWrapper.mockAppCallback, times(1)).taskAllocated(eq(task3), + eq(clientCookie3), any(Container.class)); + + // At this point. one p=2 task and task3(p=1) running. Ask for another p1 task. + while (true) { + tsWrapper.signalSchedulerRun(); + tsWrapper.awaitSchedulerRun(); + if (tsWrapper.ts.dagStats.numPreemptedTasks == 2) { + break; + } + } + verify(tsWrapper.mockAppCallback, times(2)).preemptContainer(any(ContainerId.class)); + + tsWrapper.deallocateTask(deallocatedTask2, false, TaskAttemptEndReason.INTERNAL_PREEMPTION); + + tsWrapper.awaitLocalTaskAllocations(4); + + verify(tsWrapper.mockAppCallback, times(1)).taskAllocated(eq(task4), + eq(clientCookie4), any(Container.class)); + + + } finally { + tsWrapper.shutdown(); + } + } + + @Test(timeout = 10000) public void testForcedLocalityNotInDelayedQueue() throws IOException, InterruptedException { String[] hosts = new String[]{HOST1, HOST2};