Repository: hive
Updated Branches:
  refs/heads/branch-2.1 8adb9136f -> f7fdd4e88


HIVE-14403. LLAP node specific preemption will only preempt once on a node per 
AM. (Siddharth Seth, reviewed by Gunther Hagleitner)

(cherry picked from commit 15efc47f8f54efd6c5c3656975cf45e64ea5d7a0)


Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/f7fdd4e8
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/f7fdd4e8
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/f7fdd4e8

Branch: refs/heads/branch-2.1
Commit: f7fdd4e880b00e0b50dde1801db10e454a4c1dce
Parents: 8adb913
Author: Siddharth Seth <ss...@apache.org>
Authored: Wed Aug 3 09:42:13 2016 -0700
Committer: Siddharth Seth <ss...@apache.org>
Committed: Wed Aug 3 09:43:16 2016 -0700

----------------------------------------------------------------------
 .../tezplugins/LlapTaskSchedulerService.java    | 115 ++++++--
 .../TestLlapTaskSchedulerService.java           | 274 ++++++++++++++++++-
 2 files changed, 364 insertions(+), 25 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hive/blob/f7fdd4e8/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskSchedulerService.java
----------------------------------------------------------------------
diff --git 
a/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskSchedulerService.java
 
b/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskSchedulerService.java
index ad0cc5b..6fa3107 100644
--- 
a/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskSchedulerService.java
+++ 
b/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskSchedulerService.java
@@ -443,16 +443,19 @@ public class LlapTaskSchedulerService extends 
TaskScheduler {
     int vcores = 0;
     readLock.lock();
     try {
+      int numInstancesFound = 0;
       for (ServiceInstance inst : activeInstances.getAll().values()) {
         if (inst.isAlive()) {
           Resource r = inst.getResource();
-          LOG.info("Found instance " + inst);
           memory += r.getMemory();
           vcores += r.getVirtualCores();
-        } else {
-          LOG.info("Ignoring dead instance " + inst);
+          numInstancesFound++;
         }
       }
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("GetTotalResources: numInstancesFound={}, totalMem={}, 
totalVcores={}",
+            numInstancesFound, memory, vcores);
+      }
     } finally {
       readLock.unlock();
     }
