Repository: tez
Updated Branches:
  refs/heads/branch-0.6 b16b9b1fe -> 7f6b1941e


TEZ-2816. Preemption sometimes does not respect heartbeats between preemptions (bikas)
(cherry picked from commit a06cd76d8ddcee5f7f939cf72e3eeb3cc59033d0)

Conflicts:
        CHANGES.txt
        tez-dag/src/main/java/org/apache/tez/dag/app/rm/YarnTaskSchedulerService.java


Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/7f6b1941
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/7f6b1941
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/7f6b1941

Branch: refs/heads/branch-0.6
Commit: 7f6b1941ee2e631b479b598018e96badac6162f9
Parents: b16b9b1
Author: Bikas Saha <[email protected]>
Authored: Fri Sep 18 14:55:27 2015 -0700
Committer: Bikas Saha <[email protected]>
Committed: Fri Sep 18 15:19:59 2015 -0700

----------------------------------------------------------------------
 CHANGES.txt                                     |  3 ++
 .../dag/app/rm/YarnTaskSchedulerService.java    | 42 +++++++++++---------
 .../tez/dag/app/rm/TestTaskScheduler.java       | 36 +++++++++++++----
 3 files changed, 55 insertions(+), 26 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tez/blob/7f6b1941/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 83894a4..45c7a61 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -6,6 +6,8 @@ Release 0.6.3: Unreleased
 INCOMPATIBLE CHANGES
 
 ALL CHANGES:
+  TEZ-2812. Preemption sometimes does not respect heartbeats between preemptions
+  TEZ-2097. TEZ-UI Add dag logs backend support
   TEZ-814.  Improve heuristic for determining a task has failed outputs
   TEZ-2809. Minimal distribution compiled on 2.6 fails to run on 2.7
   TEZ-2768. Log a useful error message when the summary stream cannot be 
closed when shutting
@@ -237,6 +239,7 @@ INCOMPATIBLE CHANGES
   TEZ-2552. CRC errors can cause job to run for very long time in large jobs.
 
 ALL CHANGES:
+  TEZ-2812. Preemption sometimes does not respect heartbeats between preemptions
   TEZ-814.  Improve heuristic for determining a task has failed outputs
   TEZ-2768. Log a useful error message when the summary stream cannot be 
closed when shutting
   down an AM.

http://git-wip-us.apache.org/repos/asf/tez/blob/7f6b1941/tez-dag/src/main/java/org/apache/tez/dag/app/rm/YarnTaskSchedulerService.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/YarnTaskSchedulerService.java b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/YarnTaskSchedulerService.java
index 3929350..129f043 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/YarnTaskSchedulerService.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/YarnTaskSchedulerService.java
@@ -897,8 +897,12 @@ public class YarnTaskSchedulerService extends TaskSchedulerService
                " taskAllocations: " + taskAllocations.size());
     }
 
-    numHeartbeats++;
-    preemptIfNeeded();
+    synchronized (this) {
+      numHeartbeats++;
+      if (preemptIfNeeded()) {
+        heartbeatAtLastPreemption = numHeartbeats;
+      }
+    }
 
     return appClientDelegate.getProgress();
   }
@@ -1116,10 +1120,10 @@ public class YarnTaskSchedulerService extends 
TaskSchedulerService
     return (int) Math.ceil((original * percent)/100.f);
   }
   
-  void preemptIfNeeded() {
+  boolean preemptIfNeeded() {
     if (preemptionPercentage == 0) {
       // turned off
-      return;
+      return true;
     }
     ContainerId[] preemptedContainers = null;
     int numPendingRequestsToService = 0;
@@ -1150,7 +1154,7 @@ public class YarnTaskSchedulerService extends 
TaskSchedulerService
       
       if (highestPriRequest == null) {
         // nothing pending
-        return;
+        return true;
       }
       
       if(fitsIn(highestPriRequest.getCapability(), freeResources)) {
@@ -1158,7 +1162,7 @@ public class YarnTaskSchedulerService extends 
TaskSchedulerService
           LOG.debug("Highest pri request: " + highestPriRequest + " fits in 
available resources "
               + freeResources);
         }
-        return;
+        return true;
       }
       // highest priority request will not fit in existing free resources
       // free up some more
@@ -1168,7 +1172,8 @@ public class YarnTaskSchedulerService extends 
TaskSchedulerService
           preemptionPercentage);
 
       if (numPendingRequestsToService < 1) {
-        return;
+        // nothing to preempt - reset preemption last heartbeat
+        return true;
       }
 
       if (LOG.isDebugEnabled()) {
@@ -1176,7 +1181,7 @@ public class YarnTaskSchedulerService extends 
TaskSchedulerService
             + numHighestPriRequests + " pending requests at pri: "
             + highestPriRequest.getPriority());
       }
