TEZ-2217. The min-held-containers are being released prematurely (bikas)
Project: http://git-wip-us.apache.org/repos/asf/tez/repo Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/d42a3c7a Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/d42a3c7a Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/d42a3c7a Branch: refs/heads/TEZ-2003 Commit: d42a3c7a78b14ba496721e5db4a63229c3cf011c Parents: 1ba1f92 Author: Bikas Saha <[email protected]> Authored: Thu Mar 26 10:05:29 2015 -0700 Committer: Bikas Saha <[email protected]> Committed: Thu Mar 26 10:05:29 2015 -0700 ---------------------------------------------------------------------- CHANGES.txt | 1 + .../dag/app/rm/YarnTaskSchedulerService.java | 53 +++++++++----- .../tez/dag/app/rm/TestTaskScheduler.java | 77 ++++++++++++++++---- 3 files changed, 96 insertions(+), 35 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/tez/blob/d42a3c7a/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index 9db71a8..990cd28 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -251,6 +251,7 @@ TEZ-UI CHANGES (TEZ-8): Release 0.5.4: Unreleased ALL CHANGES: + TEZ-2217. The min-held-containers being released prematurely TEZ-2214. FetcherOrderedGrouped can get stuck indefinitely when MergeManager misses memToDiskMerging TEZ-1923. FetcherOrderedGrouped gets into infinite loop due to memory pressure TEZ-2219. 
Should verify the input_name/output_name to be unique per vertex http://git-wip-us.apache.org/repos/asf/tez/blob/d42a3c7a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/YarnTaskSchedulerService.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/YarnTaskSchedulerService.java b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/YarnTaskSchedulerService.java index 4818b92..6c31349 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/YarnTaskSchedulerService.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/YarnTaskSchedulerService.java @@ -353,10 +353,10 @@ public class YarnTaskSchedulerService extends TaskSchedulerService ", reuseNonLocal: " + reuseNonLocal + ", localitySchedulingDelay: " + localitySchedulingDelay + ", preemptionPercentage: " + preemptionPercentage + - ", numHeartbeatsBetweenPreemptions" + numHeartbeatsBetweenPreemptions + - ", idleContainerMinTimeout=" + idleContainerTimeoutMin + - ", idleContainerMaxTimeout=" + idleContainerTimeoutMax + - ", sessionMinHeldContainers=" + sessionNumMinHeldContainers); + ", numHeartbeatsBetweenPreemptions: " + numHeartbeatsBetweenPreemptions + + ", idleContainerMinTimeout: " + idleContainerTimeoutMin + + ", idleContainerMaxTimeout: " + idleContainerTimeoutMax + + ", sessionMinHeldContainers: " + sessionNumMinHeldContainers); } @Override @@ -561,6 +561,9 @@ public class YarnTaskSchedulerService extends TaskSchedulerService @VisibleForTesting long getHeldContainerExpireTime(long startTime) { + // expire time is at least extended by min time. + // corner case when min time = -1 but then it does not matter because + // expire time is irrelevant at that point. 
long expireTime = (startTime + idleContainerTimeoutMin); if (idleContainerTimeoutMin != -1 && idleContainerTimeoutMin < idleContainerTimeoutMax) { long expireTimeMax = startTime + idleContainerTimeoutMax; @@ -614,22 +617,23 @@ public class YarnTaskSchedulerService extends TaskSchedulerService && idleContainerTimeoutMin != -1)) { // container idle timeout has expired or is a new unused container. // new container is possibly a spurious race condition allocation. - if (!isNew && appContext.isSession() && - sessionMinHeldContainers.contains(heldContainer.getContainer().getId())) { - // Not a potentially spurious new container. + if (appContext.isSession() + && sessionMinHeldContainers.contains(heldContainer.getContainer().getId())) { + // There are no outstanding requests. So its safe to hold new containers. + // We may have received more containers than necessary and some are unused // In session mode and container in set of chosen min held containers // increase the idle container expire time to maintain sanity with - // the rest of the code + // the rest of the code. heldContainer.setContainerExpiryTime(getHeldContainerExpireTime(currentTime)); } else { - releaseContainer = true; + releaseContainer = true; } } if (releaseContainer) { LOG.info("No taskRequests. Container's idle timeout delay expired or is new. 
" + "Releasing container" - + ", containerId=" + heldContainer.container.getId() + + ", containerId=" + heldContainer.getContainer().getId() + ", containerExpiryTime=" + heldContainer.getContainerExpiryTime() + ", idleTimeout=" + idleContainerTimeoutMin @@ -638,7 +642,7 @@ public class YarnTaskSchedulerService extends TaskSchedulerService + ", delayedContainers=" + delayedContainerManager.delayedContainers.size() + ", isNew=" + isNew); releaseUnassignedContainers( - Lists.newArrayList(heldContainer.container)); + Lists.newArrayList(heldContainer.getContainer())); } else { // no outstanding work and container idle timeout not expired if (LOG.isDebugEnabled()) { @@ -656,7 +660,20 @@ public class YarnTaskSchedulerService extends TaskSchedulerService } } else if (state.equals(DAGAppMasterState.RUNNING)) { // clear min held containers since we need to allocate to tasks - sessionMinHeldContainers.clear(); + if (!sessionMinHeldContainers.isEmpty()) { + // update the expire time of min held containers so that they are + // not released immediately, when new requests come in, if they come in + // just before these containers are about to expire (race condition) + long currentTime = System.currentTimeMillis(); + for (ContainerId minHeldCId : sessionMinHeldContainers) { + HeldContainer minHeldContainer = heldContainers.get(minHeldCId); + if (minHeldContainer != null) { + // check in case it got removed because of external reasons + minHeldContainer.setContainerExpiryTime(getHeldContainerExpireTime(currentTime)); + } + } + sessionMinHeldContainers.clear(); + } HeldContainer.LocalityMatchLevel localityMatchLevel = heldContainer.getLocalityMatchLevel(); Map<CookieContainerRequest, Container> assignedContainers = @@ -1441,11 +1458,7 @@ public class YarnTaskSchedulerService extends TaskSchedulerService } private void pushNewContainerToDelayed(List<Container> containers){ - long expireTime = -1; - if (idleContainerTimeoutMin > 0) { - long currentTime = System.currentTimeMillis(); - 
expireTime = currentTime + idleContainerTimeoutMin; - } + long expireTime = getHeldContainerExpireTime(System.currentTimeMillis()); synchronized (delayedContainerManager) { for (Container container : containers) { @@ -1953,7 +1966,8 @@ public class YarnTaskSchedulerService extends TaskSchedulerService releaseUnassignedContainers(new ContainerIterable(pendingContainers)); } - private void addDelayedContainer(Container container, + @VisibleForTesting + void addDelayedContainer(Container container, long nextScheduleTime) { HeldContainer delayedContainer = heldContainers.get(container.getId()); if (delayedContainer == null) { @@ -2055,7 +2069,8 @@ public class YarnTaskSchedulerService extends TaskSchedulerService } } - LOG.info("Holding on to " + sessionMinHeldContainers.size() + " containers"); + LOG.info("Holding on to " + sessionMinHeldContainers.size() + " containers" + + " out of total held containers: " + heldContainers.size()); } private static class ContainerIterable implements Iterable<Container> { http://git-wip-us.apache.org/repos/asf/tez/blob/d42a3c7a/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskScheduler.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskScheduler.java b/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskScheduler.java index aaa8a5f..21bce6d 100644 --- a/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskScheduler.java +++ b/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskScheduler.java @@ -998,12 +998,13 @@ public class TestTaskScheduler { } @SuppressWarnings("unchecked") - @Test(timeout=5000) + @Test (timeout=5000) public void testTaskSchedulerDetermineMinHeldContainers() throws Exception { RackResolver.init(new YarnConfiguration()); TaskSchedulerAppCallback mockApp = mock(TaskSchedulerAppCallback.class); AppContext mockAppContext = mock(AppContext.class); 
when(mockAppContext.getAMState()).thenReturn(DAGAppMasterState.RUNNING); + when(mockAppContext.isSession()).thenReturn(true); TezAMRMClientAsync<CookieContainerRequest> mockRMClient = mock(TezAMRMClientAsync.class); @@ -1015,6 +1016,8 @@ public class TestTaskScheduler { new TaskSchedulerWithDrainableAppCallback( mockApp, new AlwaysMatchesContainerMatcher(), appHost, appPort, appUrl, mockRMClient, mockAppContext); + TaskSchedulerAppCallbackDrainable drainableAppCallback = scheduler + .getDrainableAppCallback(); Configuration conf = new Configuration(); scheduler.init(conf); @@ -1042,42 +1045,71 @@ public class TestTaskScheduler { String node1Rack3 = "n1r3"; ApplicationAttemptId appId = ApplicationAttemptId.newInstance(ApplicationId.newInstance(0, 0), 0); + Resource r = Resource.newInstance(0, 0); ContainerId mockCId1 = ContainerId.newInstance(appId, 0); - HeldContainer hc1 = mock(HeldContainer.class, RETURNS_DEEP_STUBS); + Container c1 = mock(Container.class, RETURNS_DEEP_STUBS); + when(c1.getNodeId().getHost()).thenReturn(""); // we are mocking directly + HeldContainer hc1 = Mockito.spy(new HeldContainer(c1, 0, 0, null)); when(hc1.getNode()).thenReturn(node1Rack1); when(hc1.getRack()).thenReturn(rack1); - when(hc1.getContainer().getId()).thenReturn(mockCId1); + when(c1.getId()).thenReturn(mockCId1); + when(c1.getResource()).thenReturn(r); + when(hc1.getContainer()).thenReturn(c1); ContainerId mockCId2 = ContainerId.newInstance(appId, 1); - HeldContainer hc2 = mock(HeldContainer.class, RETURNS_DEEP_STUBS); + Container c2 = mock(Container.class, RETURNS_DEEP_STUBS); + when(c2.getNodeId().getHost()).thenReturn(""); // we are mocking directly + HeldContainer hc2 = Mockito.spy(new HeldContainer(c2, 0, 0, null)); when(hc2.getNode()).thenReturn(node2Rack1); when(hc2.getRack()).thenReturn(rack1); - when(hc2.getContainer().getId()).thenReturn(mockCId2); + when(c2.getId()).thenReturn(mockCId2); + when(c2.getResource()).thenReturn(r); + 
when(hc2.getContainer()).thenReturn(c2); ContainerId mockCId3 = ContainerId.newInstance(appId, 2); - HeldContainer hc3 = mock(HeldContainer.class, RETURNS_DEEP_STUBS); + Container c3 = mock(Container.class, RETURNS_DEEP_STUBS); + when(c3.getNodeId().getHost()).thenReturn(""); // we are mocking directly + HeldContainer hc3 = Mockito.spy(new HeldContainer(c3, 0, 0, null)); when(hc3.getNode()).thenReturn(node1Rack1); when(hc3.getRack()).thenReturn(rack1); - when(hc3.getContainer().getId()).thenReturn(mockCId3); + when(c3.getId()).thenReturn(mockCId3); + when(c3.getResource()).thenReturn(r); + when(hc3.getContainer()).thenReturn(c3); ContainerId mockCId4 = ContainerId.newInstance(appId, 3); - HeldContainer hc4 = mock(HeldContainer.class, RETURNS_DEEP_STUBS); + Container c4 = mock(Container.class, RETURNS_DEEP_STUBS); + when(c4.getNodeId().getHost()).thenReturn(""); // we are mocking directly + HeldContainer hc4 = Mockito.spy(new HeldContainer(c4, 0, 0, null)); when(hc4.getNode()).thenReturn(node2Rack1); when(hc4.getRack()).thenReturn(rack1); - when(hc4.getContainer().getId()).thenReturn(mockCId4); + when(c4.getId()).thenReturn(mockCId4); + when(c4.getResource()).thenReturn(r); + when(hc4.getContainer()).thenReturn(c4); ContainerId mockCId5 = ContainerId.newInstance(appId, 4); - HeldContainer hc5 = mock(HeldContainer.class, RETURNS_DEEP_STUBS); + Container c5 = mock(Container.class, RETURNS_DEEP_STUBS); + when(c5.getNodeId().getHost()).thenReturn(""); // we are mocking directly + HeldContainer hc5 = Mockito.spy(new HeldContainer(c5, 0, 0, null)); when(hc5.getNode()).thenReturn(node1Rack2); when(hc5.getRack()).thenReturn(rack2); - when(hc5.getContainer().getId()).thenReturn(mockCId5); + when(c5.getId()).thenReturn(mockCId5); + when(c5.getResource()).thenReturn(r); + when(hc5.getContainer()).thenReturn(c5); ContainerId mockCId6 = ContainerId.newInstance(appId, 5); - HeldContainer hc6 = mock(HeldContainer.class, RETURNS_DEEP_STUBS); + Container c6 = mock(Container.class, 
RETURNS_DEEP_STUBS); + when(c6.getNodeId().getHost()).thenReturn(""); // we are mocking directly + HeldContainer hc6 = Mockito.spy(new HeldContainer(c6, 0, 0, null)); when(hc6.getNode()).thenReturn(node2Rack2); when(hc6.getRack()).thenReturn(rack2); - when(hc6.getContainer().getId()).thenReturn(mockCId6); + when(c6.getId()).thenReturn(mockCId6); + when(c6.getResource()).thenReturn(r); + when(hc6.getContainer()).thenReturn(c6); ContainerId mockCId7 = ContainerId.newInstance(appId, 6); - HeldContainer hc7 = mock(HeldContainer.class, RETURNS_DEEP_STUBS); + Container c7 = mock(Container.class, RETURNS_DEEP_STUBS); + when(c7.getNodeId().getHost()).thenReturn(""); // we are mocking directly + HeldContainer hc7 = Mockito.spy(new HeldContainer(c7, 0, 0, null)); when(hc7.getNode()).thenReturn(node1Rack3); when(hc7.getRack()).thenReturn(rack3); - when(hc7.getContainer().getId()).thenReturn(mockCId7); - + when(c7.getId()).thenReturn(mockCId7); + when(c7.getResource()).thenReturn(r); + when(hc7.getContainer()).thenReturn(c7); + scheduler.heldContainers.put(mockCId1, hc1); scheduler.heldContainers.put(mockCId2, hc2); scheduler.heldContainers.put(mockCId3, hc3); @@ -1123,6 +1155,19 @@ public class TestTaskScheduler { Assert.assertTrue(racks.contains(rack1) && racks.contains(rack2) && racks.contains(rack3)); + long currTime = System.currentTimeMillis(); + heldContainers.clear(); + heldContainers.addAll(scheduler.heldContainers.values()); + for (HeldContainer hc : heldContainers) { + when(hc.isNew()).thenReturn(true); + scheduler.delayedContainerManager.addDelayedContainer(hc.getContainer(), currTime); + } + Thread.sleep(1000); + drainableAppCallback.drain(); + // only the 2 container not in min-held containers are released + verify(mockRMClient, times(2)).releaseAssignedContainer((ContainerId)any()); + Assert.assertEquals(5, scheduler.heldContainers.size()); + String appMsg = "success"; AppFinalStatus finalStatus = new AppFinalStatus(FinalApplicationStatus.SUCCEEDED, appMsg, 
appUrl);