@@ -529,6 +532,8 @@ public class LlapTaskSchedulerService extends TaskScheduler 
{
       Priority priority, Object containerSignature, Object clientCookie) {
     TaskInfo taskInfo =
         new TaskInfo(localityDelayConf, clock, task, clientCookie, priority, 
capability, hosts, racks, clock.getTime());
+    LOG.info("Received allocateRequest. task={}, priority={}, capability={}, 
hosts={}", task,
+        priority, capability, Arrays.toString(hosts));
     writeLock.lock();
     try {
       dagStats.registerTaskRequest(hosts, racks);
@@ -546,6 +551,8 @@ public class LlapTaskSchedulerService extends TaskScheduler 
{
     // 1:1 edges are used in Hive.
     TaskInfo taskInfo =
         new TaskInfo(localityDelayConf, clock, task, clientCookie, priority, 
capability, null, null, clock.getTime());
+    LOG.info("Received allocateRequest. task={}, priority={}, capability={}, 
containerId={}", task,
+        priority, capability, containerId);
     writeLock.lock();
     try {
       dagStats.registerTaskRequest(null, null);
@@ -561,6 +568,10 @@ public class LlapTaskSchedulerService extends 
TaskScheduler {
   // the task is no longer required, and asks for a de-allocation.
   @Override
   public boolean deallocateTask(Object task, boolean taskSucceeded, 
TaskAttemptEndReason endReason, String diagnostics) {
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Processing deallocateTask for task={}, taskSucceeded={}, 
endReason={}", task,
+          taskSucceeded, endReason);
+    }
     writeLock.lock(); // Updating several local structures
     TaskInfo taskInfo;
     try {
@@ -591,6 +602,9 @@ public class LlapTaskSchedulerService extends TaskScheduler 
{
       NodeInfo nodeInfo = 
instanceToNodeMap.get(assignedInstance.getWorkerIdentity());
       assert nodeInfo != null;
 
+
+      LOG.info("Processing de-allocate request for task={}, state={}, 
endReason={}", taskInfo.task,
+          taskInfo.getState(), endReason);
       // Re-enable the node if preempted
       if (taskInfo.getState() == TaskInfo.State.PREEMPTED) {
         LOG.info("Processing deallocateTask for {} which was preempted, 
EndReason={}", task, endReason);
@@ -686,6 +700,9 @@ public class LlapTaskSchedulerService extends TaskScheduler 
{
    */
   private SelectHostResult selectHost(TaskInfo request) {
     String[] requestedHosts = request.requestedHosts;
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("selectingHost for task={} on hosts={}", request.task, 
Arrays.toString(requestedHosts));
+    }
     long schedulerAttemptTime = clock.getTime();
     readLock.lock(); // Read-lock. Not updating any stats at the moment.
     try {
@@ -695,6 +712,8 @@ public class LlapTaskSchedulerService extends TaskScheduler 
{
       }
 
       boolean shouldDelayForLocality = 
request.shouldDelayForLocality(schedulerAttemptTime);
+      LOG.debug("ShouldDelayForLocality={} for task={} on hosts={}", 
shouldDelayForLocality,
+          request.task, Arrays.toString(requestedHosts));
       if (requestedHosts != null && requestedHosts.length > 0) {
         int prefHostCount = -1;
         boolean requestedHostsWillBecomeAvailable = false;
@@ -723,6 +742,7 @@ public class LlapTaskSchedulerService extends TaskScheduler 
{
                     } else {
                       if (nodeInfo.getEnableTime() > 
request.getLocalityDelayTimeout() &&
                           nodeInfo.isDisabled() && nodeInfo.hadCommFailure()) {
+                        LOG.debug("Host={} will not become available within 
requested timeout", nodeInfo);
                         // This node will likely be activated after the task 
timeout expires.
                       } else {
                         // Worth waiting for the timeout.
@@ -745,7 +765,7 @@ public class LlapTaskSchedulerService extends TaskScheduler 
{
         if (shouldDelayForLocality) {
           if (requestedHostsWillBecomeAvailable) {
             if (LOG.isDebugEnabled()) {
-              LOG.debug("Skipping non-local location allocation for [" + 
request.task +
+              LOG.debug("Delaying local allocation for [" + request.task +
                   "] when trying to allocate on [" + 
Arrays.toString(requestedHosts) + "]" +
                   ". ScheduleAttemptTime=" + schedulerAttemptTime + ", 
taskDelayTimeout=" +
                   request.getLocalityDelayTimeout());
@@ -753,16 +773,19 @@ public class LlapTaskSchedulerService extends 
TaskScheduler {
             return SELECT_HOST_RESULT_DELAYED_LOCALITY;
           } else {
             if (LOG.isDebugEnabled()) {
-              LOG.debug("Not skipping non-local location allocation for [" + 
request.task +
+              LOG.debug("Skipping local allocation for [" + request.task +
                   "] when trying to allocate on [" + 
Arrays.toString(requestedHosts) +
                   "] since none of these hosts are part of the known list");
             }
           }
         }
       }
-      /* fall through - miss in locality (random scheduling) */
+      /* fall through - miss in locality (random scheduling) or no 
locality-requested */
       Entry<String, NodeInfo>[] all = instanceToNodeMap.entrySet().toArray(new 
Entry[0]);
       // Check again
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Attempting random allocation for task={}", request.task);
+      }
       if (all.length > 0) {
         int n = random.nextInt(all.length);
         // start at random offset and iterate whole list
@@ -867,6 +890,9 @@ public class LlapTaskSchedulerService extends TaskScheduler 
{
       if (metrics != null) {
         metrics.incrPendingTasksCount();
       }
+      if (LOG.isInfoEnabled()) {
+        LOG.info("PendingTasksInfo={}", 
constructPendingTaskCountsLogMessage());
+      }
     } finally {
       writeLock.unlock();
     }
@@ -946,10 +972,14 @@ public class LlapTaskSchedulerService extends 
TaskScheduler {
     INADEQUATE_TOTAL_RESOURCES,
   }
 
+
   @VisibleForTesting
   protected void schedulePendingTasks() {
     writeLock.lock();
     try {
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("ScheduleRun: {}", constructPendingTaskCountsLogMessage());
+      }
       Iterator<Entry<Priority, List<TaskInfo>>> pendingIterator =
           pendingTasks.entrySet().iterator();
       while (pendingIterator.hasNext()) {
@@ -967,9 +997,7 @@ public class LlapTaskSchedulerService extends TaskScheduler 
{
           }
           taskInfo.triedAssigningTask();
           ScheduleResult scheduleResult = scheduleTask(taskInfo);
-          if (LOG.isDebugEnabled()) {
-            LOG.debug("ScheduleResult for Task: {} = {}", taskInfo, 
scheduleResult);
-          }
+          LOG.info("ScheduleResult for Task: {} = {}", taskInfo, 
scheduleResult);
           if (scheduleResult == ScheduleResult.SCHEDULED) {
             taskIter.remove();
           } else {
@@ -977,6 +1005,7 @@ public class LlapTaskSchedulerService extends 
TaskScheduler {
               LOG.info("Inadequate total resources before scheduling pending 
tasks." +
                   " Signalling scheduler timeout monitor thread to start 
timer.");
               startTimeoutMonitor();
+              // TODO Nothing else should be done for this task. Move on.
             }
 
             // Try pre-empting a task so that a higher priority task can take 
it's place.
@@ -999,7 +1028,12 @@ public class LlapTaskSchedulerService extends 
TaskScheduler {
               potentialHosts = null;
             }
 
+            // At this point we're dealing with all return types, except 
ScheduleResult.SCHEDULED.
             if (potentialHosts != null) {
+              if (LOG.isDebugEnabled()) {
+                LOG.debug("Attempting to preempt on requested host for 
task={}, potentialHosts={}",
+                    taskInfo, Arrays.toString(potentialHosts));
+              }
               // Preempt on specific host
               boolean shouldPreempt = true;
               for (String host : potentialHosts) {
@@ -1010,21 +1044,35 @@ public class LlapTaskSchedulerService extends 
TaskScheduler {
                 MutableInt pendingHostPreemptions = 
pendingPreemptionsPerHost.get(host);
                 if (pendingHostPreemptions != null && 
pendingHostPreemptions.intValue() > 0) {
                   shouldPreempt = false;
+                  LOG.debug(
+                      "Not preempting for task={}. Found an existing 
preemption request on host={}, pendingPreemptionCount={}",
+                      taskInfo.task, host, pendingHostPreemptions.intValue());
                   break;
                 }
               }
               if (shouldPreempt) {
-                LOG.info("Attempting to preempt for {}, pendingPreemptions={} 
on hosts={}",
-                    taskInfo.task, pendingPreemptions.get(), 
Arrays.toString(potentialHosts));
+                LOG.debug("Preempting for {} on potential hosts={}. 
TotalPendingPreemptions={}",
+                    taskInfo.task, Arrays.toString(potentialHosts), 
pendingPreemptions.get());
                 preemptTasks(entry.getKey().getPriority(), 1, potentialHosts);
+              } else {
+                LOG.debug(
+                    "Not preempting for {} on potential hosts={}. An existing 
preemption request exists",
+                    taskInfo.task, Arrays.toString(potentialHosts));
               }
-            } else {
+            } else { // Either DELAYED_RESOURCES or DELAYED_LOCALITY with an 
unknown requested host.
               // Request for a preemption if there's none pending. If a single 
preemption is pending,
               // and this is the next task to be assigned, it will be assigned 
once that slot becomes available.
+              LOG.debug("Attempting to preempt on any host for task={}, 
pendingPreemptions={}",
+                  taskInfo.task, pendingPreemptions.get());
               if (pendingPreemptions.get() == 0) {
-                LOG.info("Attempting to preempt for {}, pendingPreemptions={} 
on any host",
-                    taskInfo.task, pendingPreemptions.get());
+                LOG.info("Preempting for task={} on any available host", 
taskInfo.task);
                 preemptTasks(entry.getKey().getPriority(), 1, null);
+              } else {
+                if (LOG.isDebugEnabled()) {
+                  LOG.debug(
+                      "Skipping preemption since there are {} pending 
preemption request. For task={}",
+                      pendingPreemptions.get(), taskInfo);
+                }
               }
             }
             // Since there was an allocation failure - don't try assigning 
tasks at the next priority.
@@ -1041,6 +1089,9 @@ public class LlapTaskSchedulerService extends 
TaskScheduler {
           pendingIterator.remove();
         }
         if (!scheduledAllAtPriority) {
+          LOG.debug(
+              "Unable to schedule all requests at priority={}. Skipping 
subsequent priority levels",
+              entry.getKey());
           // Don't attempt scheduling for additional priorities
           break;
         }
@@ -1050,6 +1101,22 @@ public class LlapTaskSchedulerService extends 
TaskScheduler {
     }
   }
 
+  private String constructPendingTaskCountsLogMessage() {
+    StringBuilder sb = new StringBuilder();
+    int totalCount = 0;
+    sb.append("numPriorityLevels=").append(pendingTasks.size()).append(". ");
+    Iterator<Entry<Priority, List<TaskInfo>>> pendingIterator =
+        pendingTasks.entrySet().iterator();
+    while (pendingIterator.hasNext()) {
+      Entry<Priority, List<TaskInfo>> entry = pendingIterator.next();
+      int count = entry.getValue() == null ? 0 : entry.getValue().size();
+      
sb.append("[p=").append(entry.getKey().toString()).append(",c=").append(count).append("]");
+      totalCount += count;
+    }
+    sb.append(". totalPendingTasks=").append(totalCount);
+    sb.append(". delayedTaskQueueSize=").append(delayedTaskQueue.size());
+    return sb.toString();
+  }
 
   private ScheduleResult scheduleTask(TaskInfo taskInfo) {
     SelectHostResult selectHostResult = selectHost(taskInfo);
@@ -1062,7 +1129,8 @@ public class LlapTaskSchedulerService extends 
TaskScheduler {
               nsPair.getServiceInstance().getServicesAddress());
       writeLock.lock(); // While updating local structures
       try {
-        LOG.info("Assigned task {} to container {}", taskInfo, 
container.getId());
+        LOG.info("Assigned task {} to container {} on node={}", taskInfo, 
container.getId(),
+            nsPair.getServiceInstance());
         dagStats.registerTaskAllocated(taskInfo.requestedHosts, 
taskInfo.requestedRacks,
             nsPair.getServiceInstance().getHost());
         taskInfo.setAssignmentInfo(nsPair.getServiceInstance(), 
container.getId(), clock.getTime());
@@ -1150,7 +1218,7 @@ public class LlapTaskSchedulerService extends 
TaskScheduler {
       }
       MutableInt val = pendingPreemptionsPerHost.get(host);
       if (val == null) {
-        val = new MutableInt(1);
+        val = new MutableInt(0);
         pendingPreemptionsPerHost.put(host, val);
       }
       val.increment();
@@ -1417,23 +1485,21 @@ public class LlapTaskSchedulerService extends 
TaskScheduler {
         int pendingQueueuCapacity = 0;
         String pendingQueueCapacityString = serviceInstance.getProperties()
             .get(ConfVars.LLAP_DAEMON_TASK_SCHEDULER_WAIT_QUEUE_SIZE.varname);
-        if (LOG.isDebugEnabled()) {
-          LOG.debug("Setting up node: " + serviceInstance + ", with available 
capacity=" +
-              serviceInstance.getResource().getVirtualCores() + ", 
pendingQueueCapacity=" +
-              pendingQueueCapacityString);
-        }
-
+        LOG.info("Setting up node: {} with available capacity={}, 
pendingQueueSize={}, memory={}",
+            serviceInstance, serviceInstance.getResource().getVirtualCores(),
+            pendingQueueCapacityString, 
serviceInstance.getResource().getMemory());
         if (pendingQueueCapacityString != null) {
           pendingQueueuCapacity = Integer.parseInt(pendingQueueCapacityString);
         }
         this.numSchedulableTasks = 
serviceInstance.getResource().getVirtualCores() + pendingQueueuCapacity;
       } else {
         this.numSchedulableTasks = numSchedulableTasksConf;
+        LOG.info("Setting up node: " + serviceInstance + " with 
schedulableCapacity=" + this.numSchedulableTasks);
       }
       if (metrics != null) {
         metrics.incrSchedulableTasksCount(numSchedulableTasks);
       }
-      LOG.info("Setting up node: " + serviceInstance + " with 
schedulableCapacity=" + this.numSchedulableTasks);
+
     }
 
     ServiceInstance getServiceInstance() {
@@ -1466,7 +1532,8 @@ public class LlapTaskSchedulerService extends 
TaskScheduler {
         delayTime = blacklistConf.maxDelay;
       }
       if (LOG.isInfoEnabled()) {
-        LOG.info("Disabling instance " + serviceInstance + " for " + delayTime 
+ " milli-seconds");
+        LOG.info("Disabling instance {} for {} milli-seconds. commFailure={}", 
serviceInstance,
+            delayTime, commFailure);
       }
       expireTimeMillis = currentTime + delayTime;
       numSuccessfulTasksAtLastBlacklist = numSuccessfulTasks;

http://git-wip-us.apache.org/repos/asf/hive/blob/f7fdd4e8/llap-tez/src/test/org/apache/hadoop/hive/llap/tezplugins/TestLlapTaskSchedulerService.java
----------------------------------------------------------------------
diff --git 
a/llap-tez/src/test/org/apache/hadoop/hive/llap/tezplugins/TestLlapTaskSchedulerService.java
 
b/llap-tez/src/test/org/apache/hadoop/hive/llap/tezplugins/TestLlapTaskSchedulerService.java
index e4fe79c..afbab95 100644
--- 
a/llap-tez/src/test/org/apache/hadoop/hive/llap/tezplugins/TestLlapTaskSchedulerService.java
+++ 
b/llap-tez/src/test/org/apache/hadoop/hive/llap/tezplugins/TestLlapTaskSchedulerService.java
@@ -464,7 +464,7 @@ public class TestLlapTaskSchedulerService {
 
     String [] hostsH1 = new String[] {HOST1};
 
-    TestTaskSchedulerServiceWrapper tsWrapper = new 
TestTaskSchedulerServiceWrapper(2000, hosts, 1, 1, -1l);
+    TestTaskSchedulerServiceWrapper tsWrapper = new 
TestTaskSchedulerServiceWrapper(2000, hosts, 2, 0, -1l);
 
     // Fill up host1 with p2 tasks.
     // Leave host2 empty
@@ -522,6 +522,278 @@ public class TestLlapTaskSchedulerService {
   }
 
   @Test(timeout = 10000)
+  public void testPreemptionChoiceTimeOrdering() throws IOException, 
InterruptedException {
+
+    Priority priority1 = Priority.newInstance(1);
+    Priority priority2 = Priority.newInstance(2);
+
+    String [] hosts = new String[] {HOST1, HOST2};
+
+    String [] hostsH1 = new String[] {HOST1};
+
+    TestTaskSchedulerServiceWrapper tsWrapper = new 
TestTaskSchedulerServiceWrapper(2000, hosts, 1, 1, -1l);
+
+    try {
+      Object task1 = "task1";
+      Object clientCookie1 = "cookie1";
+      Object task2 = "task2";
+      Object clientCookie2 = "cookie2";
+      Object task3 = "task3";
+      Object clientCookie3 = "cookie3";
+
+      tsWrapper.controlScheduler(true);
+
+      ArgumentCaptor<Object> argumentCaptor = 
ArgumentCaptor.forClass(Object.class);
+      ArgumentCaptor<Container> cArgCaptor = 
ArgumentCaptor.forClass(Container.class);
+
+      // Request task1
+      tsWrapper.getClock().setTime(10000l);
+      tsWrapper.allocateTask(task1, hostsH1, priority2, clientCookie1);
+      tsWrapper.awaitLocalTaskAllocations(1);
+      verify(tsWrapper.mockAppCallback, times(1))
+          .taskAllocated(argumentCaptor.capture(), any(Object.class), 
cArgCaptor.capture());
+      ContainerId t1Cid = cArgCaptor.getValue().getId();
+
+      reset(tsWrapper.mockAppCallback);
+      // Move clock backwards (so that t1 allocation is after t2 allocation)
+      // Request task2 (task1 already started at previously set time)
+      tsWrapper.getClock().setTime(tsWrapper.getClock().getTime() - 1000);
+      tsWrapper.allocateTask(task2, hostsH1, priority2, clientCookie2);
+      tsWrapper.awaitLocalTaskAllocations(2);
+      verify(tsWrapper.mockAppCallback, times(1))
+          .taskAllocated(argumentCaptor.capture(), any(Object.class), 
cArgCaptor.capture());
+
+
+      reset(tsWrapper.mockAppCallback);
+      // Move clock forward, and request a task at p=1
+      tsWrapper.getClock().setTime(tsWrapper.getClock().getTime() + 2000);
+
+      tsWrapper.allocateTask(task3, hostsH1, priority1, clientCookie3);
+      while (true) {
+        tsWrapper.signalSchedulerRun();
+        tsWrapper.awaitSchedulerRun();
+        if (tsWrapper.ts.dagStats.numPreemptedTasks == 1) {
+          break;
+        }
+      }
+      // Ensure task1 is preempted based on time (match its allocated 
containerId)
+      ArgumentCaptor<ContainerId> cIdArgCaptor = 
ArgumentCaptor.forClass(ContainerId.class);
+      
verify(tsWrapper.mockAppCallback).preemptContainer(cIdArgCaptor.capture());
+      assertEquals(t1Cid, cIdArgCaptor.getValue());
+
+    } finally {
+      tsWrapper.shutdown();
+    }
+
+  }
+
+  @Test(timeout = 10000)
+  public void testForcedLocalityMultiplePreemptionsSameHost1() throws 
IOException,
+      InterruptedException {
+    Priority priority1 = Priority.newInstance(1);
+    Priority priority2 = Priority.newInstance(2);
+
+    String [] hosts = new String[] {HOST1, HOST2};
+
+    String [] hostsH1 = new String[] {HOST1};
+
+    TestTaskSchedulerServiceWrapper tsWrapper = new 
TestTaskSchedulerServiceWrapper(2000, hosts, 1, 1, -1l);
+
+    // Fill up host1 with p2 tasks.
+    // Leave host2 empty
+    // Try running p1 task on host1 - should preempt
+    // Await preemption request.
+    // Try running another p1 task on host1 - should preempt
+    // Await preemption request.
+
+    try {
+
+      Object task1 = "task1";
+      Object clientCookie1 = "cookie1";
+      Object task2 = "task2";
+      Object clientCookie2 = "cookie2";
+      Object task3 = "task3";
+      Object clientCookie3 = "cookie3";
+      Object task4 = "task4";
+      Object clientCookie4 = "cookie4";
+
+      tsWrapper.controlScheduler(true);
+      tsWrapper.allocateTask(task1, hostsH1, priority2, clientCookie1);
+      tsWrapper.allocateTask(task2, hostsH1, priority2, clientCookie2);
+
+      tsWrapper.awaitLocalTaskAllocations(2);
+      verify(tsWrapper.mockAppCallback, 
never()).preemptContainer(any(ContainerId.class));
+      ArgumentCaptor<Object> argumentCaptor = 
ArgumentCaptor.forClass(Object.class);
+      ArgumentCaptor<Container> cArgCaptor = 
ArgumentCaptor.forClass(Container.class);
+      verify(tsWrapper.mockAppCallback, times(2))
+          .taskAllocated(argumentCaptor.capture(), any(Object.class), 
cArgCaptor.capture());
+      assertEquals(2, argumentCaptor.getAllValues().size());
+      assertEquals(task1, argumentCaptor.getAllValues().get(0));
+      assertEquals(task2, argumentCaptor.getAllValues().get(1));
+      assertEquals(2, cArgCaptor.getAllValues().size());
+      ContainerId t1CId = cArgCaptor.getAllValues().get(0).getId();
+
+      reset(tsWrapper.mockAppCallback);
+      // At this point. 2 tasks running - both at priority 2.
+      // Try running a priority 1 task
+      tsWrapper.allocateTask(task3, hostsH1, priority1, clientCookie3);
+      while (true) {
+        tsWrapper.signalSchedulerRun();
+        tsWrapper.awaitSchedulerRun();
+        if (tsWrapper.ts.dagStats.numPreemptedTasks == 1) {
+          break;
+        }
+      }
+      ArgumentCaptor<ContainerId> cIdArgCaptor = 
ArgumentCaptor.forClass(ContainerId.class);
+      
verify(tsWrapper.mockAppCallback).preemptContainer(cIdArgCaptor.capture());
+
+      // Determine which task has been preempted. Normally task2 would be 
preempted based on it starting
+      // later. However - both may have the same start time, so either could 
be picked.
+      Object deallocatedTask1; // De-allocated now
+      Object deallocatedTask2; // Will be de-allocated later.
+      if (cIdArgCaptor.getValue().equals(t1CId)) {
+        deallocatedTask1 = task1;
+        deallocatedTask2 = task2;
+      } else {
+        deallocatedTask1 = task2;
+        deallocatedTask2 = task1;
+      }
+
+      tsWrapper.deallocateTask(deallocatedTask1, false, 
TaskAttemptEndReason.INTERNAL_PREEMPTION);
+
+      tsWrapper.awaitLocalTaskAllocations(3);
+
+      verify(tsWrapper.mockAppCallback, times(1)).taskAllocated(eq(task3),
+          eq(clientCookie3), any(Container.class));
+
+      reset(tsWrapper.mockAppCallback);
+      // At this point. one p=2 task and task3(p=1) running. Ask for another 
p1 task.
+      tsWrapper.allocateTask(task4, hostsH1, priority1, clientCookie4);
+      while (true) {
+        tsWrapper.signalSchedulerRun();
+        tsWrapper.awaitSchedulerRun();
+        if (tsWrapper.ts.dagStats.numPreemptedTasks == 2) {
+          break;
+        }
+      }
+      
verify(tsWrapper.mockAppCallback).preemptContainer(any(ContainerId.class));
+
+      tsWrapper.deallocateTask(deallocatedTask2, false, 
TaskAttemptEndReason.INTERNAL_PREEMPTION);
+
+      tsWrapper.awaitLocalTaskAllocations(4);
+
+      verify(tsWrapper.mockAppCallback, times(1)).taskAllocated(eq(task4),
+          eq(clientCookie4), any(Container.class));
+
+
+    } finally {
+      tsWrapper.shutdown();
+    }
+  }
+
+  @Test(timeout = 10000)
+  public void testForcedLocalityMultiplePreemptionsSameHost2() throws 
IOException,
+      InterruptedException {
+    Priority priority1 = Priority.newInstance(1);
+    Priority priority2 = Priority.newInstance(2);
+
+    String [] hosts = new String[] {HOST1, HOST2};
+
+    String [] hostsH1 = new String[] {HOST1};
+
+    TestTaskSchedulerServiceWrapper tsWrapper = new 
TestTaskSchedulerServiceWrapper(2000, hosts, 1, 1, -1l);
+
+    // Fill up host1 with p2 tasks.
+    // Leave host2 empty
+    // Try running both p1 tasks on host1.
+    // R: Single preemption triggered, followed by allocation, followed by 
another preemption.
+    //
+
+    try {
+
+      Object task1 = "task1";
+      Object clientCookie1 = "cookie1";
+      Object task2 = "task2";
+      Object clientCookie2 = "cookie2";
+      Object task3 = "task3";
+      Object clientCookie3 = "cookie3";
+      Object task4 = "task4";
+      Object clientCookie4 = "cookie4";
+
+      tsWrapper.controlScheduler(true);
+      tsWrapper.allocateTask(task1, hostsH1, priority2, clientCookie1);
+      tsWrapper.allocateTask(task2, hostsH1, priority2, clientCookie2);
+
+      tsWrapper.awaitLocalTaskAllocations(2);
+      verify(tsWrapper.mockAppCallback, 
never()).preemptContainer(any(ContainerId.class));
+      ArgumentCaptor<Object> argumentCaptor = 
ArgumentCaptor.forClass(Object.class);
+      ArgumentCaptor<Container> cArgCaptor = 
ArgumentCaptor.forClass(Container.class);
+      verify(tsWrapper.mockAppCallback, times(2))
+          .taskAllocated(argumentCaptor.capture(), any(Object.class), 
cArgCaptor.capture());
+      assertEquals(2, argumentCaptor.getAllValues().size());
+      assertEquals(task1, argumentCaptor.getAllValues().get(0));
+      assertEquals(task2, argumentCaptor.getAllValues().get(1));
+      assertEquals(2, cArgCaptor.getAllValues().size());
+      ContainerId t1CId = cArgCaptor.getAllValues().get(0).getId();
+
+      reset(tsWrapper.mockAppCallback);
+      // At this point. 2 tasks running - both at priority 2.
+      // Try running a priority 1 task
+      tsWrapper.allocateTask(task3, hostsH1, priority1, clientCookie3);
+      tsWrapper.allocateTask(task4, hostsH1, priority1, clientCookie4);
+      while (true) {
+        tsWrapper.signalSchedulerRun();
+        tsWrapper.awaitSchedulerRun();
+        if (tsWrapper.ts.dagStats.numPreemptedTasks == 1) {
+          break;
+        }
+      }
+      ArgumentCaptor<ContainerId> cIdArgCaptor = 
ArgumentCaptor.forClass(ContainerId.class);
+      
verify(tsWrapper.mockAppCallback).preemptContainer(cIdArgCaptor.capture());
+
+      // Determine which task has been preempted. Normally task2 would be 
preempted based on it starting
+      // later. However - both may have the same start time, so either could 
be picked.
+      Object deallocatedTask1; // De-allocated now
+      Object deallocatedTask2; // Will be de-allocated later.
+      if (cIdArgCaptor.getValue().equals(t1CId)) {
+        deallocatedTask1 = task1;
+        deallocatedTask2 = task2;
+      } else {
+        deallocatedTask1 = task2;
+        deallocatedTask2 = task1;
+      }
+
+      tsWrapper.deallocateTask(deallocatedTask1, false, 
TaskAttemptEndReason.INTERNAL_PREEMPTION);
+
+      tsWrapper.awaitLocalTaskAllocations(3);
+
+      verify(tsWrapper.mockAppCallback, times(1)).taskAllocated(eq(task3),
+          eq(clientCookie3), any(Container.class));
+
+      // At this point. one p=2 task and task3(p=1) running. Ask for another 
p1 task.
+      while (true) {
+        tsWrapper.signalSchedulerRun();
+        tsWrapper.awaitSchedulerRun();
+        if (tsWrapper.ts.dagStats.numPreemptedTasks == 2) {
+          break;
+        }
+      }
+      verify(tsWrapper.mockAppCallback, 
times(2)).preemptContainer(any(ContainerId.class));
+
+      tsWrapper.deallocateTask(deallocatedTask2, false, 
TaskAttemptEndReason.INTERNAL_PREEMPTION);
+
+      tsWrapper.awaitLocalTaskAllocations(4);
+
+      verify(tsWrapper.mockAppCallback, times(1)).taskAllocated(eq(task4),
+          eq(clientCookie4), any(Container.class));
+
+
+    } finally {
+      tsWrapper.shutdown();
+    }
+  }
+
+  @Test(timeout = 10000)
   public void testForcedLocalityNotInDelayedQueue() throws IOException, 
InterruptedException {
     String[] hosts = new String[]{HOST1, HOST2};
 

Reply via email to