-      
+
       for (int i=0; i<numPendingRequestsToService; ++i) {
         // This request must have been considered for matching with all 
existing 
         // containers when request was made.
@@ -1189,7 +1194,7 @@ public class YarnTaskSchedulerService extends 
TaskSchedulerService
               LOG.debug("Reused container exists. Wait for assignment loop to 
release it. "
                   + heldContainer.getContainer().getId());
             }
-            return;
+            return true;
           }
           if (heldContainer.geNumAssignmentAttempts() < 3) {
             // we havent tried to assign this container at node/rack/ANY
@@ -1197,7 +1202,7 @@ public class YarnTaskSchedulerService extends 
TaskSchedulerService
               LOG.debug("Brand new container. Wait for assignment loop to 
match it. "
                   + heldContainer.getContainer().getId());
             }
-            return;
+            return true;
           }
           Container container = heldContainer.getContainer();
           if (lowestPriNewContainer == null ||
@@ -1242,18 +1247,14 @@ public class YarnTaskSchedulerService extends 
TaskSchedulerService
       }
       
       if (numPendingRequestsToService < 1) {
-        return;
+        return true;
       }
 
       // there are no reused or new containers to release. try to preempt 
running containers
       // this assert will be a no-op in production but can help identify 
       // invalid assumptions during testing
       assert delayedContainerManager.delayedContainers.isEmpty();
