Repository: tez Updated Branches: refs/heads/master 4b62875c9 -> a93dbf0b2
TEZ-3491. Tez job can hang due to container priority inversion (jlowe) Project: http://git-wip-us.apache.org/repos/asf/tez/repo Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/a93dbf0b Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/a93dbf0b Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/a93dbf0b Branch: refs/heads/master Commit: a93dbf0b2f143930ac8c5d51e48927989c0781a0 Parents: 4b62875 Author: Jason Lowe <[email protected]> Authored: Thu Nov 10 17:28:03 2016 +0000 Committer: Jason Lowe <[email protected]> Committed: Thu Nov 10 17:28:03 2016 +0000 ---------------------------------------------------------------------- CHANGES.txt | 2 + .../dag/app/rm/YarnTaskSchedulerService.java | 46 ++++--- .../tez/dag/app/rm/TestTaskScheduler.java | 128 +++++++++++++++++++ 3 files changed, 158 insertions(+), 18 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/tez/blob/a93dbf0b/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index 0187f00..af83c73 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -7,6 +7,7 @@ INCOMPATIBLE CHANGES ALL CHANGES: + TEZ-3491. Tez job can hang due to container priority inversion. TEZ-3533. ShuffleScheduler should shutdown threadpool on exit. TEZ-3477. MRInputHelpers generateInputSplitsToMem public API modified TEZ-3465. Support broadcast edge into cartesian product vertex and forbid other edges. @@ -145,6 +146,7 @@ INCOMPATIBLE CHANGES ALL CHANGES: + TEZ-3491. Tez job can hang due to container priority inversion. TEZ-3533. ShuffleScheduler should shutdown threadpool on exit. TEZ-3493. DAG submit timeout cannot be set to a month TEZ-3505. 
Move license to the file header for TezBytesWritableSerialization http://git-wip-us.apache.org/repos/asf/tez/blob/a93dbf0b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/YarnTaskSchedulerService.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/YarnTaskSchedulerService.java b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/YarnTaskSchedulerService.java index 5087d0d..41d380a 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/YarnTaskSchedulerService.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/YarnTaskSchedulerService.java @@ -474,7 +474,12 @@ public class YarnTaskSchedulerService extends TaskScheduler } // container neither allocated nor released - LOG.info("Ignoring unknown container: " + containerStatus.getContainerId()); + if (delayedContainer != null) { + LOG.info("Delayed container {} completed", containerStatus.getContainerId()); + maybeRescheduleContainerAtPriority(delayedContainer.getContainer().getPriority()); + } else { + LOG.info("Ignoring unknown container: " + containerStatus.getContainerId()); + } } } @@ -1314,23 +1319,8 @@ public class YarnTaskSchedulerService extends TaskScheduler // to us anymore. So we need to ask for this again. If there is no // outstanding request at that priority then its fine to not ask again. // See TEZ-915 for more details - for (Map.Entry<Object, CookieContainerRequest> entry : taskRequests.entrySet()) { - Object task = entry.getKey(); - CookieContainerRequest request = entry.getValue(); - if (request.getPriority().equals(lowestPriNewContainer.getPriority())) { - LOG.info("Resending request for task again: " + task); - deallocateTask(task, true, null, null); - allocateTask(task, request.getCapability(), - (request.getNodes() == null ? null : - request.getNodes().toArray(new String[request.getNodes().size()])), - (request.getRacks() == null ? 
null : - request.getRacks().toArray(new String[request.getRacks().size()])), - request.getPriority(), - request.getCookie().getContainerSignature(), - request.getCookie().getAppCookie()); - break; - } - } + maybeRescheduleContainerAtPriority(lowestPriNewContainer.getPriority()); + // come back and free more new containers if needed continue; } @@ -1427,6 +1417,26 @@ public class YarnTaskSchedulerService extends TaskScheduler return true; } + private void maybeRescheduleContainerAtPriority(Priority priority) { + for (Map.Entry<Object, CookieContainerRequest> entry : taskRequests.entrySet()) { + Object task = entry.getKey(); + CookieContainerRequest request = entry.getValue(); + if (request.getPriority().equals(priority)) { + LOG.info("Resending request for task again: " + task); + deallocateTask(task, true, null, null); + allocateTask(task, request.getCapability(), + (request.getNodes() == null ? null : + request.getNodes().toArray(new String[request.getNodes().size()])), + (request.getRacks() == null ? 
null : + request.getRacks().toArray(new String[request.getRacks().size()])), + request.getPriority(), + request.getCookie().getContainerSignature(), + request.getCookie().getAppCookie()); + break; + } + } + } + private boolean fitsIn(Resource toFit, Resource resource) { // YARN-893 prevents using correct library code //return Resources.fitsIn(toFit, resource); http://git-wip-us.apache.org/repos/asf/tez/blob/a93dbf0b/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskScheduler.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskScheduler.java b/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskScheduler.java index a3e5ff5..5c8daeb 100644 --- a/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskScheduler.java +++ b/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskScheduler.java @@ -50,6 +50,7 @@ import java.util.concurrent.Executors; import java.util.concurrent.atomic.AtomicBoolean; import com.google.common.util.concurrent.ThreadFactoryBuilder; + import org.apache.commons.io.IOExceptionWithCause; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse; @@ -2192,6 +2193,133 @@ public class TestTaskScheduler { Assert.assertEquals(1, YarnTaskSchedulerService.scaleDownByPreemptionPercentage(1, 1)); } + @SuppressWarnings("unchecked") + @Test + public void testContainerExpired() throws Exception { + RackResolver.init(new YarnConfiguration()); + + TezAMRMClientAsync<CookieContainerRequest> mockRMClient = + mock(TezAMRMClientAsync.class); + + String appHost = "host"; + int appPort = 0; + String appUrl = "url"; + + Configuration conf = new Configuration(); + // to match all in the same pass + conf.setLong(TezConfiguration.TEZ_AM_CONTAINER_REUSE_LOCALITY_DELAY_ALLOCATION_MILLIS, 0); + // to release immediately after deallocate + 
conf.setLong(TezConfiguration.TEZ_AM_CONTAINER_IDLE_RELEASE_TIMEOUT_MIN_MILLIS, 0); + conf.setLong(TezConfiguration.TEZ_AM_CONTAINER_IDLE_RELEASE_TIMEOUT_MAX_MILLIS, 0); + + TaskSchedulerContext mockApp = setupMockTaskSchedulerContext(appHost, appPort, appUrl, conf); + final TaskSchedulerContextDrainable drainableAppCallback = createDrainableContext(mockApp); + TaskSchedulerWithDrainableContext scheduler = + new TaskSchedulerWithDrainableContext(drainableAppCallback, mockRMClient); + + scheduler.initialize(); + drainableAppCallback.drain(); + + RegisterApplicationMasterResponse mockRegResponse = + mock(RegisterApplicationMasterResponse.class); + Resource mockMaxResource = mock(Resource.class); + Map<ApplicationAccessType, String> mockAcls = mock(Map.class); + when(mockRegResponse.getMaximumResourceCapability()). + thenReturn(mockMaxResource); + when(mockRegResponse.getApplicationACLs()).thenReturn(mockAcls); + when(mockRMClient. + registerApplicationMaster(anyString(), anyInt(), anyString())). + thenReturn(mockRegResponse); + Resource mockClusterResource = mock(Resource.class); + when(mockRMClient.getAvailableResources()). 
+ thenReturn(mockClusterResource); + + scheduler.start(); + drainableAppCallback.drain(); + + Object mockTask1 = mock(Object.class); + when(mockTask1.toString()).thenReturn("task1"); + Object mockCookie1 = mock(Object.class); + Resource mockCapability = mock(Resource.class); + String[] hosts = {"host1", "host5"}; + String[] racks = {"/default-rack", "/default-rack"}; + final Priority mockPriority1 = Priority.newInstance(1); + final Priority mockPriority2 = Priority.newInstance(2); + Object mockTask2 = mock(Object.class); + when(mockTask2.toString()).thenReturn("task2"); + Object mockCookie2 = mock(Object.class); + ArgumentCaptor<CookieContainerRequest> requestCaptor = + ArgumentCaptor.forClass(CookieContainerRequest.class); + + scheduler.allocateTask(mockTask2, mockCapability, hosts, + racks, mockPriority2, null, mockCookie2); + drainableAppCallback.drain(); + verify(mockRMClient, times(1)). + addContainerRequest(requestCaptor.capture()); + CookieContainerRequest request2 = requestCaptor.getValue(); + + scheduler.allocateTask(mockTask1, mockCapability, hosts, + racks, mockPriority1, null, mockCookie1); + drainableAppCallback.drain(); + verify(mockRMClient, times(2)). 
+ addContainerRequest(requestCaptor.capture()); + CookieContainerRequest request1 = requestCaptor.getValue(); + + List<Container> containers = new ArrayList<Container>(); + // sending only lower priority container to make sure it's not matched + Container mockContainer2 = mock(Container.class, RETURNS_DEEP_STUBS); + when(mockContainer2.getNodeId().getHost()).thenReturn("host2"); + when(mockContainer2.getPriority()).thenReturn(mockPriority2); + when(mockContainer2.toString()).thenReturn("container2"); + ContainerId mockCId2 = mock(ContainerId.class); + when(mockContainer2.getId()).thenReturn(mockCId2); + when(mockCId2.toString()).thenReturn("container2"); + containers.add(mockContainer2); + ArrayList<CookieContainerRequest> hostContainers = + new ArrayList<CookieContainerRequest>(); + hostContainers.add(request1); + final List<ArrayList<CookieContainerRequest>> hostList = + new LinkedList<ArrayList<CookieContainerRequest>>(); + hostList.add(hostContainers); + + when( + mockRMClient.getMatchingRequestsForTopPriority(eq("host1"), + (Resource) any())).thenAnswer( + new Answer<List<? extends Collection<CookieContainerRequest>>>() { + @Override + public List<? 
extends Collection<CookieContainerRequest>> answer( + InvocationOnMock invocation) throws Throwable { + return hostList; + } + + }); + when(mockRMClient.getTopPriority()).thenReturn(mockPriority1); + + scheduler.onContainersAllocated(containers); + + List<ContainerStatus> statuses = new ArrayList<ContainerStatus>(); + ContainerStatus mockStatus2 = mock(ContainerStatus.class); + when(mockStatus2.getContainerId()).thenReturn(mockCId2); + statuses.add(mockStatus2); + scheduler.onContainersCompleted(statuses); + + verify(mockApp, times(0)).taskAllocated(any(), any(), any(Container.class)); + verify(mockRMClient, times(3)).addContainerRequest(requestCaptor.capture()); + CookieContainerRequest resubmitRequest = requestCaptor.getValue(); + assertEquals(request2.getCookie().getTask(), resubmitRequest.getCookie().getTask()); + assertEquals(request2.getCookie().getAppCookie(), resubmitRequest.getCookie().getAppCookie()); + assertEquals(request2.getCookie().getContainerSignature(), resubmitRequest.getCookie().getContainerSignature()); + assertEquals(request2.getCapability(), resubmitRequest.getCapability()); + assertEquals(request2.getPriority(), resubmitRequest.getPriority()); + + // verify container is not re-requested when nothing is pending at that priority + assertFalse(scheduler.deallocateTask(mockTask2, true, null, null)); + scheduler.onContainersAllocated(containers); + scheduler.onContainersCompleted(statuses); + verify(mockApp, times(0)).taskAllocated(any(), any(), any(Container.class)); + verify(mockRMClient, times(3)).addContainerRequest(requestCaptor.capture()); + } + private Container createContainer(int id, String host, Resource resource, Priority priority) { ContainerId containerID = ContainerId.newInstance(
