TEZ-3508. TestTaskScheduler cleanup. (jlowe)
Project: http://git-wip-us.apache.org/repos/asf/tez/repo Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/501a351d Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/501a351d Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/501a351d Branch: refs/heads/TEZ-3334 Commit: 501a351d59d6bafbd3d4605785492d81a574574b Parents: b71ea4a Author: Jason Lowe <[email protected]> Authored: Fri Nov 18 21:53:39 2016 +0000 Committer: Jason Lowe <[email protected]> Committed: Fri Nov 18 21:53:39 2016 +0000 ---------------------------------------------------------------------- CHANGES.txt | 1 + .../tez/dag/app/rm/TestTaskScheduler.java | 689 +------------------ .../dag/app/rm/TestTaskSchedulerHelpers.java | 12 +- 3 files changed, 43 insertions(+), 659 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/tez/blob/501a351d/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index cac97f9..f13511d 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -7,6 +7,7 @@ INCOMPATIBLE CHANGES ALL CHANGES: + TEZ-3508. TestTaskScheduler cleanup. TEZ-3536. NPE in WebUIService start when host resolution fails. TEZ-3269. Provide basic fair routing and scheduling functionality via custom VertexManager and EdgeManager. TEZ-3534. Differentiate thread names on Fetchers, minor changes to shuffle shutdown code. http://git-wip-us.apache.org/repos/asf/tez/blob/501a351d/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskScheduler.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskScheduler.java b/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskScheduler.java index 5c8daeb..b3511e8 100644 --- a/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskScheduler.java +++ b/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskScheduler.java @@ -29,32 +29,23 @@ import static org.mockito.Matchers.anyString; import static org.mockito.Matchers.eq; import static org.mockito.Mockito.RETURNS_DEEP_STUBS; import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.spy; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; import java.io.IOException; -import java.nio.ByteBuffer; import java.util.ArrayList; import java.util.Collection; -import java.util.Collections; -import java.util.HashMap; -import java.util.Iterator; import java.util.LinkedList; import java.util.List; -import java.util.Map; -import java.util.Map.Entry; import java.util.Set; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.atomic.AtomicBoolean; -import com.google.common.util.concurrent.ThreadFactoryBuilder; - -import org.apache.commons.io.IOExceptionWithCause; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse; -import org.apache.hadoop.yarn.api.records.ApplicationAccessType; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.Container; @@ -66,17 +57,17 @@ import org.apache.hadoop.yarn.api.records.NodeReport; import org.apache.hadoop.yarn.api.records.Priority; import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.api.records.ResourceRequest; -import org.apache.hadoop.yarn.conf.YarnConfiguration; -import org.apache.hadoop.yarn.util.RackResolver; import org.apache.tez.common.ContainerSignatureMatcher; import org.apache.tez.common.MockDNSToSwitchMapping; import org.apache.tez.dag.api.TezConfiguration; -import org.apache.tez.dag.app.rm.YarnTaskSchedulerService.CookieContainerRequest; -import org.apache.tez.dag.app.rm.YarnTaskSchedulerService.HeldContainer; -import org.apache.tez.dag.app.rm.TestTaskSchedulerHelpers.TaskSchedulerContextDrainable; -import org.apache.tez.dag.app.rm.TestTaskSchedulerHelpers.TaskSchedulerWithDrainableContext; +import org.apache.tez.dag.app.rm.TestTaskSchedulerHelpers.AMRMClientAsyncForTest; +import org.apache.tez.dag.app.rm.TestTaskSchedulerHelpers.AMRMClientForTest; import org.apache.tez.dag.app.rm.TestTaskSchedulerHelpers.AlwaysMatchesContainerMatcher; import org.apache.tez.dag.app.rm.TestTaskSchedulerHelpers.PreemptionMatcher; +import org.apache.tez.dag.app.rm.TestTaskSchedulerHelpers.TaskSchedulerContextDrainable; +import org.apache.tez.dag.app.rm.TestTaskSchedulerHelpers.TaskSchedulerWithDrainableContext; +import org.apache.tez.dag.app.rm.YarnTaskSchedulerService.CookieContainerRequest; +import org.apache.tez.dag.app.rm.YarnTaskSchedulerService.HeldContainer; import org.apache.tez.serviceplugins.api.DagInfo; import org.apache.tez.serviceplugins.api.TaskSchedulerContext; import org.apache.tez.serviceplugins.api.TaskSchedulerContext.AppFinalStatus; @@ -91,6 +82,7 @@ import org.mockito.invocation.InvocationOnMock; import org.mockito.stubbing.Answer; import com.google.common.collect.Sets; +import com.google.common.util.concurrent.ThreadFactoryBuilder; @SuppressWarnings("deprecation") public class TestTaskScheduler { @@ -127,10 +119,8 @@ public class TestTaskScheduler { @SuppressWarnings({ "unchecked" }) @Test(timeout=10000) public void testTaskSchedulerNoReuse() throws Exception { - RackResolver.init(new YarnConfiguration()); - - TezAMRMClientAsync<CookieContainerRequest> mockRMClient = - mock(TezAMRMClientAsync.class); + AMRMClientAsyncForTest mockRMClient = spy( + new AMRMClientAsyncForTest(new AMRMClientForTest(), 100)); String appHost = "host"; int appPort = 0; @@ -153,33 +143,16 @@ public class TestTaskScheduler { // it's the same instance. verify(mockRMClient).setHeartbeatInterval(interval); - RegisterApplicationMasterResponse mockRegResponse = - mock(RegisterApplicationMasterResponse.class); - Resource mockMaxResource = mock(Resource.class); - Map<ApplicationAccessType, String> mockAcls = mock(Map.class); - when(mockRegResponse.getMaximumResourceCapability()). - thenReturn(mockMaxResource); - when(mockRegResponse.getApplicationACLs()).thenReturn(mockAcls); - ByteBuffer mockKey = mock(ByteBuffer.class); - when(mockRegResponse.getClientToAMTokenMasterKey()).thenReturn(mockKey); - when(mockRMClient. - registerApplicationMaster(anyString(), anyInt(), anyString())). - thenReturn(mockRegResponse); scheduler.start(); drainableAppCallback.drain(); verify(mockRMClient).start(); verify(mockRMClient).registerApplicationMaster(appHost, appPort, appUrl); - verify(mockApp).setApplicationRegistrationData(mockMaxResource, - mockAcls, mockKey); - - when(mockRMClient.getClusterNodeCount()).thenReturn(5); - Assert.assertEquals(5, scheduler.getClusterNodeCount()); + RegisterApplicationMasterResponse regResponse = mockRMClient.getRegistrationResponse(); + verify(mockApp).setApplicationRegistrationData(regResponse.getMaximumResourceCapability(), + regResponse.getApplicationACLs(), + regResponse.getClientToAMTokenMasterKey()); - Resource mockClusterResource = mock(Resource.class); - when(mockRMClient.getAvailableResources()). - thenReturn(mockClusterResource); - Assert.assertEquals(mockClusterResource, - mockRMClient.getAvailableResources()); + Assert.assertEquals(scheduler.getClusterNodeCount(), mockRMClient.getClusterNodeCount()); Object mockTask1 = mock(Object.class); Object mockCookie1 = mock(Object.class); @@ -257,84 +230,6 @@ public class TestTaskScheduler { ContainerId mockCId4 = mock(ContainerId.class); when(mockContainer4.getId()).thenReturn(mockCId4); containers.add(mockContainer4); - ArrayList<CookieContainerRequest> hostContainers = - new ArrayList<CookieContainerRequest>(); - hostContainers.add(request1); - hostContainers.add(request2); - hostContainers.add(request3); - ArrayList<CookieContainerRequest> rackContainers = - new ArrayList<CookieContainerRequest>(); - rackContainers.add(request2); - rackContainers.add(request3); - ArrayList<CookieContainerRequest> anyContainers = - new ArrayList<CookieContainerRequest>(); - anyContainers.add(request3); - - final List<ArrayList<CookieContainerRequest>> hostList = - new LinkedList<ArrayList<CookieContainerRequest>>(); - hostList.add(hostContainers); - final List<ArrayList<CookieContainerRequest>> rackList = - new LinkedList<ArrayList<CookieContainerRequest>>(); - rackList.add(rackContainers); - final List<ArrayList<CookieContainerRequest>> anyList = - new LinkedList<ArrayList<CookieContainerRequest>>(); - anyList.add(anyContainers); - final List<ArrayList<CookieContainerRequest>> emptyList = - new LinkedList<ArrayList<CookieContainerRequest>>(); - // return all requests for host1 - when( - mockRMClient.getMatchingRequests((Priority) any(), eq("host1"), - (Resource) any())).thenAnswer( - new Answer<List<? extends Collection<CookieContainerRequest>>>() { - @Override - public List<? extends Collection<CookieContainerRequest>> answer( - InvocationOnMock invocation) throws Throwable { - return hostList; - } - - }); - // first request matched by host - // second request matched to rack. RackResolver by default puts hosts in - // /default-rack. We need to workaround by returning rack matches only once - when( - mockRMClient.getMatchingRequests((Priority) any(), eq("/default-rack"), - (Resource) any())).thenAnswer( - new Answer<List<? extends Collection<CookieContainerRequest>>>() { - @Override - public List<? extends Collection<CookieContainerRequest>> answer( - InvocationOnMock invocation) throws Throwable { - return rackList; - } - - }).thenAnswer( - new Answer<List<? extends Collection<CookieContainerRequest>>>() { - @Override - public List<? extends Collection<CookieContainerRequest>> answer( - InvocationOnMock invocation) throws Throwable { - return emptyList; - } - - }); - // third request matched to ANY - when( - mockRMClient.getMatchingRequests((Priority) any(), - eq(ResourceRequest.ANY), (Resource) any())).thenAnswer( - new Answer<List<? extends Collection<CookieContainerRequest>>>() { - @Override - public List<? extends Collection<CookieContainerRequest>> answer( - InvocationOnMock invocation) throws Throwable { - return anyList; - } - - }).thenAnswer( - new Answer<List<? extends Collection<CookieContainerRequest>>>() { - @Override - public List<? extends Collection<CookieContainerRequest>> answer( - InvocationOnMock invocation) throws Throwable { - return emptyList; - } - - }); scheduler.onContainersAllocated(containers); drainableAppCallback.drain(); // first container allocated @@ -398,9 +293,6 @@ public class TestTaskScheduler { null, mockPriority, null, mockCookie4); drainableAppCallback.drain(); verify(mockRMClient, times(5)).addContainerRequest(requestCaptor.capture()); - CookieContainerRequest request4 = requestCaptor.getValue(); - anyContainers.clear(); - anyContainers.add(request4); Container mockContainer5 = mock(Container.class, RETURNS_DEEP_STUBS); when(mockContainer5.getNodeId().getHost()).thenReturn(badHost); when(mockContainer5.getNodeId()).thenReturn(badNodeId); @@ -408,25 +300,6 @@ public class TestTaskScheduler { when(mockContainer5.getId()).thenReturn(mockCId5); containers.clear(); containers.add(mockContainer5); - when( - mockRMClient.getMatchingRequests((Priority) any(), - eq(ResourceRequest.ANY), (Resource) any())).thenAnswer( - new Answer<List<? extends Collection<CookieContainerRequest>>>() { - @Override - public List<? extends Collection<CookieContainerRequest>> answer( - InvocationOnMock invocation) throws Throwable { - return anyList; - } - - }).thenAnswer( - new Answer<List<? extends Collection<CookieContainerRequest>>>() { - @Override - public List<? extends Collection<CookieContainerRequest>> answer( - InvocationOnMock invocation) throws Throwable { - return emptyList; - } - - }); scheduler.onContainersAllocated(containers); drainableAppCallback.drain(); // no new allocation @@ -436,34 +309,12 @@ public class TestTaskScheduler { verify(mockRMClient, times(4)).releaseAssignedContainer((ContainerId) any()); // verify request added back verify(mockRMClient, times(6)).addContainerRequest(requestCaptor.capture()); - CookieContainerRequest request5 = requestCaptor.getValue(); - anyContainers.clear(); - anyContainers.add(request5); Container mockContainer6 = mock(Container.class, RETURNS_DEEP_STUBS); when(mockContainer6.getNodeId().getHost()).thenReturn("host7"); ContainerId mockCId6 = mock(ContainerId.class); when(mockContainer6.getId()).thenReturn(mockCId6); containers.clear(); containers.add(mockContainer6); - when( - mockRMClient.getMatchingRequests((Priority) any(), - eq(ResourceRequest.ANY), (Resource) any())).thenAnswer( - new Answer<List<? extends Collection<CookieContainerRequest>>>() { - @Override - public List<? extends Collection<CookieContainerRequest>> answer( - InvocationOnMock invocation) throws Throwable { - return anyList; - } - - }).thenAnswer( - new Answer<List<? extends Collection<CookieContainerRequest>>>() { - @Override - public List<? extends Collection<CookieContainerRequest>> answer( - InvocationOnMock invocation) throws Throwable { - return emptyList; - } - - }); scheduler.onContainersAllocated(containers); drainableAppCallback.drain(); // new allocation @@ -532,11 +383,8 @@ public class TestTaskScheduler { verify(mockRMClient).stop(); } - @SuppressWarnings({ "unchecked" }) @Test(timeout=10000) public void testTaskSchedulerInitiateStop() throws Exception { - RackResolver.init(new YarnConfiguration()); - String appHost = "host"; int appPort = 0; String appUrl = "url"; @@ -550,8 +398,8 @@ public class TestTaskScheduler { TaskSchedulerContext mockApp = setupMockTaskSchedulerContext(appHost, appPort, appUrl, conf); final TaskSchedulerContextDrainable drainableAppCallback = createDrainableContext(mockApp); - TezAMRMClientAsync<CookieContainerRequest> mockRMClient = - mock(TezAMRMClientAsync.class); + TezAMRMClientAsync<CookieContainerRequest> mockRMClient = spy( + new AMRMClientAsyncForTest(new AMRMClientForTest(), 100)); TaskSchedulerWithDrainableContext scheduler = new TaskSchedulerWithDrainableContext(drainableAppCallback, mockRMClient); @@ -559,20 +407,6 @@ public class TestTaskScheduler { scheduler.initialize(); drainableAppCallback.drain(); - RegisterApplicationMasterResponse mockRegResponse = - mock(RegisterApplicationMasterResponse.class); - Resource mockMaxResource = mock(Resource.class); - Map<ApplicationAccessType, String> mockAcls = mock(Map.class); - when(mockRegResponse.getMaximumResourceCapability()). - thenReturn(mockMaxResource); - when(mockRegResponse.getApplicationACLs()).thenReturn(mockAcls); - when(mockRMClient. - registerApplicationMaster(anyString(), anyInt(), anyString())). - thenReturn(mockRegResponse); - Resource mockClusterResource = mock(Resource.class); - when(mockRMClient.getAvailableResources()). - thenReturn(mockClusterResource); - scheduler.start(); drainableAppCallback.drain(); @@ -585,8 +419,6 @@ public class TestTaskScheduler { final Priority mockPriority1 = Priority.newInstance(1); final Priority mockPriority2 = Priority.newInstance(2); final Priority mockPriority3 = Priority.newInstance(3); - final Priority mockPriority4 = Priority.newInstance(4); - final Priority mockPriority5 = Priority.newInstance(5); Object mockTask2 = mock(Object.class); when(mockTask2.toString()).thenReturn("task2"); Object mockCookie2 = mock(Object.class); @@ -644,93 +476,6 @@ public class TestTaskScheduler { new ArrayList<CookieContainerRequest>(); anyContainers.add(request3); - final List<ArrayList<CookieContainerRequest>> hostList = - new LinkedList<ArrayList<CookieContainerRequest>>(); - hostList.add(hostContainers); - final List<ArrayList<CookieContainerRequest>> rackList = - new LinkedList<ArrayList<CookieContainerRequest>>(); - rackList.add(rackContainers); - final List<ArrayList<CookieContainerRequest>> anyList = - new LinkedList<ArrayList<CookieContainerRequest>>(); - anyList.add(anyContainers); - final List<ArrayList<CookieContainerRequest>> emptyList = - new LinkedList<ArrayList<CookieContainerRequest>>(); - // return pri1 requests for host1 - when( - mockRMClient.getMatchingRequestsForTopPriority(eq("host1"), - (Resource) any())).thenAnswer( - new Answer<List<? extends Collection<CookieContainerRequest>>>() { - @Override - public List<? extends Collection<CookieContainerRequest>> answer( - InvocationOnMock invocation) throws Throwable { - return hostList; - } - - }); - // second request matched to rack. RackResolver by default puts hosts in - // /default-rack. We need to workaround by returning rack matches only once - when( - mockRMClient.getMatchingRequestsForTopPriority(eq("/default-rack"), - (Resource) any())).thenAnswer( - new Answer<List<? extends Collection<CookieContainerRequest>>>() { - @Override - public List<? extends Collection<CookieContainerRequest>> answer( - InvocationOnMock invocation) throws Throwable { - return rackList; - } - - }).thenAnswer( - new Answer<List<? extends Collection<CookieContainerRequest>>>() { - @Override - public List<? extends Collection<CookieContainerRequest>> answer( - InvocationOnMock invocation) throws Throwable { - return emptyList; - } - - }); - // third request matched to ANY - when( - mockRMClient.getMatchingRequestsForTopPriority( - eq(ResourceRequest.ANY), (Resource) any())).thenAnswer( - new Answer<List<? extends Collection<CookieContainerRequest>>>() { - @Override - public List<? extends Collection<CookieContainerRequest>> answer( - InvocationOnMock invocation) throws Throwable { - return anyList; - } - - }).thenAnswer( - new Answer<List<? extends Collection<CookieContainerRequest>>>() { - @Override - public List<? extends Collection<CookieContainerRequest>> answer( - InvocationOnMock invocation) throws Throwable { - return emptyList; - } - - }); - - when(mockRMClient.getTopPriority()).then( - new Answer<Priority>() { - @Override - public Priority answer( - InvocationOnMock invocation) throws Throwable { - int allocations = drainableAppCallback.count.get(); - if (allocations == 0) { - return mockPriority1; - } - if (allocations == 1) { - return mockPriority2; - } - if (allocations == 2) { - return mockPriority3; - } - if (allocations == 3) { - return mockPriority4; - } - return null; - } - }); - AtomicBoolean drainNotifier = new AtomicBoolean(false); scheduler.delayedContainerManager.drainedDelayedContainersForTest = drainNotifier; @@ -757,10 +502,8 @@ public class TestTaskScheduler { @SuppressWarnings({ "unchecked" }) @Test(timeout=10000) public void testTaskSchedulerWithReuse() throws Exception { - RackResolver.init(new YarnConfiguration()); - - TezAMRMClientAsync<CookieContainerRequest> mockRMClient = - mock(TezAMRMClientAsync.class); + TezAMRMClientAsync<CookieContainerRequest> mockRMClient = spy( + new AMRMClientAsyncForTest(new AMRMClientForTest(), 100)); String appHost = "host"; int appPort = 0; @@ -782,21 +525,6 @@ public class TestTaskScheduler { scheduler.initialize(); drainableAppCallback.drain(); - - RegisterApplicationMasterResponse mockRegResponse = - mock(RegisterApplicationMasterResponse.class); - Resource mockMaxResource = mock(Resource.class); - Map<ApplicationAccessType, String> mockAcls = mock(Map.class); - when(mockRegResponse.getMaximumResourceCapability()). - thenReturn(mockMaxResource); - when(mockRegResponse.getApplicationACLs()).thenReturn(mockAcls); - when(mockRMClient. - registerApplicationMaster(anyString(), anyInt(), anyString())). - thenReturn(mockRegResponse); - Resource mockClusterResource = mock(Resource.class); - when(mockRMClient.getAvailableResources()). - thenReturn(mockClusterResource); - scheduler.start(); drainableAppCallback.drain(); @@ -874,103 +602,6 @@ public class TestTaskScheduler { when(mockCId3.toString()).thenReturn("container3"); containers.add(mockContainer3); - ArrayList<CookieContainerRequest> hostContainers = - new ArrayList<CookieContainerRequest>(); - hostContainers.add(request1); - ArrayList<CookieContainerRequest> rackContainers = - new ArrayList<CookieContainerRequest>(); - rackContainers.add(request2); - ArrayList<CookieContainerRequest> anyContainers = - new ArrayList<CookieContainerRequest>(); - anyContainers.add(request3); - - final List<ArrayList<CookieContainerRequest>> hostList = - new LinkedList<ArrayList<CookieContainerRequest>>(); - hostList.add(hostContainers); - final List<ArrayList<CookieContainerRequest>> rackList = - new LinkedList<ArrayList<CookieContainerRequest>>(); - rackList.add(rackContainers); - final List<ArrayList<CookieContainerRequest>> anyList = - new LinkedList<ArrayList<CookieContainerRequest>>(); - anyList.add(anyContainers); - final List<ArrayList<CookieContainerRequest>> emptyList = - new LinkedList<ArrayList<CookieContainerRequest>>(); - // return pri1 requests for host1 - when( - mockRMClient.getMatchingRequestsForTopPriority(eq("host1"), - (Resource) any())).thenAnswer( - new Answer<List<? extends Collection<CookieContainerRequest>>>() { - @Override - public List<? extends Collection<CookieContainerRequest>> answer( - InvocationOnMock invocation) throws Throwable { - return hostList; - } - - }); - // second request matched to rack. RackResolver by default puts hosts in - // /default-rack. We need to workaround by returning rack matches only once - when( - mockRMClient.getMatchingRequestsForTopPriority(eq("/default-rack"), - (Resource) any())).thenAnswer( - new Answer<List<? extends Collection<CookieContainerRequest>>>() { - @Override - public List<? extends Collection<CookieContainerRequest>> answer( - InvocationOnMock invocation) throws Throwable { - return rackList; - } - - }).thenAnswer( - new Answer<List<? extends Collection<CookieContainerRequest>>>() { - @Override - public List<? extends Collection<CookieContainerRequest>> answer( - InvocationOnMock invocation) throws Throwable { - return emptyList; - } - - }); - // third request matched to ANY - when( - mockRMClient.getMatchingRequestsForTopPriority( - eq(ResourceRequest.ANY), (Resource) any())).thenAnswer( - new Answer<List<? extends Collection<CookieContainerRequest>>>() { - @Override - public List<? extends Collection<CookieContainerRequest>> answer( - InvocationOnMock invocation) throws Throwable { - return anyList; - } - - }).thenAnswer( - new Answer<List<? extends Collection<CookieContainerRequest>>>() { - @Override - public List<? extends Collection<CookieContainerRequest>> answer( - InvocationOnMock invocation) throws Throwable { - return emptyList; - } - - }); - - when(mockRMClient.getTopPriority()).then( - new Answer<Priority>() { - @Override - public Priority answer( - InvocationOnMock invocation) throws Throwable { - int allocations = drainableAppCallback.count.get(); - if (allocations == 0) { - return mockPriority1; - } - if (allocations == 1) { - return mockPriority2; - } - if (allocations == 2) { - return mockPriority3; - } - if (allocations == 3) { - return mockPriority4; - } - return null; - } - }); - AtomicBoolean drainNotifier = new AtomicBoolean(false); scheduler.delayedContainerManager.drainedDelayedContainersForTest = drainNotifier; @@ -1039,9 +670,6 @@ public class TestTaskScheduler { null, mockPriority4, null, mockCookie4); drainableAppCallback.drain(); verify(mockRMClient, times(4)).addContainerRequest(requestCaptor.capture()); - CookieContainerRequest request4 = requestCaptor.getValue(); - anyContainers.clear(); - anyContainers.add(request4); Container mockContainer5 = mock(Container.class, RETURNS_DEEP_STUBS); when(mockContainer5.getNodeId().getHost()).thenReturn(badHost); when(mockContainer5.getNodeId()).thenReturn(badNodeId); @@ -1052,25 +680,6 @@ public class TestTaskScheduler { when(mockContainer5.getPriority()).thenReturn(mockPriority4); containers.clear(); containers.add(mockContainer5); - when( - mockRMClient.getMatchingRequestsForTopPriority( - eq(ResourceRequest.ANY), (Resource) any())).thenAnswer( - new Answer<List<? extends Collection<CookieContainerRequest>>>() { - @Override - public List<? extends Collection<CookieContainerRequest>> answer( - InvocationOnMock invocation) throws Throwable { - return anyList; - } - - }).thenAnswer( - new Answer<List<? extends Collection<CookieContainerRequest>>>() { - @Override - public List<? extends Collection<CookieContainerRequest>> answer( - InvocationOnMock invocation) throws Throwable { - return emptyList; - } - - }); drainNotifier.set(false); scheduler.onContainersAllocated(containers); TestTaskSchedulerHelpers.waitForDelayedDrainNotify(drainNotifier); @@ -1082,9 +691,6 @@ public class TestTaskScheduler { verify(mockRMClient, times(4)).releaseAssignedContainer((ContainerId) any()); // verify request added back verify(mockRMClient, times(5)).addContainerRequest(requestCaptor.capture()); - CookieContainerRequest request5 = requestCaptor.getValue(); - anyContainers.clear(); - anyContainers.add(request5); Container mockContainer6 = mock(Container.class, RETURNS_DEEP_STUBS); when(mockContainer6.getNodeId().getHost()).thenReturn("host7"); ContainerId mockCId6 = mock(ContainerId.class); @@ -1093,25 +699,6 @@ public class TestTaskScheduler { when(mockCId6.toString()).thenReturn("container6"); containers.clear(); containers.add(mockContainer6); - when( - mockRMClient.getMatchingRequestsForTopPriority( - eq(ResourceRequest.ANY), (Resource) any())).thenAnswer( - new Answer<List<? extends Collection<CookieContainerRequest>>>() { - @Override - public List<? extends Collection<CookieContainerRequest>> answer( - InvocationOnMock invocation) throws Throwable { - return anyList; - } - - }).thenAnswer( - new Answer<List<? extends Collection<CookieContainerRequest>>>() { - @Override - public List<? extends Collection<CookieContainerRequest>> answer( - InvocationOnMock invocation) throws Throwable { - return emptyList; - } - - }); drainNotifier.set(false); scheduler.onContainersAllocated(containers); TestTaskSchedulerHelpers.waitForDelayedDrainNotify(drainNotifier); @@ -1131,21 +718,6 @@ public class TestTaskScheduler { assertEquals(0, scheduler.blacklistedNodes.size()); // verify container level matching - // fudge the top level priority to prevent containers from being released - // if top level priority is higher than newly allocated containers then - // they will not be released - final AtomicBoolean fudgePriority = new AtomicBoolean(true); - when(mockRMClient.getTopPriority()).then( - new Answer<Priority>() { - @Override - public Priority answer( - InvocationOnMock invocation) throws Throwable { - if (fudgePriority.get()) { - return mockPriority4; - } - return mockPriority5; - } - }); // add a dummy task to prevent release of allocated containers Object mockTask5 = mock(Object.class); when(mockTask5.toString()).thenReturn("task5"); @@ -1153,7 +725,6 @@ public class TestTaskScheduler { scheduler.allocateTask(mockTask5, mockCapability, hosts, racks, mockPriority5, null, mockCookie5); verify(mockRMClient, times(6)).addContainerRequest(requestCaptor.capture()); - CookieContainerRequest request6 = requestCaptor.getValue(); drainableAppCallback.drain(); // add containers so that we can reference one of them for container specific // allocation @@ -1185,24 +756,6 @@ public class TestTaskScheduler { scheduler.allocateTask(mockTask6, mockCapability, mockCId7, mockPriority5, null, mockCookie6); drainableAppCallback.drain(); verify(mockRMClient, times(7)).addContainerRequest(requestCaptor.capture()); - CookieContainerRequest request7 = requestCaptor.getValue(); - hostContainers.clear(); - hostContainers.add(request6); - hostContainers.add(request7); - - when( - mockRMClient.getMatchingRequestsForTopPriority(eq("host5"), - (Resource) any())).thenAnswer( - new Answer<List<? extends Collection<CookieContainerRequest>>>() { - @Override - public List<? extends Collection<CookieContainerRequest>> answer( - InvocationOnMock invocation) throws Throwable { - return hostList; - } - - }); - // stop fudging top priority - fudgePriority.set(false); TestTaskSchedulerHelpers.waitForDelayedDrainNotify(drainNotifier); drainableAppCallback.drain(); verify(mockApp, times(6)).taskAllocated(any(), any(), (Container) any()); @@ -1253,13 +806,10 @@ public class TestTaskScheduler { verify(mockRMClient).stop(); } - @SuppressWarnings("unchecked") @Test (timeout=5000) public void testTaskSchedulerDetermineMinHeldContainers() throws Exception { - RackResolver.init(new YarnConfiguration()); - - TezAMRMClientAsync<CookieContainerRequest> mockRMClient = - mock(TezAMRMClientAsync.class); + TezAMRMClientAsync<CookieContainerRequest> mockRMClient = spy( + new AMRMClientAsyncForTest(new AMRMClientForTest(), 100)); String appHost = "host"; int appPort = 0; @@ -1273,18 +823,6 @@ public class TestTaskScheduler { new TaskSchedulerWithDrainableContext(drainableAppCallback, mockRMClient); scheduler.initialize(); - RegisterApplicationMasterResponse mockRegResponse = mock(RegisterApplicationMasterResponse.class); - Resource mockMaxResource = mock(Resource.class); - Map<ApplicationAccessType, String> mockAcls = mock(Map.class); - when(mockRegResponse.getMaximumResourceCapability()).thenReturn( - mockMaxResource); - when(mockRegResponse.getApplicationACLs()).thenReturn(mockAcls); - when( - mockRMClient.registerApplicationMaster(anyString(), anyInt(), - anyString())).thenReturn(mockRegResponse); - Resource mockClusterResource = mock(Resource.class); - when(mockRMClient.getAvailableResources()).thenReturn(mockClusterResource); - scheduler.start(); String rack1 = "r1"; @@ -1427,13 +965,10 @@ public class TestTaskScheduler { scheduler.shutdown(); } - @SuppressWarnings("unchecked") @Test(timeout=5000) public void testTaskSchedulerRandomReuseExpireTime() throws Exception { - RackResolver.init(new YarnConfiguration()); - - TezAMRMClientAsync<CookieContainerRequest> mockRMClient = - mock(TezAMRMClientAsync.class); + TezAMRMClientAsync<CookieContainerRequest> mockRMClient = spy( + new AMRMClientAsyncForTest(new AMRMClientForTest(), 100)); String appHost = "host"; int appPort = 0; @@ -1463,21 +998,6 @@ public class TestTaskScheduler { scheduler1.initialize(); scheduler2.initialize(); - - RegisterApplicationMasterResponse mockRegResponse = - mock(RegisterApplicationMasterResponse.class); - Resource mockMaxResource = mock(Resource.class); - Map<ApplicationAccessType, String> mockAcls = mock(Map.class); - when(mockRegResponse.getMaximumResourceCapability()). - thenReturn(mockMaxResource); - when(mockRegResponse.getApplicationACLs()).thenReturn(mockAcls); - when(mockRMClient. - registerApplicationMaster(anyString(), anyInt(), anyString())). - thenReturn(mockRegResponse); - Resource mockClusterResource = mock(Resource.class); - when(mockRMClient.getAvailableResources()). - thenReturn(mockClusterResource); - scheduler1.start(); scheduler2.start(); @@ -1509,8 +1029,6 @@ public class TestTaskScheduler { @SuppressWarnings({ "unchecked", "rawtypes" }) @Test (timeout=5000) public void testTaskSchedulerPreemption() throws Exception { - RackResolver.init(new YarnConfiguration()); - TezAMRMClientAsync<CookieContainerRequest> mockRMClient = mock(TezAMRMClientAsync.class); @@ -1818,13 +1336,10 @@ public class TestTaskScheduler { drainableAppCallback.drain(); } - @SuppressWarnings({ "unchecked", "rawtypes" }) @Test (timeout=5000) public void testTaskSchedulerPreemption2() throws Exception { - RackResolver.init(new YarnConfiguration()); - - TezAMRMClientAsync<CookieContainerRequest> mockRMClient = - mock(TezAMRMClientAsync.class); + TezAMRMClientAsync<CookieContainerRequest> mockRMClient = spy( + new AMRMClientAsyncForTest(new AMRMClientForTest(), 100)); String appHost = "host"; int appPort = 0; @@ -1845,20 +1360,12 @@ public class TestTaskScheduler { new TaskSchedulerWithDrainableContext(drainableAppCallback, mockRMClient); scheduler.initialize(); - - RegisterApplicationMasterResponse mockRegResponse = - mock(RegisterApplicationMasterResponse.class); - when( - mockRMClient.registerApplicationMaster(anyString(), anyInt(), - anyString())).thenReturn(mockRegResponse); - scheduler.start(); - Resource totalResource = Resource.newInstance(4000, 4); // high value always reported - when(mockRMClient.getAvailableResources()).thenReturn(totalResource); // no preemption scheduler.getProgress(); drainableAppCallback.drain(); + Resource totalResource = mockRMClient.getAvailableResources(); Assert.assertEquals(totalResource, scheduler.getTotalResources()); verify(mockRMClient, times(0)).releaseAssignedContainer((ContainerId)any()); @@ -1890,12 +1397,6 @@ public class TestTaskScheduler { Assert.assertEquals(totalResource, scheduler.getTotalResources()); verify(mockRMClient, times(0)).releaseAssignedContainer((ContainerId)any()); - final List<ArrayList<CookieContainerRequest>> anyList = - new LinkedList<ArrayList<CookieContainerRequest>>(); - final List<ArrayList<CookieContainerRequest>> emptyList = - new LinkedList<ArrayList<CookieContainerRequest>>(); - - anyList.add(anyContainers); List<Container> containers = new ArrayList<Container>(); Container mockContainer1 = mock(Container.class, RETURNS_DEEP_STUBS); when(mockContainer1.getNodeId().getHost()).thenReturn("host1"); @@ -1904,44 +1405,8 @@ public class TestTaskScheduler { ContainerId mockCId1 = mock(ContainerId.class); when(mockContainer1.getId()).thenReturn(mockCId1); containers.add(mockContainer1); - when( - mockRMClient.getMatchingRequests((Priority) any(), eq("host1"), - (Resource) any())).thenAnswer( - new Answer<List<? extends Collection<CookieContainerRequest>>>() { - @Override - public List<? extends Collection<CookieContainerRequest>> answer( - InvocationOnMock invocation) throws Throwable { - return emptyList; - } - }); - // RackResolver by default puts hosts in default-rack - when( - mockRMClient.getMatchingRequests((Priority) any(), eq("/default-rack"), - (Resource) any())).thenAnswer( - new Answer<List<? extends Collection<CookieContainerRequest>>>() { - @Override - public List<? extends Collection<CookieContainerRequest>> answer( - InvocationOnMock invocation) throws Throwable { - return emptyList; - } - }); - when( - mockRMClient.getMatchingRequests((Priority) any(), - eq(ResourceRequest.ANY), (Resource) any())).thenAnswer( - new Answer<List<? extends Collection<CookieContainerRequest>>>() { - int calls = 0; - @Override - public List<? extends Collection<CookieContainerRequest>> answer( - InvocationOnMock invocation) throws Throwable { - if(calls > 0) { - anyContainers.remove(0); - } - calls++; - return anyList; - } - }); - Mockito.doAnswer(new Answer() { + Mockito.doAnswer(new Answer<Object>() { public Object answer(InvocationOnMock invocation) { Object[] args = invocation.getArguments(); ContainerId cId = (ContainerId) args[0]; @@ -2031,13 +1496,10 @@ public class TestTaskScheduler { drainableAppCallback.drain(); } - @SuppressWarnings("unchecked") @Test(timeout = 5000) public void testLocalityMatching() throws Exception { - - RackResolver.init(new Configuration()); - TezAMRMClientAsync<CookieContainerRequest> amrmClient = - mock(TezAMRMClientAsync.class); + TezAMRMClientAsync<CookieContainerRequest> amrmClient = spy( + new AMRMClientAsyncForTest(new AMRMClientForTest(), 100)); Configuration conf = new Configuration(); conf.setBoolean(TezConfiguration.TEZ_AM_CONTAINER_REUSE_ENABLED, false); @@ -2049,16 +1511,6 @@ public class TestTaskScheduler { new TaskSchedulerWithDrainableContext(drainableAppCallback, amrmClient); taskScheduler.initialize(); - - RegisterApplicationMasterResponse mockRegResponse = mock(RegisterApplicationMasterResponse.class); - Resource mockMaxResource = mock(Resource.class); - Map<ApplicationAccessType, String> mockAcls = mock(Map.class); - when(mockRegResponse.getMaximumResourceCapability()).thenReturn( - mockMaxResource); - when(mockRegResponse.getApplicationACLs()).thenReturn(mockAcls); - when(amrmClient.registerApplicationMaster(anyString(), anyInt(), anyString())) - .thenReturn(mockRegResponse); - taskScheduler.start(); Resource resource = Resource.newInstance(1024, 1); @@ -2087,7 +1539,6 @@ public class TestTaskScheduler { allocatedContainers.add(containerHost3); allocatedContainers.add(containerHost1); - final Map<String, List<CookieContainerRequest>> matchingMap = new HashMap<String, List<CookieContainerRequest>>(); taskScheduler.allocateTask(mockTask1, resource, hostsTask1, defaultRack, priority, null, mockCookie1); drainableAppCallback.drain(); @@ -2096,8 +1547,6 @@ public class TestTaskScheduler { host1List.add(mockCookie1); List<CookieContainerRequest> defaultRackList = new ArrayList<CookieContainerRequest>(); defaultRackList.add(mockCookie1); - matchingMap.put(hostsTask1[0], host1List); - matchingMap.put(defaultRack[0], defaultRackList); List<CookieContainerRequest> nonAllocatedHostList = new ArrayList<YarnTaskSchedulerService.CookieContainerRequest>(); nonAllocatedHostList.add(mockCookie2); @@ -2106,48 +1555,11 @@ public class TestTaskScheduler { taskScheduler.allocateTask(mockTask2, resource, hostsTask2, otherRack, priority, null, mockCookie2); drainableAppCallback.drain(); - matchingMap.put(hostsTask2[0], nonAllocatedHostList); - matchingMap.put(otherRack[0], otherRackList); List<CookieContainerRequest> anyList = new LinkedList<YarnTaskSchedulerService.CookieContainerRequest>(); anyList.add(mockCookie1); anyList.add(mockCookie2); - matchingMap.put(ResourceRequest.ANY, anyList); - - final List<ArrayList<CookieContainerRequest>> emptyList = new LinkedList<ArrayList<CookieContainerRequest>>(); - - when( - amrmClient.getMatchingRequests((Priority) any(), anyString(), - (Resource) any())).thenAnswer( - new Answer<List<? extends Collection<CookieContainerRequest>>>() { - - @Override - public List<? extends Collection<CookieContainerRequest>> answer( - InvocationOnMock invocation) throws Throwable { - String location = (String) invocation.getArguments()[1]; - if (matchingMap.get(location) != null) { - CookieContainerRequest matched = matchingMap.get(location).get(0); - // Remove matched from matchingMap - assuming TaskScheduler will - // pick the first entry. - Iterator<Entry<String, List<CookieContainerRequest>>> iter = matchingMap - .entrySet().iterator(); - while (iter.hasNext()) { - Entry<String, List<CookieContainerRequest>> entry = iter.next(); - if (entry.getValue().remove(matched)) { - if (entry.getValue().size() == 0) { - iter.remove(); - } - } - } - return Collections.singletonList(Collections - .singletonList(matched)); - } else { - return emptyList; - } - } - }); - taskScheduler.onContainersAllocated(allocatedContainers); drainableAppCallback.drain(); @@ -2193,13 +1605,10 @@ public class TestTaskScheduler { Assert.assertEquals(1, YarnTaskSchedulerService.scaleDownByPreemptionPercentage(1, 1)); } - @SuppressWarnings("unchecked") @Test public void testContainerExpired() throws Exception { - RackResolver.init(new YarnConfiguration()); - - TezAMRMClientAsync<CookieContainerRequest> mockRMClient = - mock(TezAMRMClientAsync.class); + TezAMRMClientAsync<CookieContainerRequest> mockRMClient = spy( + new AMRMClientAsyncForTest(new AMRMClientForTest(), 100)); String appHost = "host"; int appPort = 0; @@ -2218,22 +1627,6 @@ public class TestTaskScheduler { new TaskSchedulerWithDrainableContext(drainableAppCallback, mockRMClient); scheduler.initialize(); - drainableAppCallback.drain(); - - RegisterApplicationMasterResponse mockRegResponse = - mock(RegisterApplicationMasterResponse.class); - Resource mockMaxResource = mock(Resource.class); - Map<ApplicationAccessType, String> mockAcls = mock(Map.class); - when(mockRegResponse.getMaximumResourceCapability()). - thenReturn(mockMaxResource); - when(mockRegResponse.getApplicationACLs()).thenReturn(mockAcls); - when(mockRMClient. - registerApplicationMaster(anyString(), anyInt(), anyString())). - thenReturn(mockRegResponse); - Resource mockClusterResource = mock(Resource.class); - when(mockRMClient.getAvailableResources()). - thenReturn(mockClusterResource); - scheduler.start(); drainableAppCallback.drain(); @@ -2263,7 +1656,6 @@ public class TestTaskScheduler { drainableAppCallback.drain(); verify(mockRMClient, times(2)). addContainerRequest(requestCaptor.capture()); - CookieContainerRequest request1 = requestCaptor.getValue(); List<Container> containers = new ArrayList<Container>(); // sending only lower priority container to make sure its not matched @@ -2275,25 +1667,6 @@ public class TestTaskScheduler { when(mockContainer2.getId()).thenReturn(mockCId2); when(mockCId2.toString()).thenReturn("container2"); containers.add(mockContainer2); - ArrayList<CookieContainerRequest> hostContainers = - new ArrayList<CookieContainerRequest>(); - hostContainers.add(request1); - final List<ArrayList<CookieContainerRequest>> hostList = - new LinkedList<ArrayList<CookieContainerRequest>>(); - hostList.add(hostContainers); - - when( - mockRMClient.getMatchingRequestsForTopPriority(eq("host1"), - (Resource) any())).thenAnswer( - new Answer<List<? extends Collection<CookieContainerRequest>>>() { - @Override - public List<? extends Collection<CookieContainerRequest>> answer( - InvocationOnMock invocation) throws Throwable { - return hostList; - } - - }); - when(mockRMClient.getTopPriority()).thenReturn(mockPriority1); scheduler.onContainersAllocated(containers); http://git-wip-us.apache.org/repos/asf/tez/blob/501a351d/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskSchedulerHelpers.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskSchedulerHelpers.java b/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskSchedulerHelpers.java index d8170e3..9a845a1 100644 --- a/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskSchedulerHelpers.java +++ b/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskSchedulerHelpers.java @@ -79,6 +79,11 @@ class TestTaskSchedulerHelpers { // Mocking AMRMClientImpl to make use of getMatchingRequest static class AMRMClientForTest extends AMRMClientImpl<CookieContainerRequest> { + AMRMClientForTest() { + super(); + this.clusterAvailableResources = Resource.newInstance(4000, 4); + this.clusterNodeCount = 5; + } @Override protected void serviceStart() { @@ -93,6 +98,7 @@ class TestTaskSchedulerHelpers { // Mocking AMRMClientAsyncImpl to make use of getMatchingRequest static class AMRMClientAsyncForTest extends TezAMRMClientAsync<CookieContainerRequest> { + private RegisterApplicationMasterResponse mockRegResponse; public AMRMClientAsyncForTest( AMRMClient<CookieContainerRequest> client, @@ -105,7 +111,7 @@ class TestTaskSchedulerHelpers { @Override public RegisterApplicationMasterResponse registerApplicationMaster( String appHostName, int appHostPort, String appTrackingUrl) { - RegisterApplicationMasterResponse mockRegResponse = mock(RegisterApplicationMasterResponse.class); + mockRegResponse = mock(RegisterApplicationMasterResponse.class); Resource mockMaxResource = mock(Resource.class); Map<ApplicationAccessType, String> mockAcls = mock(Map.class); when(mockRegResponse.getMaximumResourceCapability()).thenReturn( @@ -126,6 +132,10 @@ class TestTaskSchedulerHelpers { @Override protected void serviceStop() { } + + RegisterApplicationMasterResponse getRegistrationResponse() { + return mockRegResponse; + } } // Overrides start / stop. Will be controlled without the extra event handling thread.