-      
-      if ((numHeartbeats - heartbeatAtLastPreemption) <= numHeartbeatsBetweenPreemptions) {
-        return;
-      }
-        
+              
       Priority preemptedTaskPriority = null;
       int numEntriesAtPreemptedPriority = 0;
       for(Map.Entry<Object, Container> entry : taskAllocations.entrySet()) {
@@ -1289,7 +1290,12 @@ public class YarnTaskSchedulerService extends 
TaskSchedulerService
         numPendingRequestsToService = Math.min(newNumPendingRequestsToService,
             numPendingRequestsToService);
         if (numPendingRequestsToService < 1) {
-          return;
+          return true;
+        }
+        // wait for enough heartbeats since this request became active for preemption
+        if ((numHeartbeats - heartbeatAtLastPreemption) < numHeartbeatsBetweenPreemptions) {
+          // stop incrementing lastpreemption heartbeat count
+          return false;
         }
         LOG.info("Trying to service " + numPendingRequestsToService + " out of 
total "
             + numHighestPriRequests + " pending requests at pri: "
@@ -1316,7 +1322,6 @@ public class YarnTaskSchedulerService extends 
TaskSchedulerService
     
     // upcall outside locks
     if (preemptedContainers != null) {
-      heartbeatAtLastPreemption = numHeartbeats;
       for(int i=0; i<numPendingRequestsToService; ++i) {
         ContainerId cId = preemptedContainers[i];
         if (cId != null) {
@@ -1325,6 +1330,7 @@ public class YarnTaskSchedulerService extends 
TaskSchedulerService
         }
       }
     }
+    return true;
   }
 
   private boolean fitsIn(Resource toFit, Resource resource) {

http://git-wip-us.apache.org/repos/asf/tez/blob/7f6b1941/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskScheduler.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskScheduler.java b/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskScheduler.java
index 7e2c674..7286d79 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskScheduler.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskScheduler.java
@@ -1505,6 +1505,7 @@ public class TestTaskScheduler {
     Configuration conf = new Configuration();
     conf.setBoolean(TezConfiguration.TEZ_AM_CONTAINER_REUSE_ENABLED, false);
     scheduler.init(conf);
+    conf.setInt(TezConfiguration.TEZ_AM_PREEMPTION_HEARTBEATS_BETWEEN_PREEMPTIONS, 3);
 
     RegisterApplicationMasterResponse mockRegResponse =
                        mock(RegisterApplicationMasterResponse.class);
@@ -1680,7 +1681,10 @@ public class TestTaskScheduler {
     scheduler.getProgress();
     drainableAppCallback.drain();
     verify(mockRMClient, 
times(0)).releaseAssignedContainer((ContainerId)any());
+    // no need for task preemption until now - so they should match
+    Assert.assertEquals(scheduler.numHeartbeats, 
scheduler.heartbeatAtLastPreemption);
 
+    // add a pending request that cannot be allocated until resources free up
     Object mockTask3WaitCookie = new Object();
     scheduler.allocateTask(mockTask3Wait, taskAsk, null,
                            null, pri6, obj3, mockTask3WaitCookie);
@@ -1699,6 +1703,7 @@ public class TestTaskScheduler {
     containers.clear();
     containers.add(mockContainer4);
     
+    // new lower pri container added that wont be matched and eventually 
preempted
     // Fudge new container being present in delayed allocation list due to race
     HeldContainer heldContainer = new HeldContainer(mockContainer4, -1, -1, 
null,
         containerSignatureMatcher);
@@ -1707,6 +1712,9 @@ public class TestTaskScheduler {
     scheduler.getProgress();
     drainableAppCallback.drain();
     verify(mockRMClient, 
times(0)).releaseAssignedContainer((ContainerId)any());
+    // no need for task preemption until now - so they should match
+    Assert.assertEquals(scheduler.numHeartbeats, 
scheduler.heartbeatAtLastPreemption);
+
     heldContainer.incrementAssignmentAttempts();
     // no preemption - container assignment attempts < 3
     scheduler.getProgress();
@@ -1728,12 +1736,18 @@ public class TestTaskScheduler {
     
     // remove fudging.
     scheduler.delayedContainerManager.delayedContainers.clear();
-    
+
+    // no need for task preemption until now - so they should match
+    Assert.assertEquals(scheduler.numHeartbeats, 
scheduler.heartbeatAtLastPreemption);
+
     scheduler.allocateTask(mockTask3Retry, taskAsk, null,
                            null, pri5, obj3, null);
     // no preemption - higher pri. exact match
     scheduler.getProgress();
+    // no need for task preemption until now - so they should match
     drainableAppCallback.drain();
+    // no need for task preemption until now - so they should match
+    Assert.assertEquals(scheduler.numHeartbeats, 
scheduler.heartbeatAtLastPreemption);
     verify(mockRMClient, 
times(1)).releaseAssignedContainer((ContainerId)any());
 
     for (int i=0; i<11; ++i) {
@@ -1746,16 +1760,22 @@ public class TestTaskScheduler {
     // this is also a higher priority container than the pending task priority 
but was running a 
     // lower priority task. Task priority is relevant for preemption and not 
container priority as
     // containers can run tasks of different priorities
-    scheduler.getProgress();
+    scheduler.getProgress(); // first heartbeat
+    Assert.assertTrue(scheduler.numHeartbeats > 
scheduler.heartbeatAtLastPreemption);
+    drainableAppCallback.drain();
+    scheduler.getProgress(); // second heartbeat
+    drainableAppCallback.drain();
+    verify(mockRMClient, 
times(1)).releaseAssignedContainer((ContainerId)any());
+    scheduler.getProgress(); // third heartbeat
     drainableAppCallback.drain();
     verify(mockRMClient, 
times(2)).releaseAssignedContainer((ContainerId)any());
     verify(mockRMClient, times(1)).releaseAssignedContainer(mockCId3B);
-    // next 3 heartbeats do nothing, waiting for the RM to act on the last 
released resources
-    scheduler.getProgress();
-    scheduler.getProgress();
-    scheduler.getProgress();
-    verify(mockRMClient, 
times(2)).releaseAssignedContainer((ContainerId)any());
-    scheduler.getProgress();
+    Assert.assertEquals(scheduler.numHeartbeats, 
scheduler.heartbeatAtLastPreemption);
+    // there are pending preemptions.
+    scheduler.getProgress(); // first heartbeat
+    scheduler.getProgress(); // second heartbeat
+    verify(mockRMClient, times(2)).releaseAssignedContainer((ContainerId) 
any());
+    scheduler.getProgress(); // third heartbeat
     drainableAppCallback.drain();
     // Next oldest mockTaskPri3KillA gets preempted to clear 10% of 
outstanding running preemptable tasks
     verify(mockRMClient, 
times(3)).releaseAssignedContainer((ContainerId)any());

Reply via email to