Repository: tez
Updated Branches:
refs/heads/branch-0.5 d78ca0607 -> b88feb9d8
TEZ-2816. Preemption sometimes does not respect heartbeats between preemptions
(bikas)
(cherry picked from commit a06cd76d8ddcee5f7f939cf72e3eeb3cc59033d0)
Conflicts:
CHANGES.txt
Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/b88feb9d
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/b88feb9d
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/b88feb9d
Branch: refs/heads/branch-0.5
Commit: b88feb9d877ad777f452b453b4e637c94e981ab2
Parents: d78ca06
Author: Bikas Saha <[email protected]>
Authored: Fri Sep 18 14:55:27 2015 -0700
Committer: Bikas Saha <[email protected]>
Committed: Fri Sep 18 15:25:42 2015 -0700
----------------------------------------------------------------------
CHANGES.txt | 1 +
.../dag/app/rm/YarnTaskSchedulerService.java | 42 +++++++++++---------
.../tez/dag/app/rm/TestTaskScheduler.java | 36 +++++++++++++----
3 files changed, 53 insertions(+), 26 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/tez/blob/b88feb9d/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index aa94ae2..766f019 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -7,6 +7,7 @@ INCOMPATIBLE CHANGES
TEZ-2552. CRC errors can cause job to run for very long time in large jobs.
ALL CHANGES:
+ TEZ-2816. Preemption sometimes does not respect heartbeats between
preemptions
TEZ-814. Improve heuristic for determining a task has failed outputs
TEZ-2768. Log a useful error message when the summary stream cannot be
closed when shutting
down an AM.
http://git-wip-us.apache.org/repos/asf/tez/blob/b88feb9d/tez-dag/src/main/java/org/apache/tez/dag/app/rm/YarnTaskSchedulerService.java
----------------------------------------------------------------------
diff --git
a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/YarnTaskSchedulerService.java
b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/YarnTaskSchedulerService.java
index a2798de..1ed3d02 100644
---
a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/YarnTaskSchedulerService.java
+++
b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/YarnTaskSchedulerService.java
@@ -890,8 +890,12 @@ public class YarnTaskSchedulerService extends
TaskSchedulerService
" taskAllocations: " + taskAllocations.size());
}
- numHeartbeats++;
- preemptIfNeeded();
+ synchronized (this) {
+ numHeartbeats++;
+ if (preemptIfNeeded()) {
+ heartbeatAtLastPreemption = numHeartbeats;
+ }
+ }
return appClientDelegate.getProgress();
}
@@ -1109,10 +1113,10 @@ public class YarnTaskSchedulerService extends
TaskSchedulerService
return (int) Math.ceil((original * percent)/100.f);
}
- void preemptIfNeeded() {
+ boolean preemptIfNeeded() {
if (preemptionPercentage == 0) {
// turned off
- return;
+ return true;
}
ContainerId[] preemptedContainers = null;
int numPendingRequestsToService = 0;
@@ -1143,7 +1147,7 @@ public class YarnTaskSchedulerService extends
TaskSchedulerService
if (highestPriRequest == null) {
// nothing pending
- return;
+ return true;
}
if(fitsIn(highestPriRequest.getCapability(), freeResources)) {
@@ -1151,7 +1155,7 @@ public class YarnTaskSchedulerService extends
TaskSchedulerService
LOG.debug("Highest pri request: " + highestPriRequest + " fits in
available resources "
+ freeResources);
}
- return;
+ return true;
}
// highest priority request will not fit in existing free resources
// free up some more
@@ -1161,7 +1165,8 @@ public class YarnTaskSchedulerService extends
TaskSchedulerService
preemptionPercentage);
if (numPendingRequestsToService < 1) {
- return;
+ // nothing to preempt - reset preemption last heartbeat
+ return true;
}
if (LOG.isDebugEnabled()) {
@@ -1169,7 +1174,7 @@ public class YarnTaskSchedulerService extends
TaskSchedulerService
+ numHighestPriRequests + " pending requests at pri: "
+ highestPriRequest.getPriority());
}
-
+
for (int i=0; i<numPendingRequestsToService; ++i) {
// This request must have been considered for matching with all
existing
// containers when request was made.
@@ -1182,7 +1187,7 @@ public class YarnTaskSchedulerService extends
TaskSchedulerService
LOG.debug("Reused container exists. Wait for assignment loop to
release it. "
+ heldContainer.getContainer().getId());
}
- return;
+ return true;
}
if (heldContainer.geNumAssignmentAttempts() < 3) {
// we havent tried to assign this container at node/rack/ANY
@@ -1190,7 +1195,7 @@ public class YarnTaskSchedulerService extends
TaskSchedulerService
LOG.debug("Brand new container. Wait for assignment loop to
match it. "
+ heldContainer.getContainer().getId());
}
- return;
+ return true;
}
Container container = heldContainer.getContainer();
if (lowestPriNewContainer == null ||
@@ -1235,18 +1240,14 @@ public class YarnTaskSchedulerService extends
TaskSchedulerService
}
if (numPendingRequestsToService < 1) {
- return;
+ return true;
}
// there are no reused or new containers to release. try to preempt
running containers
// this assert will be a no-op in production but can help identify
// invalid assumptions during testing
assert delayedContainerManager.delayedContainers.isEmpty();
-
- if ((numHeartbeats - heartbeatAtLastPreemption) <=
numHeartbeatsBetweenPreemptions) {
- return;
- }
-
+
Priority preemptedTaskPriority = null;
int numEntriesAtPreemptedPriority = 0;
for(Map.Entry<Object, Container> entry : taskAllocations.entrySet()) {
@@ -1282,7 +1283,12 @@ public class YarnTaskSchedulerService extends
TaskSchedulerService
numPendingRequestsToService = Math.min(newNumPendingRequestsToService,
numPendingRequestsToService);
if (numPendingRequestsToService < 1) {
- return;
+ return true;
+ }
+ // wait for enough heartbeats since this request became active for
preemption
+ if ((numHeartbeats - heartbeatAtLastPreemption) <
numHeartbeatsBetweenPreemptions) {
+ // stop incrementing lastpreemption heartbeat count
+ return false;
}
LOG.info("Trying to service " + numPendingRequestsToService + " out of
total "
+ numHighestPriRequests + " pending requests at pri: "
@@ -1309,7 +1315,6 @@ public class YarnTaskSchedulerService extends
TaskSchedulerService
// upcall outside locks
if (preemptedContainers != null) {
- heartbeatAtLastPreemption = numHeartbeats;
for(int i=0; i<numPendingRequestsToService; ++i) {
ContainerId cId = preemptedContainers[i];
if (cId != null) {
@@ -1318,6 +1323,7 @@ public class YarnTaskSchedulerService extends
TaskSchedulerService
}
}
}
+ return true;
}
private boolean fitsIn(Resource toFit, Resource resource) {
http://git-wip-us.apache.org/repos/asf/tez/blob/b88feb9d/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskScheduler.java
----------------------------------------------------------------------
diff --git
a/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskScheduler.java
b/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskScheduler.java
index e61eed6..4b1fb18 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskScheduler.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskScheduler.java
@@ -1498,6 +1498,7 @@ public class TestTaskScheduler {
Configuration conf = new Configuration();
conf.setBoolean(TezConfiguration.TEZ_AM_CONTAINER_REUSE_ENABLED, false);
scheduler.init(conf);
+
conf.setInt(TezConfiguration.TEZ_AM_PREEMPTION_HEARTBEATS_BETWEEN_PREEMPTIONS,
3);
RegisterApplicationMasterResponse mockRegResponse =
mock(RegisterApplicationMasterResponse.class);
@@ -1673,7 +1674,10 @@ public class TestTaskScheduler {
scheduler.getProgress();
drainableAppCallback.drain();
verify(mockRMClient,
times(0)).releaseAssignedContainer((ContainerId)any());
+ // no need for task preemption until now - so they should match
+ Assert.assertEquals(scheduler.numHeartbeats,
scheduler.heartbeatAtLastPreemption);
+ // add a pending request that cannot be allocated until resources free up
Object mockTask3WaitCookie = new Object();
scheduler.allocateTask(mockTask3Wait, taskAsk, null,
null, pri6, obj3, mockTask3WaitCookie);
@@ -1692,6 +1696,7 @@ public class TestTaskScheduler {
containers.clear();
containers.add(mockContainer4);
+ // new lower pri container added that won't be matched and will eventually be
preempted
// Fudge new container being present in delayed allocation list due to race
HeldContainer heldContainer = new HeldContainer(mockContainer4, -1, -1,
null,
containerSignatureMatcher);
@@ -1700,6 +1705,9 @@ public class TestTaskScheduler {
scheduler.getProgress();
drainableAppCallback.drain();
verify(mockRMClient,
times(0)).releaseAssignedContainer((ContainerId)any());
+ // no need for task preemption until now - so they should match
+ Assert.assertEquals(scheduler.numHeartbeats,
scheduler.heartbeatAtLastPreemption);
+
heldContainer.incrementAssignmentAttempts();
// no preemption - container assignment attempts < 3
scheduler.getProgress();
@@ -1721,12 +1729,18 @@ public class TestTaskScheduler {
// remove fudging.
scheduler.delayedContainerManager.delayedContainers.clear();
-
+
+ // no need for task preemption until now - so they should match
+ Assert.assertEquals(scheduler.numHeartbeats,
scheduler.heartbeatAtLastPreemption);
+
scheduler.allocateTask(mockTask3Retry, taskAsk, null,
null, pri5, obj3, null);
// no preemption - higher pri. exact match
scheduler.getProgress();
+ // no need for task preemption until now - so they should match
drainableAppCallback.drain();
+ // no need for task preemption until now - so they should match
+ Assert.assertEquals(scheduler.numHeartbeats,
scheduler.heartbeatAtLastPreemption);
verify(mockRMClient,
times(1)).releaseAssignedContainer((ContainerId)any());
for (int i=0; i<11; ++i) {
@@ -1739,16 +1753,22 @@ public class TestTaskScheduler {
// this is also a higher priority container than the pending task priority
but was running a
// lower priority task. Task priority is relevant for preemption and not
container priority as
// containers can run tasks of different priorities
- scheduler.getProgress();
+ scheduler.getProgress(); // first heartbeat
+ Assert.assertTrue(scheduler.numHeartbeats >
scheduler.heartbeatAtLastPreemption);
+ drainableAppCallback.drain();
+ scheduler.getProgress(); // second heartbeat
+ drainableAppCallback.drain();
+ verify(mockRMClient,
times(1)).releaseAssignedContainer((ContainerId)any());
+ scheduler.getProgress(); // third heartbeat
drainableAppCallback.drain();
verify(mockRMClient,
times(2)).releaseAssignedContainer((ContainerId)any());
verify(mockRMClient, times(1)).releaseAssignedContainer(mockCId3B);
- // next 3 heartbeats do nothing, waiting for the RM to act on the last
released resources
- scheduler.getProgress();
- scheduler.getProgress();
- scheduler.getProgress();
- verify(mockRMClient,
times(2)).releaseAssignedContainer((ContainerId)any());
- scheduler.getProgress();
+ Assert.assertEquals(scheduler.numHeartbeats,
scheduler.heartbeatAtLastPreemption);
+ // there are pending preemptions.
+ scheduler.getProgress(); // first heartbeat
+ scheduler.getProgress(); // second heartbeat
+ verify(mockRMClient, times(2)).releaseAssignedContainer((ContainerId)
any());
+ scheduler.getProgress(); // third heartbeat
drainableAppCallback.drain();
// Next oldest mockTaskPri3KillA gets preempted to clear 10% of
outstanding running preemptable tasks
verify(mockRMClient,
times(3)).releaseAssignedContainer((ContainerId)any());