Repository: tez
Updated Branches:
refs/heads/branch-0.7 b939cc38b -> 0ce51cc6a
TEZ-2816. Preemption sometimes does not respect heartbeats between preemptions
(bikas)
(cherry picked from commit a06cd76d8ddcee5f7f939cf72e3eeb3cc59033d0)
Conflicts:
CHANGES.txt
tez-dag/src/main/java/org/apache/tez/dag/app/rm/YarnTaskSchedulerService.java
tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskScheduler.java
tez-tools/analyzers/job-analyzer/src/test/java/org/apache/tez/analyzer/TestAnalyzer.java
Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/0ce51cc6
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/0ce51cc6
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/0ce51cc6
Branch: refs/heads/branch-0.7
Commit: 0ce51cc6a8571be19ea675187c3ca44d81930865
Parents: b939cc3
Author: Bikas Saha <[email protected]>
Authored: Fri Sep 18 14:55:27 2015 -0700
Committer: Bikas Saha <[email protected]>
Committed: Fri Sep 18 15:11:57 2015 -0700
----------------------------------------------------------------------
CHANGES.txt | 6 +++
.../dag/app/rm/YarnTaskSchedulerService.java | 42 +++++++++++---------
.../tez/dag/app/rm/TestTaskScheduler.java | 36 +++++++++++++----
3 files changed, 58 insertions(+), 26 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/tez/blob/0ce51cc6/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 5565d47..34e62c9 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -6,6 +6,7 @@ Release 0.7.1: Unreleased
INCOMPATIBLE CHANGES
ALL CHANGES:
+ TEZ-2816. Preemption sometimes does not respect heartbeats between
preemptions
TEZ-2097. TEZ-UI Add dag logs backend support
TEZ-814. Improve heuristic for determining a task has failed outputs
TEZ-2840. MRInputLegacy.init should set splitInfoViaEvents.
@@ -269,7 +270,11 @@ Release 0.6.3: Unreleased
INCOMPATIBLE CHANGES
ALL CHANGES:
+<<<<<<< HEAD
TEZ-2097. TEZ-UI Add dag logs backend support
+=======
+ TEZ-2816. Preemption sometimes does not respect heartbeats between
preemptions
+>>>>>>> TEZ-2816. Preemption sometimes does not respect heartbeats between
preemptions (bikas)
TEZ-814. Improve heuristic for determining a task has failed outputs
TEZ-2809. Minimal distribution compiled on 2.6 fails to run on 2.7
TEZ-2768. Log a useful error message when the summary stream cannot be
closed when shutting
@@ -491,6 +496,7 @@ INCOMPATIBLE CHANGES
TEZ-2552. CRC errors can cause job to run for very long time in large jobs.
ALL CHANGES:
+ TEZ-2816. Preemption sometimes does not respect heartbeats between
preemptions
TEZ-814. Improve heuristic for determining a task has failed outputs
TEZ-2768. Log a useful error message when the summary stream cannot be
closed when shutting
down an AM.
http://git-wip-us.apache.org/repos/asf/tez/blob/0ce51cc6/tez-dag/src/main/java/org/apache/tez/dag/app/rm/YarnTaskSchedulerService.java
----------------------------------------------------------------------
diff --git
a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/YarnTaskSchedulerService.java
b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/YarnTaskSchedulerService.java
index f35a45f..34684f9 100644
---
a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/YarnTaskSchedulerService.java
+++
b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/YarnTaskSchedulerService.java
@@ -910,8 +910,12 @@ public class YarnTaskSchedulerService extends
TaskSchedulerService
" taskAllocations: " + taskAllocations.size());
}
- numHeartbeats++;
- preemptIfNeeded();
+ synchronized (this) {
+ numHeartbeats++;
+ if (preemptIfNeeded()) {
+ heartbeatAtLastPreemption = numHeartbeats;
+ }
+ }
return appClientDelegate.getProgress();
}
@@ -1141,10 +1145,10 @@ public class YarnTaskSchedulerService extends
TaskSchedulerService
" heartbeats: " + numHeartbeats + " lastPreemptionHeartbeat: " +
heartbeatAtLastPreemption;
}
- void preemptIfNeeded() {
+ boolean preemptIfNeeded() {
if (preemptionPercentage == 0) {
// turned off
- return;
+ return true;
}
ContainerId[] preemptedContainers = null;
int numPendingRequestsToService = 0;
@@ -1176,7 +1180,7 @@ public class YarnTaskSchedulerService extends
TaskSchedulerService
if (highestPriRequest == null) {
// nothing pending
- return;
+ return true;
}
if(fitsIn(highestPriRequest.getCapability(), freeResources)) {
@@ -1187,7 +1191,7 @@ public class YarnTaskSchedulerService extends
TaskSchedulerService
LOG.info(highestPriRequest + " fits in free resources");
}
}
- return;
+ return true;
}
// highest priority request will not fit in existing free resources
// free up some more
@@ -1197,7 +1201,8 @@ public class YarnTaskSchedulerService extends
TaskSchedulerService
preemptionPercentage);
if (numPendingRequestsToService < 1) {
- return;
+ // nothing to preempt - reset preemption last heartbeat
+ return true;
}
if (LOG.isDebugEnabled()) {
@@ -1205,7 +1210,7 @@ public class YarnTaskSchedulerService extends
TaskSchedulerService
+ numHighestPriRequests + " pending requests at pri: "
+ highestPriRequest.getPriority());
}
-
+
for (int i=0; i<numPendingRequestsToService; ++i) {
// This request must have been considered for matching with all
existing
// containers when request was made.
@@ -1218,7 +1223,7 @@ public class YarnTaskSchedulerService extends
TaskSchedulerService
LOG.debug("Reused container exists. Wait for assignment loop to
release it. "
+ heldContainer.getContainer().getId());
}
- return;
+ return true;
}
if (heldContainer.geNumAssignmentAttempts() < 3) {
// we havent tried to assign this container at node/rack/ANY
@@ -1226,7 +1231,7 @@ public class YarnTaskSchedulerService extends
TaskSchedulerService
LOG.debug("Brand new container. Wait for assignment loop to
match it. "
+ heldContainer.getContainer().getId());
}
- return;
+ return true;
}
Container container = heldContainer.getContainer();
if (lowestPriNewContainer == null ||
@@ -1271,18 +1276,14 @@ public class YarnTaskSchedulerService extends
TaskSchedulerService
}
if (numPendingRequestsToService < 1) {
- return;
+ return true;
}
// there are no reused or new containers to release. try to preempt
running containers
// this assert will be a no-op in production but can help identify
// invalid assumptions during testing
assert delayedContainerManager.delayedContainers.isEmpty();
-
- if ((numHeartbeats - heartbeatAtLastPreemption) <=
numHeartbeatsBetweenPreemptions) {
- return;
- }
-
+
Priority preemptedTaskPriority = null;
int numEntriesAtPreemptedPriority = 0;
for(Map.Entry<Object, Container> entry : taskAllocations.entrySet()) {
@@ -1318,7 +1319,12 @@ public class YarnTaskSchedulerService extends
TaskSchedulerService
numPendingRequestsToService = Math.min(newNumPendingRequestsToService,
numPendingRequestsToService);
if (numPendingRequestsToService < 1) {
- return;
+ return true;
+ }
+ // wait for enough heartbeats since this request became active for
preemption
+ if ((numHeartbeats - heartbeatAtLastPreemption) <
numHeartbeatsBetweenPreemptions) {
+ // stop incrementing lastpreemption heartbeat count
+ return false;
}
LOG.info("Trying to service " + numPendingRequestsToService + " out of
total "
+ numHighestPriRequests + " pending requests at pri: "
@@ -1345,7 +1351,6 @@ public class YarnTaskSchedulerService extends
TaskSchedulerService
// upcall outside locks
if (preemptedContainers != null) {
- heartbeatAtLastPreemption = numHeartbeats;
for(int i=0; i<numPendingRequestsToService; ++i) {
ContainerId cId = preemptedContainers[i];
if (cId != null) {
@@ -1354,6 +1359,7 @@ public class YarnTaskSchedulerService extends
TaskSchedulerService
}
}
}
+ return true;
}
private boolean fitsIn(Resource toFit, Resource resource) {
http://git-wip-us.apache.org/repos/asf/tez/blob/0ce51cc6/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskScheduler.java
----------------------------------------------------------------------
diff --git
a/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskScheduler.java
b/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskScheduler.java
index 7e2c674..7286d79 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskScheduler.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskScheduler.java
@@ -1505,6 +1505,7 @@ public class TestTaskScheduler {
Configuration conf = new Configuration();
conf.setBoolean(TezConfiguration.TEZ_AM_CONTAINER_REUSE_ENABLED, false);
scheduler.init(conf);
+
conf.setInt(TezConfiguration.TEZ_AM_PREEMPTION_HEARTBEATS_BETWEEN_PREEMPTIONS,
3);
RegisterApplicationMasterResponse mockRegResponse =
mock(RegisterApplicationMasterResponse.class);
@@ -1680,7 +1681,10 @@ public class TestTaskScheduler {
scheduler.getProgress();
drainableAppCallback.drain();
verify(mockRMClient,
times(0)).releaseAssignedContainer((ContainerId)any());
+ // no need for task preemption until now - so they should match
+ Assert.assertEquals(scheduler.numHeartbeats,
scheduler.heartbeatAtLastPreemption);
+ // add a pending request that cannot be allocated until resources free up
Object mockTask3WaitCookie = new Object();
scheduler.allocateTask(mockTask3Wait, taskAsk, null,
null, pri6, obj3, mockTask3WaitCookie);
@@ -1699,6 +1703,7 @@ public class TestTaskScheduler {
containers.clear();
containers.add(mockContainer4);
+ // new lower pri container added that won't be matched and will eventually be
preempted
// Fudge new container being present in delayed allocation list due to race
HeldContainer heldContainer = new HeldContainer(mockContainer4, -1, -1,
null,
containerSignatureMatcher);
@@ -1707,6 +1712,9 @@ public class TestTaskScheduler {
scheduler.getProgress();
drainableAppCallback.drain();
verify(mockRMClient,
times(0)).releaseAssignedContainer((ContainerId)any());
+ // no need for task preemption until now - so they should match
+ Assert.assertEquals(scheduler.numHeartbeats,
scheduler.heartbeatAtLastPreemption);
+
heldContainer.incrementAssignmentAttempts();
// no preemption - container assignment attempts < 3
scheduler.getProgress();
@@ -1728,12 +1736,18 @@ public class TestTaskScheduler {
// remove fudging.
scheduler.delayedContainerManager.delayedContainers.clear();
-
+
+ // no need for task preemption until now - so they should match
+ Assert.assertEquals(scheduler.numHeartbeats,
scheduler.heartbeatAtLastPreemption);
+
scheduler.allocateTask(mockTask3Retry, taskAsk, null,
null, pri5, obj3, null);
// no preemption - higher pri. exact match
scheduler.getProgress();
+ // no need for task preemption until now - so they should match
drainableAppCallback.drain();
+ // no need for task preemption until now - so they should match
+ Assert.assertEquals(scheduler.numHeartbeats,
scheduler.heartbeatAtLastPreemption);
verify(mockRMClient,
times(1)).releaseAssignedContainer((ContainerId)any());
for (int i=0; i<11; ++i) {
@@ -1746,16 +1760,22 @@ public class TestTaskScheduler {
// this is also a higher priority container than the pending task priority
but was running a
// lower priority task. Task priority is relevant for preemption and not
container priority as
// containers can run tasks of different priorities
- scheduler.getProgress();
+ scheduler.getProgress(); // first heartbeat
+ Assert.assertTrue(scheduler.numHeartbeats >
scheduler.heartbeatAtLastPreemption);
+ drainableAppCallback.drain();
+ scheduler.getProgress(); // second heartbeat
+ drainableAppCallback.drain();
+ verify(mockRMClient,
times(1)).releaseAssignedContainer((ContainerId)any());
+ scheduler.getProgress(); // third heartbeat
drainableAppCallback.drain();
verify(mockRMClient,
times(2)).releaseAssignedContainer((ContainerId)any());
verify(mockRMClient, times(1)).releaseAssignedContainer(mockCId3B);
- // next 3 heartbeats do nothing, waiting for the RM to act on the last
released resources
- scheduler.getProgress();
- scheduler.getProgress();
- scheduler.getProgress();
- verify(mockRMClient,
times(2)).releaseAssignedContainer((ContainerId)any());
- scheduler.getProgress();
+ Assert.assertEquals(scheduler.numHeartbeats,
scheduler.heartbeatAtLastPreemption);
+ // there are pending preemptions.
+ scheduler.getProgress(); // first heartbeat
+ scheduler.getProgress(); // second heartbeat
+ verify(mockRMClient, times(2)).releaseAssignedContainer((ContainerId)
any());
+ scheduler.getProgress(); // third heartbeat
drainableAppCallback.drain();
// Next oldest mockTaskPri3KillA gets preempted to clear 10% of
outstanding running preemptable tasks
verify(mockRMClient,
times(3)).releaseAssignedContainer((ContainerId)any());