Repository: tez
Updated Branches:
  refs/heads/master 9bb01e4f8 -> 663ead2dc
TEZ-2687. ATS History shutdown happens before the min-held containers are released (zjffdu) Project: http://git-wip-us.apache.org/repos/asf/tez/repo Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/663ead2d Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/663ead2d Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/663ead2d Branch: refs/heads/master Commit: 663ead2dc5f3778bbdeb0f479811e2d89f506afc Parents: 9bb01e4 Author: Jeff Zhang <[email protected]> Authored: Fri Aug 21 13:33:03 2015 +0800 Committer: Jeff Zhang <[email protected]> Committed: Fri Aug 21 13:33:03 2015 +0800 ---------------------------------------------------------------------- CHANGES.txt | 4 + .../org/apache/tez/dag/app/DAGAppMaster.java | 6 + .../dag/app/rm/LocalTaskSchedulerService.java | 5 + .../dag/app/rm/TaskSchedulerEventHandler.java | 6 +- .../tez/dag/app/rm/TaskSchedulerService.java | 2 + .../dag/app/rm/YarnTaskSchedulerService.java | 53 ++++- .../tez/dag/app/rm/TestTaskScheduler.java | 226 ++++++++++++++++++- 7 files changed, 292 insertions(+), 10 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/tez/blob/663ead2d/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index a662484..8fe9627 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -10,6 +10,7 @@ INCOMPATIBLE CHANGES TEZ-2468. Change the minimum Java version to Java 7. ALL CHANGES: + TEZ-2687. ATS History shutdown happens before the min-held containers are released TEZ-2629. LimitExceededException in Tez client when DAG has exceeds the default max counters TEZ-2730. tez-api missing dependency on org.codehaus.jettison for json. TEZ-2719. Consider reducing logs in unordered fetcher with shared-fetch option @@ -74,6 +75,7 @@ Release 0.7.1: Unreleased INCOMPATIBLE CHANGES ALL CHANGES: + TEZ-2687. ATS History shutdown happens before the min-held containers are released TEZ-2629. 
LimitExceededException in Tez client when DAG has exceeds the default max counters TEZ-2540. Create both tez-dist minimal and minimal.tar.gz formats as part of build TEZ-2630. TezChild receives IP address instead of FQDN. @@ -296,6 +298,7 @@ Release 0.6.3: Unreleased INCOMPATIBLE CHANGES ALL CHANGES: + TEZ-2687. ATS History shutdown happens before the min-held containers are released TEZ-2629. LimitExceededException in Tez client when DAG has exceeds the default max counters TEZ-2630. TezChild receives IP address instead of FQDN. @@ -504,6 +507,7 @@ INCOMPATIBLE CHANGES TEZ-2552. CRC errors can cause job to run for very long time in large jobs. ALL CHANGES: + TEZ-2687. ATS History shutdown happens before the min-held containers are released TEZ-2629. LimitExceededException in Tez client when DAG has exceeds the default max counters TEZ-2630. TezChild receives IP address instead of FQDN. TEZ-2635. Limit number of attempts being downloaded in unordered fetch. http://git-wip-us.apache.org/repos/asf/tez/blob/663ead2d/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java b/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java index b91c3d1..401bfbc 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java @@ -1857,6 +1857,10 @@ public class DAGAppMaster extends AbstractService { } + private void initiateStop() { + taskSchedulerEventHandler.initiateStop(); + } + @Override public void serviceStop() throws Exception { if (isSession) { @@ -1866,6 +1870,8 @@ public class DAGAppMaster extends AbstractService { if (this.dagSubmissionTimer != null) { this.dagSubmissionTimer.cancel(); } + // release all the held containers before stop services TEZ-2687 + initiateStop(); stopServices(); // Given pre-emption, we should delete tez 
scratch dir only if unregister is http://git-wip-us.apache.org/repos/asf/tez/blob/663ead2d/tez-dag/src/main/java/org/apache/tez/dag/app/rm/LocalTaskSchedulerService.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/LocalTaskSchedulerService.java b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/LocalTaskSchedulerService.java index 51d8b9d..2c7c10c 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/LocalTaskSchedulerService.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/LocalTaskSchedulerService.java @@ -194,6 +194,11 @@ public class LocalTaskSchedulerService extends TaskSchedulerService { return true; } + @Override + public void initiateStop() { + + } + static class LocalContainerFactory { final AppContext appContext; AtomicInteger nextId; http://git-wip-us.apache.org/repos/asf/tez/blob/663ead2d/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerEventHandler.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerEventHandler.java b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerEventHandler.java index a3cd284..4428665 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerEventHandler.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerEventHandler.java @@ -394,7 +394,11 @@ public class TaskSchedulerEventHandler extends AbstractService protected void notifyForTest() { } - + + public void initiateStop() { + taskScheduler.initiateStop(); + } + @Override public void serviceStop() { synchronized(this) { http://git-wip-us.apache.org/repos/asf/tez/blob/663ead2d/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerService.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerService.java 
b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerService.java index 48d5455..7b729e7 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerService.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerService.java @@ -70,6 +70,8 @@ public abstract class TaskSchedulerService extends AbstractService{ public abstract boolean hasUnregistered(); + public abstract void initiateStop(); + public interface TaskSchedulerAppCallback { public class AppFinalStatus { public final FinalApplicationStatus exitStatus; http://git-wip-us.apache.org/repos/asf/tez/blob/663ead2d/tez-dag/src/main/java/org/apache/tez/dag/app/rm/YarnTaskSchedulerService.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/YarnTaskSchedulerService.java b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/YarnTaskSchedulerService.java index 19902b3..73fcb3d 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/YarnTaskSchedulerService.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/YarnTaskSchedulerService.java @@ -19,6 +19,7 @@ package org.apache.tez.dag.app.rm; import java.io.IOException; +import java.util.ArrayList; import java.util.Collection; import java.util.Collections; import java.util.Comparator; @@ -39,6 +40,7 @@ import java.util.concurrent.atomic.AtomicInteger; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.apache.commons.lang.exception.ExceptionUtils; import org.apache.commons.math3.random.RandomDataGenerator; import org.apache.hadoop.classification.InterfaceAudience.Private; import org.apache.hadoop.conf.Configuration; @@ -133,7 +135,7 @@ public class YarnTaskSchedulerService extends TaskSchedulerService final AppContext appContext; private AtomicBoolean hasUnregistered = new AtomicBoolean(false); - AtomicBoolean isStopped = new AtomicBoolean(false); + AtomicBoolean isStopStarted = new AtomicBoolean(false); private 
ContainerAssigner NODE_LOCAL_ASSIGNER = new NodeLocalContainerAssigner(); private ContainerAssigner RACK_LOCAL_ASSIGNER = new RackLocalContainerAssigner(); @@ -393,7 +395,6 @@ public class YarnTaskSchedulerService extends TaskSchedulerService // Wait for contianers to be released. delayedContainerManager.join(2000l); synchronized (this) { - isStopped.set(true); if (shouldUnregister.get()) { AppFinalStatus status = appClientDelegate.getFinalAppStatus(); LOG.info("Unregistering application from RM" @@ -426,7 +427,10 @@ public class YarnTaskSchedulerService extends TaskSchedulerService // AMRMClientAsync interface methods @Override public void onContainersCompleted(List<ContainerStatus> statuses) { - if (isStopped.get()) { + if (isStopStarted.get()) { + for (ContainerStatus status : statuses) { + LOG.info("Container " + status.getContainerId() + " is completed"); + } return; } Map<Object, ContainerStatus> appContainerStatus = @@ -483,7 +487,11 @@ public class YarnTaskSchedulerService extends TaskSchedulerService @Override public void onContainersAllocated(List<Container> containers) { - if (isStopped.get()) { + if (isStopStarted.get()) { + for (Container container : containers) { + LOG.info("Release container:" + container.getId() + ", because it is shutting down."); + releaseContainer(container.getId()); + } return; } Map<CookieContainerRequest, Container> assignedContainers; @@ -857,7 +865,7 @@ public class YarnTaskSchedulerService extends TaskSchedulerService @Override public void onShutdownRequest() { - if (isStopped.get()) { + if (isStopStarted.get()) { return; } // upcall to app must be outside locks @@ -866,7 +874,7 @@ public class YarnTaskSchedulerService extends TaskSchedulerService @Override public void onNodesUpdated(List<NodeReport> updatedNodes) { - if (isStopped.get()) { + if (isStopStarted.get()) { return; } // ignore bad nodes for now @@ -876,7 +884,7 @@ public class YarnTaskSchedulerService extends TaskSchedulerService @Override public float 
getProgress() { - if (isStopped.get()) { + if (isStopStarted.get()) { return 1; } @@ -898,7 +906,8 @@ public class YarnTaskSchedulerService extends TaskSchedulerService @Override public void onError(Throwable t) { - if (isStopped.get()) { + if (isStopStarted.get()) { + LOG.error("Got TaskSchedulerError, " + ExceptionUtils.getStackTrace(t)); return; } appClientDelegate.onError(t); @@ -1064,6 +1073,34 @@ public class YarnTaskSchedulerService extends TaskSchedulerService return null; } + @Override + public synchronized void initiateStop() { + LOG.info("Initiate stop to YarnTaskScheduler"); + // release held containers + LOG.info("Release held containers"); + isStopStarted.set(true); + // Create a new list for containerIds to iterate, otherwise it would cause ConcurrentModificationException + // because method releaseContainer will change heldContainers. + List<ContainerId> heldContainerIds = new ArrayList<ContainerId>(heldContainers.size()); + for (ContainerId containerId : heldContainers.keySet()) { + heldContainerIds.add(containerId); + } + for (ContainerId containerId : heldContainerIds) { + releaseContainer(containerId); + } + + // remove taskRequest from AMRMClient to avoid allocating new containers in the next heartbeat + LOG.info("Remove all the taskRequests"); + // Create a new list for tasks to avoid ConcurrentModificationException + List<Object> tasks = new ArrayList<Object>(taskRequests.size()); + for (Object task : taskRequests.keySet()) { + tasks.add(task); + } + for (Object task : tasks) { + removeTaskRequest(task); + } + } + boolean canFit(Resource arg0, Resource arg1) { int mem0 = arg0.getMemory(); int mem1 = arg1.getMemory(); http://git-wip-us.apache.org/repos/asf/tez/blob/663ead2d/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskScheduler.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskScheduler.java 
b/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskScheduler.java index dabae67..7e2c674 100644 --- a/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskScheduler.java +++ b/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskScheduler.java @@ -503,7 +503,231 @@ public class TestTaskScheduler { verify(mockRMClient).stop(); scheduler.close(); } - + + @SuppressWarnings({ "unchecked" }) + @Test(timeout=10000) + public void testTaskSchedulerInitiateStop() throws Exception { + RackResolver.init(new YarnConfiguration()); + TaskSchedulerAppCallback mockApp = mock(TaskSchedulerAppCallback.class); + AppContext mockAppContext = mock(AppContext.class); + when(mockAppContext.getAMState()).thenReturn(DAGAppMasterState.RUNNING); + + TezAMRMClientAsync<CookieContainerRequest> mockRMClient = + mock(TezAMRMClientAsync.class); + + String appHost = "host"; + int appPort = 0; + String appUrl = "url"; + TaskSchedulerWithDrainableAppCallback scheduler = + new TaskSchedulerWithDrainableAppCallback( + mockApp, new AlwaysMatchesContainerMatcher(), appHost, appPort, + appUrl, mockRMClient, mockAppContext); + final TaskSchedulerAppCallbackDrainable drainableAppCallback = scheduler + .getDrainableAppCallback(); + + Configuration conf = new Configuration(); + conf.setLong(TezConfiguration.TEZ_AM_CONTAINER_REUSE_LOCALITY_DELAY_ALLOCATION_MILLIS, 0); + // keep containers held for 10 seconds + conf.setLong(TezConfiguration.TEZ_AM_CONTAINER_IDLE_RELEASE_TIMEOUT_MIN_MILLIS, 10000); + conf.setLong(TezConfiguration.TEZ_AM_CONTAINER_IDLE_RELEASE_TIMEOUT_MAX_MILLIS, 10000); + scheduler.init(conf); + drainableAppCallback.drain(); + + RegisterApplicationMasterResponse mockRegResponse = + mock(RegisterApplicationMasterResponse.class); + Resource mockMaxResource = mock(Resource.class); + Map<ApplicationAccessType, String> mockAcls = mock(Map.class); + when(mockRegResponse.getMaximumResourceCapability()). 
+ thenReturn(mockMaxResource); + when(mockRegResponse.getApplicationACLs()).thenReturn(mockAcls); + when(mockRMClient. + registerApplicationMaster(anyString(), anyInt(), anyString())). + thenReturn(mockRegResponse); + Resource mockClusterResource = mock(Resource.class); + when(mockRMClient.getAvailableResources()). + thenReturn(mockClusterResource); + + scheduler.start(); + drainableAppCallback.drain(); + + Object mockTask1 = mock(Object.class); + when(mockTask1.toString()).thenReturn("task1"); + Object mockCookie1 = mock(Object.class); + Resource mockCapability = mock(Resource.class); + String[] hosts = {"host1", "host5"}; + String[] racks = {"/default-rack", "/default-rack"}; + final Priority mockPriority1 = Priority.newInstance(1); + final Priority mockPriority2 = Priority.newInstance(2); + final Priority mockPriority3 = Priority.newInstance(3); + final Priority mockPriority4 = Priority.newInstance(4); + final Priority mockPriority5 = Priority.newInstance(5); + Object mockTask2 = mock(Object.class); + when(mockTask2.toString()).thenReturn("task2"); + Object mockCookie2 = mock(Object.class); + Object mockTask3 = mock(Object.class); + when(mockTask3.toString()).thenReturn("task3"); + Object mockCookie3 = mock(Object.class); + ArgumentCaptor<CookieContainerRequest> requestCaptor = + ArgumentCaptor.forClass(CookieContainerRequest.class); + + scheduler.allocateTask(mockTask1, mockCapability, hosts, + racks, mockPriority1, null, mockCookie1); + drainableAppCallback.drain(); + verify(mockRMClient, times(1)). + addContainerRequest(requestCaptor.capture()); + CookieContainerRequest request1 = requestCaptor.getValue(); + scheduler.allocateTask(mockTask2, mockCapability, hosts, + racks, mockPriority2, null, mockCookie2); + drainableAppCallback.drain(); + verify(mockRMClient, times(2)). 
+ addContainerRequest(requestCaptor.capture()); + CookieContainerRequest request2 = requestCaptor.getValue(); + scheduler.allocateTask(mockTask3, mockCapability, hosts, + racks, mockPriority3, null, mockCookie3); + drainableAppCallback.drain(); + verify(mockRMClient, times(3)). + addContainerRequest(requestCaptor.capture()); + CookieContainerRequest request3 = requestCaptor.getValue(); + + List<Container> containers = new ArrayList<Container>(); + // sending lower priority container first to make sure its not matched + Container mockContainer1 = mock(Container.class, RETURNS_DEEP_STUBS); + when(mockContainer1.getNodeId().getHost()).thenReturn("host1"); + when(mockContainer1.getPriority()).thenReturn(mockPriority1); + when(mockContainer1.toString()).thenReturn("container1"); + ContainerId mockCId1 = mock(ContainerId.class); + when(mockContainer1.getId()).thenReturn(mockCId1); + when(mockCId1.toString()).thenReturn("container1"); + containers.add(mockContainer1); + Container mockContainer2 = mock(Container.class, RETURNS_DEEP_STUBS); + when(mockContainer2.getNodeId().getHost()).thenReturn("host2"); + when(mockContainer2.getPriority()).thenReturn(mockPriority2); + when(mockContainer2.toString()).thenReturn("container2"); + ContainerId mockCId2 = mock(ContainerId.class); + when(mockContainer2.getId()).thenReturn(mockCId2); + when(mockCId2.toString()).thenReturn("container2"); + containers.add(mockContainer2); + + ArrayList<CookieContainerRequest> hostContainers = + new ArrayList<CookieContainerRequest>(); + hostContainers.add(request1); + ArrayList<CookieContainerRequest> rackContainers = + new ArrayList<CookieContainerRequest>(); + rackContainers.add(request2); + ArrayList<CookieContainerRequest> anyContainers = + new ArrayList<CookieContainerRequest>(); + anyContainers.add(request3); + + final List<ArrayList<CookieContainerRequest>> hostList = + new LinkedList<ArrayList<CookieContainerRequest>>(); + hostList.add(hostContainers); + final 
List<ArrayList<CookieContainerRequest>> rackList = + new LinkedList<ArrayList<CookieContainerRequest>>(); + rackList.add(rackContainers); + final List<ArrayList<CookieContainerRequest>> anyList = + new LinkedList<ArrayList<CookieContainerRequest>>(); + anyList.add(anyContainers); + final List<ArrayList<CookieContainerRequest>> emptyList = + new LinkedList<ArrayList<CookieContainerRequest>>(); + // return pri1 requests for host1 + when( + mockRMClient.getMatchingRequestsForTopPriority(eq("host1"), + (Resource) any())).thenAnswer( + new Answer<List<? extends Collection<CookieContainerRequest>>>() { + @Override + public List<? extends Collection<CookieContainerRequest>> answer( + InvocationOnMock invocation) throws Throwable { + return hostList; + } + + }); + // second request matched to rack. RackResolver by default puts hosts in + // /default-rack. We need to workaround by returning rack matches only once + when( + mockRMClient.getMatchingRequestsForTopPriority(eq("/default-rack"), + (Resource) any())).thenAnswer( + new Answer<List<? extends Collection<CookieContainerRequest>>>() { + @Override + public List<? extends Collection<CookieContainerRequest>> answer( + InvocationOnMock invocation) throws Throwable { + return rackList; + } + + }).thenAnswer( + new Answer<List<? extends Collection<CookieContainerRequest>>>() { + @Override + public List<? extends Collection<CookieContainerRequest>> answer( + InvocationOnMock invocation) throws Throwable { + return emptyList; + } + + }); + // third request matched to ANY + when( + mockRMClient.getMatchingRequestsForTopPriority( + eq(ResourceRequest.ANY), (Resource) any())).thenAnswer( + new Answer<List<? extends Collection<CookieContainerRequest>>>() { + @Override + public List<? extends Collection<CookieContainerRequest>> answer( + InvocationOnMock invocation) throws Throwable { + return anyList; + } + + }).thenAnswer( + new Answer<List<? extends Collection<CookieContainerRequest>>>() { + @Override + public List<? 
extends Collection<CookieContainerRequest>> answer( + InvocationOnMock invocation) throws Throwable { + return emptyList; + } + + }); + + when(mockRMClient.getTopPriority()).then( + new Answer<Priority>() { + @Override + public Priority answer( + InvocationOnMock invocation) throws Throwable { + int allocations = drainableAppCallback.count.get(); + if (allocations == 0) { + return mockPriority1; + } + if (allocations == 1) { + return mockPriority2; + } + if (allocations == 2) { + return mockPriority3; + } + if (allocations == 3) { + return mockPriority4; + } + return null; + } + }); + + AtomicBoolean drainNotifier = new AtomicBoolean(false); + scheduler.delayedContainerManager.drainedDelayedContainersForTest = drainNotifier; + + scheduler.onContainersAllocated(containers); + TestTaskSchedulerHelpers.waitForDelayedDrainNotify(drainNotifier); + drainableAppCallback.drain(); + + Assert.assertEquals(2, scheduler.heldContainers.size()); + Assert.assertEquals(1, scheduler.taskRequests.size()); + // 2 containers are allocated and their corresponding taskRequests are removed. + verify(mockRMClient).removeContainerRequest(request1); + verify(mockRMClient).removeContainerRequest(request2); + + scheduler.initiateStop(); + // verify all the containers are released + Assert.assertEquals(0, scheduler.heldContainers.size()); + verify(mockRMClient).releaseAssignedContainer(mockCId1); + verify(mockRMClient).releaseAssignedContainer(mockCId2); + // verify taskRequests are removed + Assert.assertEquals(0, scheduler.taskRequests.size()); + verify(mockRMClient).removeContainerRequest(request3); + } + @SuppressWarnings({ "unchecked" }) @Test(timeout=10000) public void testTaskSchedulerWithReuse() throws Exception {
