Repository: tez Updated Branches: refs/heads/master 04571cc53 -> a06cd76d8
TEZ-2816. Preemption sometimes does not respect heartbeats between preemptions (bikas) Project: http://git-wip-us.apache.org/repos/asf/tez/repo Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/a06cd76d Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/a06cd76d Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/a06cd76d Branch: refs/heads/master Commit: a06cd76d8ddcee5f7f939cf72e3eeb3cc59033d0 Parents: 04571cc Author: Bikas Saha <[email protected]> Authored: Fri Sep 18 14:55:27 2015 -0700 Committer: Bikas Saha <[email protected]> Committed: Fri Sep 18 14:55:27 2015 -0700 ---------------------------------------------------------------------- CHANGES.txt | 4 +++ .../dag/app/rm/YarnTaskSchedulerService.java | 38 +++++++++++--------- .../tez/dag/app/rm/TestTaskScheduler.java | 34 ++++++++++++++---- .../org/apache/tez/analyzer/TestAnalyzer.java | 4 +-- 4 files changed, 54 insertions(+), 26 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/tez/blob/a06cd76d/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index 9c34cca..b4a5db4 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -7,6 +7,7 @@ Release 0.8.1: Unreleased INCOMPATIBLE CHANGES ALL CHANGES: + TEZ-2816. Preemption sometimes does not respect heartbeats between preemptions TEZ-814. Improve heuristic for determining a task has failed outputs TEZ-2832. Support tests for both SimpleHistory logging and ATS logging TEZ-2840. MRInputLegacy.init should set splitInfoViaEvents. @@ -184,6 +185,7 @@ Release 0.7.1: Unreleased INCOMPATIBLE CHANGES ALL CHANGES: + TEZ-2816. Preemption sometimes does not respect heartbeats between preemptions TEZ-814. Improve heuristic for determining a task has failed outputs TEZ-2840. MRInputLegacy.init should set splitInfoViaEvents. TEZ-2829. 
Tez UI: minor fixes to in-progress update of UI from AM @@ -441,6 +443,7 @@ Release 0.6.3: Unreleased INCOMPATIBLE CHANGES ALL CHANGES: + TEZ-2816. Preemption sometimes does not respect heartbeats between preemptions TEZ-814. Improve heuristic for determining a task has failed outputs TEZ-2809. Minimal distribution compiled on 2.6 fails to run on 2.7 TEZ-2768. Log a useful error message when the summary stream cannot be closed when shutting @@ -662,6 +665,7 @@ INCOMPATIBLE CHANGES TEZ-2552. CRC errors can cause job to run for very long time in large jobs. ALL CHANGES: + TEZ-2816. Preemption sometimes does not respect heartbeats between preemptions TEZ-814. Improve heuristic for determining a task has failed outputs TEZ-2768. Log a useful error message when the summary stream cannot be closed when shutting down an AM. http://git-wip-us.apache.org/repos/asf/tez/blob/a06cd76d/tez-dag/src/main/java/org/apache/tez/dag/app/rm/YarnTaskSchedulerService.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/YarnTaskSchedulerService.java b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/YarnTaskSchedulerService.java index 3c1cf3f..5a5464f 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/YarnTaskSchedulerService.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/YarnTaskSchedulerService.java @@ -892,7 +892,9 @@ public class YarnTaskSchedulerService extends TaskScheduler synchronized (this) { numHeartbeats++; - preemptIfNeeded(); + if (preemptIfNeeded()) { + heartbeatAtLastPreemption = numHeartbeats; + } } return getContext().getProgress(); @@ -1127,10 +1129,10 @@ public class YarnTaskSchedulerService extends TaskScheduler " heartbeats: " + numHeartbeats + " lastPreemptionHeartbeat: " + heartbeatAtLastPreemption; } - void preemptIfNeeded() { + boolean preemptIfNeeded() { if (preemptionPercentage == 0) { // turned off - return; + return true; } ContainerId[] preemptedContainers 
= null; int numPendingRequestsToService = 0; @@ -1162,7 +1164,7 @@ public class YarnTaskSchedulerService extends TaskScheduler if (highestPriRequest == null) { // nothing pending - return; + return true; } if(fitsIn(highestPriRequest.getCapability(), freeResources)) { @@ -1173,7 +1175,7 @@ public class YarnTaskSchedulerService extends TaskScheduler LOG.info(highestPriRequest + " fits in free resources"); } } - return; + return true; } // highest priority request will not fit in existing free resources // free up some more @@ -1183,7 +1185,8 @@ public class YarnTaskSchedulerService extends TaskScheduler preemptionPercentage); if (numPendingRequestsToService < 1) { - return; + // nothing to preempt - reset preemption last heartbeat + return true; } if (LOG.isDebugEnabled()) { @@ -1191,7 +1194,7 @@ public class YarnTaskSchedulerService extends TaskScheduler + numHighestPriRequests + " pending requests at pri: " + highestPriRequest.getPriority()); } - + for (int i=0; i<numPendingRequestsToService; ++i) { // This request must have been considered for matching with all existing // containers when request was made. @@ -1204,7 +1207,7 @@ public class YarnTaskSchedulerService extends TaskScheduler LOG.debug("Reused container exists. Wait for assignment loop to release it. " + heldContainer.getContainer().getId()); } - return; + return true; } if (heldContainer.geNumAssignmentAttempts() < 3) { // we havent tried to assign this container at node/rack/ANY @@ -1212,7 +1215,7 @@ public class YarnTaskSchedulerService extends TaskScheduler LOG.debug("Brand new container. Wait for assignment loop to match it. " + heldContainer.getContainer().getId()); } - return; + return true; } Container container = heldContainer.getContainer(); if (lowestPriNewContainer == null || @@ -1257,18 +1260,14 @@ public class YarnTaskSchedulerService extends TaskScheduler } if (numPendingRequestsToService < 1) { - return; + return true; } // there are no reused or new containers to release. 
try to preempt running containers // this assert will be a no-op in production but can help identify // invalid assumptions during testing assert delayedContainerManager.delayedContainers.isEmpty(); - - if ((numHeartbeats - heartbeatAtLastPreemption) <= numHeartbeatsBetweenPreemptions) { - return; - } - + Priority preemptedTaskPriority = null; int numEntriesAtPreemptedPriority = 0; for(Map.Entry<Object, Container> entry : taskAllocations.entrySet()) { @@ -1304,7 +1303,12 @@ public class YarnTaskSchedulerService extends TaskScheduler numPendingRequestsToService = Math.min(newNumPendingRequestsToService, numPendingRequestsToService); if (numPendingRequestsToService < 1) { - return; + return true; + } + // wait for enough heartbeats since this request became active for preemption + if ((numHeartbeats - heartbeatAtLastPreemption) < numHeartbeatsBetweenPreemptions) { + // stop incrementing lastpreemption heartbeat count + return false; } LOG.info("Trying to service " + numPendingRequestsToService + " out of total " + numHighestPriRequests + " pending requests at pri: " @@ -1331,7 +1335,6 @@ public class YarnTaskSchedulerService extends TaskScheduler // upcall outside locks if (preemptedContainers != null) { - heartbeatAtLastPreemption = numHeartbeats; for(int i=0; i<numPendingRequestsToService; ++i) { ContainerId cId = preemptedContainers[i]; if (cId != null) { @@ -1340,6 +1343,7 @@ public class YarnTaskSchedulerService extends TaskScheduler } } } + return true; } private boolean fitsIn(Resource toFit, Resource resource) { http://git-wip-us.apache.org/repos/asf/tez/blob/a06cd76d/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskScheduler.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskScheduler.java b/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskScheduler.java index 5efea48..1fc5092 100644 --- 
a/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskScheduler.java +++ b/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskScheduler.java @@ -1508,6 +1508,7 @@ public class TestTaskScheduler { Configuration conf = new Configuration(); conf.setBoolean(TezConfiguration.TEZ_AM_CONTAINER_REUSE_ENABLED, false); + conf.setInt(TezConfiguration.TEZ_AM_PREEMPTION_HEARTBEATS_BETWEEN_PREEMPTIONS, 3); TaskSchedulerContext mockApp = setupMockTaskSchedulerContext(appHost, appPort, appUrl, false, null, null, new PreemptionMatcher(), conf); @@ -1692,7 +1693,10 @@ public class TestTaskScheduler { scheduler.getProgress(); drainableAppCallback.drain(); verify(mockRMClient, times(0)).releaseAssignedContainer((ContainerId)any()); + // no need for task preemption until now - so they should match + Assert.assertEquals(scheduler.numHeartbeats, scheduler.heartbeatAtLastPreemption); + // add a pending request that cannot be allocated until resources free up Object mockTask3WaitCookie = new Object(); scheduler.allocateTask(mockTask3Wait, taskAsk, null, null, pri6, obj3, mockTask3WaitCookie); @@ -1711,6 +1715,7 @@ public class TestTaskScheduler { containers.clear(); containers.add(mockContainer4); + // new lower pri container added that wont be matched and eventually preempted // Fudge new container being present in delayed allocation list due to race HeldContainer heldContainer = new HeldContainer(mockContainer4, -1, -1, null, containerSignatureMatcher); @@ -1719,6 +1724,9 @@ public class TestTaskScheduler { scheduler.getProgress(); drainableAppCallback.drain(); verify(mockRMClient, times(0)).releaseAssignedContainer((ContainerId)any()); + // no need for task preemption until now - so they should match + Assert.assertEquals(scheduler.numHeartbeats, scheduler.heartbeatAtLastPreemption); + heldContainer.incrementAssignmentAttempts(); // no preemption - container assignment attempts < 3 scheduler.getProgress(); @@ -1740,12 +1748,18 @@ public class TestTaskScheduler { // remove 
fudging. scheduler.delayedContainerManager.delayedContainers.clear(); - + + // no need for task preemption until now - so they should match + Assert.assertEquals(scheduler.numHeartbeats, scheduler.heartbeatAtLastPreemption); + scheduler.allocateTask(mockTask3Retry, taskAsk, null, null, pri5, obj3, null); // no preemption - higher pri. exact match scheduler.getProgress(); + // no need for task preemption until now - so they should match drainableAppCallback.drain(); + // no need for task preemption until now - so they should match + Assert.assertEquals(scheduler.numHeartbeats, scheduler.heartbeatAtLastPreemption); verify(mockRMClient, times(1)).releaseAssignedContainer((ContainerId)any()); for (int i=0; i<11; ++i) { @@ -1758,16 +1772,22 @@ public class TestTaskScheduler { // this is also a higher priority container than the pending task priority but was running a // lower priority task. Task priority is relevant for preemption and not container priority as // containers can run tasks of different priorities - scheduler.getProgress(); + scheduler.getProgress(); // first heartbeat + Assert.assertTrue(scheduler.numHeartbeats > scheduler.heartbeatAtLastPreemption); + drainableAppCallback.drain(); + scheduler.getProgress(); // second heartbeat + drainableAppCallback.drain(); + verify(mockRMClient, times(1)).releaseAssignedContainer((ContainerId)any()); + scheduler.getProgress(); // third heartbeat drainableAppCallback.drain(); verify(mockRMClient, times(2)).releaseAssignedContainer((ContainerId)any()); verify(mockRMClient, times(1)).releaseAssignedContainer(mockCId3B); - // next 3 heartbeats do nothing, waiting for the RM to act on the last released resources - scheduler.getProgress(); - scheduler.getProgress(); - scheduler.getProgress(); + Assert.assertEquals(scheduler.numHeartbeats, scheduler.heartbeatAtLastPreemption); + // there are pending preemptions. 
+ scheduler.getProgress(); // first heartbeat + scheduler.getProgress(); // second heartbeat verify(mockRMClient, times(2)).releaseAssignedContainer((ContainerId) any()); - scheduler.getProgress(); + scheduler.getProgress(); // third heartbeat drainableAppCallback.drain(); // Next oldest mockTaskPri3KillA gets preempted to clear 10% of outstanding running preemptable tasks verify(mockRMClient, times(3)).releaseAssignedContainer((ContainerId)any()); http://git-wip-us.apache.org/repos/asf/tez/blob/a06cd76d/tez-tools/analyzers/job-analyzer/src/test/java/org/apache/tez/analyzer/TestAnalyzer.java ---------------------------------------------------------------------- diff --git a/tez-tools/analyzers/job-analyzer/src/test/java/org/apache/tez/analyzer/TestAnalyzer.java b/tez-tools/analyzers/job-analyzer/src/test/java/org/apache/tez/analyzer/TestAnalyzer.java index ca9250d..7ea5a39 100644 --- a/tez-tools/analyzers/job-analyzer/src/test/java/org/apache/tez/analyzer/TestAnalyzer.java +++ b/tez-tools/analyzers/job-analyzer/src/test/java/org/apache/tez/analyzer/TestAnalyzer.java @@ -168,7 +168,7 @@ public class TestAnalyzer { remoteStagingDir.toString()); tezConf.setBoolean(TezConfiguration.TEZ_AM_NODE_BLACKLISTING_ENABLED, false); - tezSession = TezClient.create("TestFaultTolerance", tezConf, true); + tezSession = TezClient.create("TestAnalyzer", tezConf, true); tezSession.start(); } @@ -821,4 +821,4 @@ public class TestAnalyzer { return Collections.singletonList(check); } -} \ No newline at end of file +}
