Repository: tez Updated Branches: refs/heads/master 823b1bb3b -> b04e7fce7
TEZ-3431. Add unit tests for container release (Taklon Stephen Wu via zhiyuany) Project: http://git-wip-us.apache.org/repos/asf/tez/repo Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/b04e7fce Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/b04e7fce Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/b04e7fce Branch: refs/heads/master Commit: b04e7fce7ff61bb31b06919da7298aa3a04e1c5f Parents: 823b1bb Author: Zhiyuan Yang <[email protected]> Authored: Fri Aug 25 16:47:01 2017 -0700 Committer: Zhiyuan Yang <[email protected]> Committed: Fri Aug 25 16:47:01 2017 -0700 ---------------------------------------------------------------------- .../tez/dag/app/rm/TestTaskScheduler.java | 123 ++++++++++--------- 1 file changed, 66 insertions(+), 57 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/tez/blob/b04e7fce/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskScheduler.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskScheduler.java b/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskScheduler.java index 16c560e..1a647b1 100644 --- a/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskScheduler.java +++ b/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskScheduler.java @@ -89,6 +89,10 @@ public class TestTaskScheduler { static ContainerSignatureMatcher containerSignatureMatcher = new AlwaysMatchesContainerMatcher(); private ExecutorService contextCallbackExecutor; + private static final String DEFAULT_APP_HOST = "host"; + private static final String DEFAULT_APP_URL = "url"; + private static final String SUCCEED_APP_MESSAGE = "success"; + private static final int DEFAULT_APP_PORT = 0; @BeforeClass public static void beforeClass() { @@ -122,16 +126,12 @@ public class TestTaskScheduler { AMRMClientAsyncForTest mockRMClient = spy( new AMRMClientAsyncForTest(new AMRMClientForTest(), 100)); - String appHost = "host"; - int appPort = 0; - String appUrl = "url"; - Configuration conf = new Configuration(); conf.setBoolean(TezConfiguration.TEZ_AM_CONTAINER_REUSE_ENABLED, false); int interval = 100; conf.setInt(TezConfiguration.TEZ_AM_RM_HEARTBEAT_INTERVAL_MS_MAX, interval); - TaskSchedulerContext mockApp = setupMockTaskSchedulerContext(appHost, appPort, appUrl, conf); + TaskSchedulerContext mockApp = setupMockTaskSchedulerContext(DEFAULT_APP_HOST, DEFAULT_APP_PORT, DEFAULT_APP_URL, conf); TaskSchedulerContextDrainable drainableAppCallback = createDrainableContext(mockApp); TaskSchedulerWithDrainableContext scheduler = @@ -146,7 +146,7 @@ public class TestTaskScheduler { scheduler.start(); drainableAppCallback.drain(); verify(mockRMClient).start(); - verify(mockRMClient).registerApplicationMaster(appHost, appPort, appUrl); + verify(mockRMClient).registerApplicationMaster(DEFAULT_APP_HOST, DEFAULT_APP_PORT, DEFAULT_APP_URL); RegisterApplicationMasterResponse regResponse = mockRMClient.getRegistrationResponse(); verify(mockApp).setApplicationRegistrationData(regResponse.getMaximumResourceCapability(), regResponse.getApplicationACLs(), @@ -372,23 +372,19 @@ public class TestTaskScheduler { drainableAppCallback.drain(); verify(mockApp).appShutdownRequested(); - String appMsg = "success"; AppFinalStatus finalStatus = - new AppFinalStatus(FinalApplicationStatus.SUCCEEDED, appMsg, appUrl); + new AppFinalStatus(FinalApplicationStatus.SUCCEEDED, SUCCEED_APP_MESSAGE, DEFAULT_APP_URL); when(mockApp.getFinalAppStatus()).thenReturn(finalStatus); scheduler.shutdown(); drainableAppCallback.drain(); verify(mockRMClient). unregisterApplicationMaster(FinalApplicationStatus.SUCCEEDED, - appMsg, appUrl); + SUCCEED_APP_MESSAGE, DEFAULT_APP_URL); verify(mockRMClient).stop(); } @Test(timeout=10000) public void testTaskSchedulerInitiateStop() throws Exception { - String appHost = "host"; - int appPort = 0; - String appUrl = "url"; Configuration conf = new Configuration(); conf.setLong(TezConfiguration.TEZ_AM_CONTAINER_REUSE_LOCALITY_DELAY_ALLOCATION_MILLIS, 0); @@ -396,7 +392,7 @@ public class TestTaskScheduler { conf.setLong(TezConfiguration.TEZ_AM_CONTAINER_IDLE_RELEASE_TIMEOUT_MIN_MILLIS, 10000); conf.setLong(TezConfiguration.TEZ_AM_CONTAINER_IDLE_RELEASE_TIMEOUT_MAX_MILLIS, 10000); - TaskSchedulerContext mockApp = setupMockTaskSchedulerContext(appHost, appPort, appUrl, conf); + TaskSchedulerContext mockApp = setupMockTaskSchedulerContext(DEFAULT_APP_HOST, DEFAULT_APP_PORT, DEFAULT_APP_URL, conf); final TaskSchedulerContextDrainable drainableAppCallback = createDrainableContext(mockApp); TezAMRMClientAsync<CookieContainerRequest> mockRMClient = spy( @@ -506,10 +502,6 @@ public class TestTaskScheduler { TezAMRMClientAsync<CookieContainerRequest> mockRMClient = spy( new AMRMClientAsyncForTest(new AMRMClientForTest(), 100)); - String appHost = "host"; - int appPort = 0; - String appUrl = "url"; - Configuration conf = new Configuration(); // to match all in the same pass conf.setLong(TezConfiguration.TEZ_AM_CONTAINER_REUSE_LOCALITY_DELAY_ALLOCATION_MILLIS, 0); @@ -517,7 +509,7 @@ public class TestTaskScheduler { conf.setLong(TezConfiguration.TEZ_AM_CONTAINER_IDLE_RELEASE_TIMEOUT_MIN_MILLIS, 0); conf.setLong(TezConfiguration.TEZ_AM_CONTAINER_IDLE_RELEASE_TIMEOUT_MAX_MILLIS, 0); - TaskSchedulerContext mockApp = setupMockTaskSchedulerContext(appHost, appPort, appUrl, conf); + TaskSchedulerContext mockApp = setupMockTaskSchedulerContext(DEFAULT_APP_HOST, DEFAULT_APP_PORT, DEFAULT_APP_URL, conf); final TaskSchedulerContextDrainable drainableAppCallback = createDrainableContext(mockApp); TaskSchedulerWithDrainableContext scheduler = @@ -795,15 +787,14 @@ public class TestTaskScheduler { drainableAppCallback.drain(); verify(mockApp).appShutdownRequested(); - String appMsg = "success"; AppFinalStatus finalStatus = - new AppFinalStatus(FinalApplicationStatus.SUCCEEDED, appMsg, appUrl); + new AppFinalStatus(FinalApplicationStatus.SUCCEEDED, SUCCEED_APP_MESSAGE, DEFAULT_APP_URL); when(mockApp.getFinalAppStatus()).thenReturn(finalStatus); scheduler.shutdown(); drainableAppCallback.drain(); verify(mockRMClient). unregisterApplicationMaster(FinalApplicationStatus.SUCCEEDED, - appMsg, appUrl); + SUCCEED_APP_MESSAGE, DEFAULT_APP_URL); verify(mockRMClient).stop(); } @@ -812,12 +803,8 @@ public class TestTaskScheduler { TezAMRMClientAsync<CookieContainerRequest> mockRMClient = spy( new AMRMClientAsyncForTest(new AMRMClientForTest(), 100)); - String appHost = "host"; - int appPort = 0; - String appUrl = "url"; - - TaskSchedulerContext mockApp = setupMockTaskSchedulerContext(appHost, appPort, appUrl, true, - new Configuration()); + TaskSchedulerContext mockApp = setupMockTaskSchedulerContext(DEFAULT_APP_HOST, DEFAULT_APP_PORT, DEFAULT_APP_URL, + true, new Configuration()); final TaskSchedulerContextDrainable drainableAppCallback = createDrainableContext(mockApp); TaskSchedulerWithDrainableContext scheduler = @@ -959,9 +946,48 @@ public class TestTaskScheduler { verify(mockRMClient, times(2)).releaseAssignedContainer((ContainerId)any()); Assert.assertEquals(5, scheduler.heldContainers.size()); - String appMsg = "success"; AppFinalStatus finalStatus = - new AppFinalStatus(FinalApplicationStatus.SUCCEEDED, appMsg, appUrl); + new AppFinalStatus(FinalApplicationStatus.SUCCEEDED, SUCCEED_APP_MESSAGE, DEFAULT_APP_URL); + when(mockApp.getFinalAppStatus()).thenReturn(finalStatus); + scheduler.shutdown(); + } + + @Test (timeout=3000) + public void testTaskSchedulerHeldContainersReleaseAfterExpired() throws Exception { + final TezAMRMClientAsync<CookieContainerRequest> mockRMClient = spy( + new AMRMClientAsyncForTest(new AMRMClientForTest(), 100)); + final TaskSchedulerContext mockApp = setupMockTaskSchedulerContext(DEFAULT_APP_HOST, DEFAULT_APP_PORT, + DEFAULT_APP_URL, true, new Configuration()); + final TaskSchedulerContextDrainable drainableAppCallback = createDrainableContext(mockApp); + final TaskSchedulerWithDrainableContext scheduler = + new TaskSchedulerWithDrainableContext(drainableAppCallback, mockRMClient); + + scheduler.initialize(); + scheduler.start(); + + ApplicationAttemptId appId = ApplicationAttemptId.newInstance(ApplicationId.newInstance(0, 0), 0); + ContainerId containerId = ContainerId.newInstance(appId, 0); + Container c1 = mock(Container.class, RETURNS_DEEP_STUBS); + when(c1.getNodeId().getHost()).thenReturn(""); // we are mocking directly + + HeldContainer hc1 = mock(HeldContainer.class); + when(c1.getId()).thenReturn(containerId); + when(hc1.getContainer()).thenReturn(c1); + when(hc1.isNew()).thenReturn(false); + + // containerExpiryTime = 0 + scheduler.heldContainers.put(containerId, hc1); + + long currTime = System.currentTimeMillis(); + scheduler.delayedContainerManager.addDelayedContainer(hc1.getContainer(), currTime); + // sleep and wait for mainLoop() check-in to release this expired held container. + Thread.sleep(1000); + + verify(mockRMClient, times(1)).releaseAssignedContainer((ContainerId)any()); + Assert.assertEquals(0, scheduler.heldContainers.size()); + + AppFinalStatus finalStatus = + new AppFinalStatus(FinalApplicationStatus.SUCCEEDED, SUCCEED_APP_MESSAGE, DEFAULT_APP_URL); when(mockApp.getFinalAppStatus()).thenReturn(finalStatus); scheduler.shutdown(); } @@ -971,10 +997,6 @@ public class TestTaskScheduler { TezAMRMClientAsync<CookieContainerRequest> mockRMClient = spy( new AMRMClientAsyncForTest(new AMRMClientForTest(), 100)); - String appHost = "host"; - int appPort = 0; - String appUrl = "url"; - long minTime = 1000l; long maxTime = 100000l; Configuration conf1 = new Configuration(); @@ -985,8 +1007,8 @@ public class TestTaskScheduler { conf2.setLong(TezConfiguration.TEZ_AM_CONTAINER_IDLE_RELEASE_TIMEOUT_MIN_MILLIS, minTime); conf2.setLong(TezConfiguration.TEZ_AM_CONTAINER_IDLE_RELEASE_TIMEOUT_MAX_MILLIS, maxTime); - TaskSchedulerContext mockApp1 = setupMockTaskSchedulerContext(appHost, appPort, appUrl, conf1); - TaskSchedulerContext mockApp2 = setupMockTaskSchedulerContext(appHost, appPort, appUrl, conf2); + TaskSchedulerContext mockApp1 = setupMockTaskSchedulerContext(DEFAULT_APP_HOST, DEFAULT_APP_PORT, DEFAULT_APP_URL, conf1); + TaskSchedulerContext mockApp2 = setupMockTaskSchedulerContext(DEFAULT_APP_HOST, DEFAULT_APP_PORT, DEFAULT_APP_URL, conf2); final TaskSchedulerContextDrainable drainableAppCallback1 = createDrainableContext(mockApp1); final TaskSchedulerContextDrainable drainableAppCallback2 = createDrainableContext(mockApp2); @@ -1018,9 +1040,8 @@ public class TestTaskScheduler { lastExpireTime = currExpireTime; } - String appMsg = "success"; AppFinalStatus finalStatus = - new AppFinalStatus(FinalApplicationStatus.SUCCEEDED, appMsg, appUrl); + new AppFinalStatus(FinalApplicationStatus.SUCCEEDED, SUCCEED_APP_MESSAGE, DEFAULT_APP_URL); when(mockApp1.getFinalAppStatus()).thenReturn(finalStatus); when(mockApp2.getFinalAppStatus()).thenReturn(finalStatus); scheduler1.shutdown(); @@ -1033,16 +1054,12 @@ public class TestTaskScheduler { TezAMRMClientAsync<CookieContainerRequest> mockRMClient = mock(TezAMRMClientAsync.class); - String appHost = "host"; - int appPort = 0; - String appUrl = "url"; - Configuration conf = new Configuration(); conf.setBoolean(TezConfiguration.TEZ_AM_CONTAINER_REUSE_ENABLED, false); conf.setInt(TezConfiguration.TEZ_AM_PREEMPTION_HEARTBEATS_BETWEEN_PREEMPTIONS, 3); - TaskSchedulerContext mockApp = setupMockTaskSchedulerContext(appHost, appPort, appUrl, false, - null, null, new PreemptionMatcher(), conf); + TaskSchedulerContext mockApp = setupMockTaskSchedulerContext(DEFAULT_APP_HOST, DEFAULT_APP_PORT, DEFAULT_APP_URL, + false, null, null, new PreemptionMatcher(), conf); final TaskSchedulerContextDrainable drainableAppCallback = createDrainableContext(mockApp); final TaskSchedulerWithDrainableContext scheduler = @@ -1331,7 +1348,7 @@ public class TestTaskScheduler { verify(mockRMClient, times(1)).releaseAssignedContainer(mockCId3A); AppFinalStatus finalStatus = - new AppFinalStatus(FinalApplicationStatus.SUCCEEDED, "", appUrl); + new AppFinalStatus(FinalApplicationStatus.SUCCEEDED, "", DEFAULT_APP_URL); when(mockApp.getFinalAppStatus()).thenReturn(finalStatus); scheduler.shutdown(); drainableAppCallback.drain(); @@ -1342,10 +1359,6 @@ public class TestTaskScheduler { TezAMRMClientAsync<CookieContainerRequest> mockRMClient = spy( new AMRMClientAsyncForTest(new AMRMClientForTest(), 100)); - String appHost = "host"; - int appPort = 0; - String appUrl = "url"; - int waitTime = 1000; Configuration conf = new Configuration(); @@ -1353,8 +1366,8 @@ public class TestTaskScheduler { conf.setInt(TezConfiguration.TEZ_AM_PREEMPTION_HEARTBEATS_BETWEEN_PREEMPTIONS, 2); conf.setInt(TezConfiguration.TEZ_AM_PREEMPTION_MAX_WAIT_TIME_MS, waitTime); - TaskSchedulerContext mockApp = setupMockTaskSchedulerContext(appHost, appPort, appUrl, false, - null, null, new PreemptionMatcher(), conf); + TaskSchedulerContext mockApp = setupMockTaskSchedulerContext(DEFAULT_APP_HOST, DEFAULT_APP_PORT, DEFAULT_APP_URL, + false, null, null, new PreemptionMatcher(), conf); final TaskSchedulerContextDrainable drainableAppCallback = createDrainableContext(mockApp); final TaskSchedulerWithDrainableContext scheduler = @@ -1491,7 +1504,7 @@ public class TestTaskScheduler { Assert.assertTrue(oldStartWaitTime < scheduler.highestWaitingRequestWaitStartTime); AppFinalStatus finalStatus = - new AppFinalStatus(FinalApplicationStatus.SUCCEEDED, "", appUrl); + new AppFinalStatus(FinalApplicationStatus.SUCCEEDED, "", DEFAULT_APP_URL); when(mockApp.getFinalAppStatus()).thenReturn(finalStatus); scheduler.shutdown(); drainableAppCallback.drain(); @@ -1505,7 +1518,7 @@ public class TestTaskScheduler { Configuration conf = new Configuration(); conf.setBoolean(TezConfiguration.TEZ_AM_CONTAINER_REUSE_ENABLED, false); - TaskSchedulerContext appClient = setupMockTaskSchedulerContext("host", 0, "", conf); + TaskSchedulerContext appClient = setupMockTaskSchedulerContext(DEFAULT_APP_HOST, DEFAULT_APP_PORT, "", conf); final TaskSchedulerContextDrainable drainableAppCallback = createDrainableContext(appClient); TaskSchedulerWithDrainableContext taskScheduler = @@ -1611,10 +1624,6 @@ public class TestTaskScheduler { TezAMRMClientAsync<CookieContainerRequest> mockRMClient = spy( new AMRMClientAsyncForTest(new AMRMClientForTest(), 100)); - String appHost = "host"; - int appPort = 0; - String appUrl = "url"; - Configuration conf = new Configuration(); // to match all in the same pass conf.setLong(TezConfiguration.TEZ_AM_CONTAINER_REUSE_LOCALITY_DELAY_ALLOCATION_MILLIS, 0); @@ -1622,7 +1631,7 @@ public class TestTaskScheduler { conf.setLong(TezConfiguration.TEZ_AM_CONTAINER_IDLE_RELEASE_TIMEOUT_MIN_MILLIS, 0); conf.setLong(TezConfiguration.TEZ_AM_CONTAINER_IDLE_RELEASE_TIMEOUT_MAX_MILLIS, 0); - TaskSchedulerContext mockApp = setupMockTaskSchedulerContext(appHost, appPort, appUrl, conf); + TaskSchedulerContext mockApp = setupMockTaskSchedulerContext(DEFAULT_APP_HOST, DEFAULT_APP_PORT, DEFAULT_APP_URL, conf); final TaskSchedulerContextDrainable drainableAppCallback = createDrainableContext(mockApp); TaskSchedulerWithDrainableContext scheduler = new TaskSchedulerWithDrainableContext(drainableAppCallback, mockRMClient);
