Repository: tez Updated Branches: refs/heads/master a328d469d -> 2e121ec7e
TEZ-3247. Add more unit test coverage for container reuse. (Harish Jaiprakash via hitesh) Project: http://git-wip-us.apache.org/repos/asf/tez/repo Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/2e121ec7 Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/2e121ec7 Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/2e121ec7 Branch: refs/heads/master Commit: 2e121ec7eac5c3723b419856e8d1f3289f4feae7 Parents: a328d46 Author: Hitesh Shah <[email protected]> Authored: Wed Nov 2 10:10:13 2016 -0700 Committer: Hitesh Shah <[email protected]> Committed: Wed Nov 2 10:10:13 2016 -0700 ---------------------------------------------------------------------- CHANGES.txt | 1 + .../tez/dag/app/rm/TestContainerReuse.java | 321 +++++++++++++++++++ 2 files changed, 322 insertions(+) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/tez/blob/2e121ec7/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index b4beb80..733a66e 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -7,6 +7,7 @@ INCOMPATIBLE CHANGES ALL CHANGES: + TEZ-3247. Add more unit test coverage for container reuse. TEZ-3215. Support for MultipleOutputs. TEZ-3097. Flaky test: TestCommit.testDAGCommitStartedEventFail_OnDAGSuccess. TEZ-3487. Improvements in travis yml file to get builds to work. 
http://git-wip-us.apache.org/repos/asf/tez/blob/2e121ec7/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestContainerReuse.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestContainerReuse.java b/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestContainerReuse.java index f21de3e..7e9e9ab 100644 --- a/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestContainerReuse.java +++ b/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestContainerReuse.java @@ -96,6 +96,7 @@ import org.junit.Test; import org.mockito.invocation.InvocationOnMock; import org.mockito.stubbing.Answer; +import com.google.common.collect.ImmutableMap; import com.google.common.collect.Lists; import com.google.common.collect.Maps; import com.google.common.collect.Sets; @@ -1275,6 +1276,35 @@ public class TestContainerReuse { verify(taskSchedulerManager).taskAllocated(eq(0), eq(ta211), any(Object.class), eq(container2)); eventHandler.reset(); + taskSchedulerManager.handleEvent(new AMSchedulerEventTAEnded(ta211, container2.getId(), + TaskAttemptState.SUCCEEDED, null, null, 0)); + drainableAppCallback.drain(); + verifyDeAllocateTask(taskScheduler, ta211, true, null, null); + verify(rmClient, times(0)).releaseAssignedContainer(eq(container2.getId())); + eventHandler.reset(); + + + // Setup DAG3 with a subset of resources from DAG2 (container should be reused).
+ TezDAGID dagID3 = TezDAGID.getInstance("0", 3, 0); + dagIDAnswer.setDAGID(dagID3); + + Map<String, LocalResource> v31LR = Collections.singletonMap(rsrc1, lr1); + + // dag3, vertex1, task1, ta1 + TezTaskAttemptID taID311 = TezTaskAttemptID.getInstance( + TezTaskID.getInstance(TezVertexID.getInstance(dagID3, 1), 1), 1); + TaskAttempt ta311 = mock(TaskAttempt.class); + doReturn(taID311).when(ta311).getID(); + doReturn("Mock for TA " + taID311).when(ta311).toString(); + AMSchedulerEventTALaunchRequest lrEvent31 = createLaunchRequestEvent(taID311, ta311, resource1, + host1, racks, priority1, v31LR); + + taskSchedulerManager.handleEvent(lrEvent31); + TestTaskSchedulerHelpers.waitForDelayedDrainNotify(drainNotifier); + drainableAppCallback.drain(); + verify(taskSchedulerManager).taskAllocated(eq(0), eq(ta311), any(Object.class), eq(container2)); + eventHandler.reset(); + taskScheduler.shutdown(); taskSchedulerManager.close(); } @@ -1348,6 +1378,289 @@ public class TestContainerReuse { taskSchedulerManager.close(); } + @Test(timeout=5000) + public void testDifferentResourceContainerReuse() throws Exception { + Configuration tezConf = new Configuration(new YarnConfiguration()); + tezConf.setBoolean(TezConfiguration.TEZ_AM_CONTAINER_REUSE_ENABLED, true); + tezConf.setBoolean(TezConfiguration.TEZ_AM_CONTAINER_REUSE_RACK_FALLBACK_ENABLED, true); + tezConf.setLong(TezConfiguration.TEZ_AM_CONTAINER_REUSE_LOCALITY_DELAY_ALLOCATION_MILLIS, 0); + tezConf.setLong(TezConfiguration.TEZ_AM_CONTAINER_IDLE_RELEASE_TIMEOUT_MIN_MILLIS, 0); + tezConf.setLong(TezConfiguration.TEZ_AM_CONTAINER_IDLE_RELEASE_TIMEOUT_MAX_MILLIS, 0); + + CapturingEventHandler eventHandler = new CapturingEventHandler(); + TezDAGID dagID = TezDAGID.getInstance("0", 0, 0); + + AMRMClient<CookieContainerRequest> rmClientCore = new AMRMClientForTest(); + TezAMRMClientAsync<CookieContainerRequest> rmClient = + spy(new AMRMClientAsyncForTest(rmClientCore, 100)); + + AppContext appContext = mock(AppContext.class);
doReturn(new Configuration(false)).when(appContext).getAMConf(); + AMContainerMap amContainerMap = new AMContainerMap(mock(ContainerHeartbeatHandler.class), + mock(TaskCommunicatorManagerInterface.class), new ContainerContextMatcher(), appContext); + AMNodeTracker amNodeTracker = new AMNodeTracker(eventHandler, appContext); + doReturn(amContainerMap).when(appContext).getAllContainers(); + doReturn(amNodeTracker).when(appContext).getNodeTracker(); + doReturn(DAGAppMasterState.RUNNING).when(appContext).getAMState(); + doReturn(dagID).when(appContext).getCurrentDAGID(); + doReturn(mock(ClusterInfo.class)).when(appContext).getClusterInfo(); + + TaskSchedulerManager taskSchedulerManagerReal = new TaskSchedulerManagerForTest(appContext, + eventHandler, rmClient, new AlwaysMatchesContainerMatcher(), + TezUtils.createUserPayloadFromConf(tezConf)); + TaskSchedulerManager taskSchedulerManager = spy(taskSchedulerManagerReal); + taskSchedulerManager.init(tezConf); + taskSchedulerManager.start(); + + TaskSchedulerWithDrainableContext taskScheduler = (TaskSchedulerWithDrainableContext) + ((TaskSchedulerManagerForTest) taskSchedulerManager).getSpyTaskScheduler(); + TaskSchedulerContextDrainable drainableAppCallback = taskScheduler.getDrainableAppCallback(); + AtomicBoolean drainNotifier = new AtomicBoolean(false); + taskScheduler.delayedContainerManager.drainedDelayedContainersForTest = drainNotifier; + + Resource resource1 = Resource.newInstance(1024, 1); + Resource resource2 = Resource.newInstance(2048, 1); + String[] host1 = {"host1"}; + String[] host2 = {"host2"}; + + String []racks = {"/default-rack"}; + Priority priority1 = Priority.newInstance(1); + + TezVertexID vertexID1 = TezVertexID.getInstance(dagID, 1); + + //Vertex 1, Task 1, Attempt 1, host1 + TezTaskAttemptID taID11 = TezTaskAttemptID.getInstance(TezTaskID.getInstance(vertexID1, 1), 1); + TaskAttempt ta11 = mock(TaskAttempt.class); + AMSchedulerEventTALaunchRequest lrEvent1 = createLaunchRequestEvent( + taID11,
ta11, resource1, host1, racks, priority1); + + //Vertex 1, Task 2, Attempt 1, host1 + TezTaskAttemptID taID12 = TezTaskAttemptID.getInstance(TezTaskID.getInstance(vertexID1, 2), 1); + TaskAttempt ta12 = mock(TaskAttempt.class); + AMSchedulerEventTALaunchRequest lrEvent2 = createLaunchRequestEvent( + taID12, ta12, resource1, host1, racks, priority1); + + //Vertex 1, Task 3, Attempt 1, host2 + TezTaskAttemptID taID13 = TezTaskAttemptID.getInstance(TezTaskID.getInstance(vertexID1, 3), 1); + TaskAttempt ta13 = mock(TaskAttempt.class); + AMSchedulerEventTALaunchRequest lrEvent3 = createLaunchRequestEvent( + taID13, ta13, resource2, host2, racks, priority1); + + //Vertex 1, Task 4, Attempt 1, host2 + TezTaskAttemptID taID14 = TezTaskAttemptID.getInstance(TezTaskID.getInstance(vertexID1, 4), 1); + TaskAttempt ta14 = mock(TaskAttempt.class); + AMSchedulerEventTALaunchRequest lrEvent4 = createLaunchRequestEvent( + taID14, ta14, resource2, host2, racks, priority1); + + taskSchedulerManager.handleEvent(lrEvent1); + taskSchedulerManager.handleEvent(lrEvent2); + taskSchedulerManager.handleEvent(lrEvent3); + taskSchedulerManager.handleEvent(lrEvent4); + + Container container1 = createContainer(1, "host1", resource1, priority1); + Container container2 = createContainer(2, "host2", resource2, priority1); + + // One container allocated, should start ta11 + taskScheduler.onContainersAllocated(Collections.singletonList(container1)); + TestTaskSchedulerHelpers.waitForDelayedDrainNotify(drainNotifier); + drainableAppCallback.drain(); + verify(taskSchedulerManager).taskAllocated(eq(0), eq(ta11), any(Object.class), eq(container1)); + + // Second container allocated, should start ta13 + taskScheduler.onContainersAllocated(Collections.singletonList(container2)); + TestTaskSchedulerHelpers.waitForDelayedDrainNotify(drainNotifier); + drainableAppCallback.drain(); + verify(taskSchedulerManager).taskAllocated(eq(0), eq(ta13), any(Object.class), eq(container2)); + + // ta11 finished, should
start ta12 + taskSchedulerManager.handleEvent(new AMSchedulerEventTAEnded(ta11, container1.getId(), + TaskAttemptState.SUCCEEDED, null, null, 0)); + drainableAppCallback.drain(); + verifyDeAllocateTask(taskScheduler, ta11, true, null, null); + verify(taskSchedulerManager).taskAllocated(eq(0), eq(ta12), any(Object.class), eq(container1)); + verify(rmClient, times(0)).releaseAssignedContainer(eq(container1.getId())); + eventHandler.verifyNoInvocations(AMContainerEventStopRequest.class); + eventHandler.reset(); + + // ta13 finished, should start ta14 + taskSchedulerManager.handleEvent(new AMSchedulerEventTAEnded(ta13, container2.getId(), + TaskAttemptState.SUCCEEDED, null, null, 0)); + drainableAppCallback.drain(); + verifyDeAllocateTask(taskScheduler, ta13, true, null, null); + verify(taskSchedulerManager).taskAllocated(eq(0), eq(ta14), any(Object.class), eq(container2)); + verify(rmClient, times(0)).releaseAssignedContainer(eq(container2.getId())); + eventHandler.verifyNoInvocations(AMContainerEventStopRequest.class); + eventHandler.reset(); + + // ta12 finished no pending requests, should release container1 + taskSchedulerManager.handleEvent(new AMSchedulerEventTAEnded(ta12, container1.getId(), + TaskAttemptState.SUCCEEDED, null, null, 0)); + drainableAppCallback.drain(); + verifyDeAllocateTask(taskScheduler, ta12, true, null, null); + verify(rmClient).releaseAssignedContainer(eq(container1.getId())); + eventHandler.verifyInvocation(AMContainerEventStopRequest.class); + eventHandler.reset(); + + // ta14 finished no pending requests, should release container2.
+ taskSchedulerManager.handleEvent(new AMSchedulerEventTAEnded(ta14, container2.getId(), + TaskAttemptState.SUCCEEDED, null, null, 0)); + drainableAppCallback.drain(); + verifyDeAllocateTask(taskScheduler, ta14, true, null, null); + verify(rmClient).releaseAssignedContainer(eq(container2.getId())); + eventHandler.verifyInvocation(AMContainerEventStopRequest.class); + eventHandler.reset(); + + taskScheduler.shutdown(); + taskSchedulerManager.close(); + } + + @Test(timeout=5000) + public void testEnvironmentVarsContainerReuse() throws Exception { + Configuration tezConf = new Configuration(new YarnConfiguration()); + tezConf.setBoolean(TezConfiguration.TEZ_AM_CONTAINER_REUSE_ENABLED, true); + tezConf.setBoolean(TezConfiguration.TEZ_AM_CONTAINER_REUSE_RACK_FALLBACK_ENABLED, true); + tezConf.setLong(TezConfiguration.TEZ_AM_CONTAINER_REUSE_LOCALITY_DELAY_ALLOCATION_MILLIS, 0); + tezConf.setLong(TezConfiguration.TEZ_AM_CONTAINER_IDLE_RELEASE_TIMEOUT_MIN_MILLIS, 0); + tezConf.setLong(TezConfiguration.TEZ_AM_CONTAINER_IDLE_RELEASE_TIMEOUT_MAX_MILLIS, 0); + + CapturingEventHandler eventHandler = new CapturingEventHandler(); + TezDAGID dagID = TezDAGID.getInstance("0", 0, 0); + + AMRMClient<CookieContainerRequest> rmClientCore = new AMRMClientForTest(); + TezAMRMClientAsync<CookieContainerRequest> rmClient = + spy(new AMRMClientAsyncForTest(rmClientCore, 100)); + + AppContext appContext = mock(AppContext.class); + doReturn(new Configuration(false)).when(appContext).getAMConf(); + AMContainerMap amContainerMap = new AMContainerMap(mock(ContainerHeartbeatHandler.class), + mock(TaskCommunicatorManagerInterface.class), new ContainerContextMatcher(), appContext); + AMNodeTracker amNodeTracker = new AMNodeTracker(eventHandler, appContext); + doReturn(amContainerMap).when(appContext).getAllContainers(); + doReturn(amNodeTracker).when(appContext).getNodeTracker(); + doReturn(DAGAppMasterState.RUNNING).when(appContext).getAMState(); +
doReturn(dagID).when(appContext).getCurrentDAGID(); + doReturn(mock(ClusterInfo.class)).when(appContext).getClusterInfo(); + + TaskSchedulerManager taskSchedulerManagerReal = new TaskSchedulerManagerForTest(appContext, + eventHandler, rmClient, new ContainerContextMatcher(), + TezUtils.createUserPayloadFromConf(tezConf)); + TaskSchedulerManager taskSchedulerManager = spy(taskSchedulerManagerReal); + taskSchedulerManager.init(tezConf); + taskSchedulerManager.start(); + + TaskSchedulerWithDrainableContext taskScheduler = (TaskSchedulerWithDrainableContext) + ((TaskSchedulerManagerForTest) taskSchedulerManager).getSpyTaskScheduler(); + TaskSchedulerContextDrainable drainableAppCallback = taskScheduler.getDrainableAppCallback(); + AtomicBoolean drainNotifier = new AtomicBoolean(false); + taskScheduler.delayedContainerManager.drainedDelayedContainersForTest = drainNotifier; + + Resource resource1 = Resource.newInstance(1024, 1); + String[] host1 = {"host1"}; + + String []racks = {"/default-rack"}; + Priority priority1 = Priority.newInstance(1); + + TezVertexID vertexID1 = TezVertexID.getInstance(dagID, 1); + + // Create containers with same resources and then see how tasks are allocated + // when the environments vary + Map<String, String> env1 = ImmutableMap.of("env1", "val1", "env2", "val2"); + Map<String, String> env2 = ImmutableMap.of("env3", "val3", "env4", "val4"); + Map<String, String> env3 = ImmutableMap.of("env1", "val1"); + Map<String, String> env4 = ImmutableMap.of("env1", "val1", "env4", "val4"); + + //Vertex 1, Task 1, Attempt 1, host1, env1 + TezTaskAttemptID taID11 = TezTaskAttemptID.getInstance(TezTaskID.getInstance(vertexID1, 1), 1); + TaskAttempt ta11 = mock(TaskAttempt.class); + AMSchedulerEventTALaunchRequest lrEvent1 = createLaunchRequestEvent( + taID11, ta11, resource1, host1, racks, priority1, "", env1); + + //Vertex 1, Task 2, Attempt 1, host1, env2 (no keys in common with env1) + TezTaskAttemptID taID12 = TezTaskAttemptID.getInstance(TezTaskID.getInstance(vertexID1, 2),
1); + TaskAttempt ta12 = mock(TaskAttempt.class); + AMSchedulerEventTALaunchRequest lrEvent2 = createLaunchRequestEvent( + taID12, ta12, resource1, host1, racks, priority1, "", env2); + + //Vertex 1, Task 3, Attempt 1, host1, env3 (subset of env1) + TezTaskAttemptID taID13 = TezTaskAttemptID.getInstance(TezTaskID.getInstance(vertexID1, 3), 1); + TaskAttempt ta13 = mock(TaskAttempt.class); + AMSchedulerEventTALaunchRequest lrEvent3 = createLaunchRequestEvent( + taID13, ta13, resource1, host1, racks, priority1, "", env3); + + //Vertex 1, Task 4, Attempt 1, host1, env4 (shares env1=val1 with env1 but adds env4=val4) + TezTaskAttemptID taID14 = TezTaskAttemptID.getInstance(TezTaskID.getInstance(vertexID1, 4), 1); + TaskAttempt ta14 = mock(TaskAttempt.class); + AMSchedulerEventTALaunchRequest lrEvent4 = createLaunchRequestEvent( + taID14, ta14, resource1, host1, racks, priority1, "", env4); + + taskSchedulerManager.handleEvent(lrEvent1); + taskSchedulerManager.handleEvent(lrEvent2); + taskSchedulerManager.handleEvent(lrEvent3); + taskSchedulerManager.handleEvent(lrEvent4); + + Container container1 = createContainer(1, "host1", resource1, priority1); + Container container2 = createContainer(2, "host1", resource1, priority1); + Container container3 = createContainer(3, "host1", resource1, priority1); + + // One container allocated, should start ta11 on container1 + taskScheduler.onContainersAllocated(Collections.singletonList(container1)); + TestTaskSchedulerHelpers.waitForDelayedDrainNotify(drainNotifier); + drainableAppCallback.drain(); + verify(taskSchedulerManager).taskAllocated(eq(0), eq(ta11), any(Object.class), eq(container1)); + + // finish ta11, should start ta13 + taskSchedulerManager.handleEvent(new AMSchedulerEventTAEnded(ta11, container1.getId(), + TaskAttemptState.SUCCEEDED, null, null, 0)); + drainableAppCallback.drain(); + verifyDeAllocateTask(taskScheduler, ta11, true, null, null); + verify(taskSchedulerManager).taskAllocated(eq(0), eq(ta13), any(Object.class), eq(container1)); + verify(rmClient,
times(0)).releaseAssignedContainer(eq(container1.getId())); + eventHandler.verifyNoInvocations(AMContainerEventStopRequest.class); + eventHandler.reset(); + + // ta13 finished, cannot reuse container1, should release container1 + taskSchedulerManager.handleEvent(new AMSchedulerEventTAEnded(ta13, container1.getId(), + TaskAttemptState.SUCCEEDED, null, null, 0)); + drainableAppCallback.drain(); + verifyDeAllocateTask(taskScheduler, ta13, true, null, null); + verify(rmClient).releaseAssignedContainer(eq(container1.getId())); + eventHandler.verifyInvocation(AMContainerEventStopRequest.class); + eventHandler.reset(); + + // Second container allocated, should start ta12. + taskScheduler.onContainersAllocated(Collections.singletonList(container2)); + TestTaskSchedulerHelpers.waitForDelayedDrainNotify(drainNotifier); + drainableAppCallback.drain(); + verify(taskSchedulerManager).taskAllocated(eq(0), eq(ta12), any(Object.class), eq(container2)); + + // ta12 finished, cannot reuse container, should release container2 + taskSchedulerManager.handleEvent(new AMSchedulerEventTAEnded(ta12, container2.getId(), + TaskAttemptState.SUCCEEDED, null, null, 0)); + drainableAppCallback.drain(); + verifyDeAllocateTask(taskScheduler, ta12, true, null, null); + verify(rmClient).releaseAssignedContainer(eq(container2.getId())); + eventHandler.verifyInvocation(AMContainerEventStopRequest.class); + eventHandler.reset(); + + // Third container allocated, should start ta14.
+ taskScheduler.onContainersAllocated(Collections.singletonList(container3)); + TestTaskSchedulerHelpers.waitForDelayedDrainNotify(drainNotifier); + drainableAppCallback.drain(); + verify(taskSchedulerManager).taskAllocated(eq(0), eq(ta14), any(Object.class), eq(container3)); + + // ta14 finished, should release container3 + taskSchedulerManager.handleEvent(new AMSchedulerEventTAEnded(ta14, container3.getId(), + TaskAttemptState.SUCCEEDED, null, null, 0)); + drainableAppCallback.drain(); + verifyDeAllocateTask(taskScheduler, ta14, true, null, null); + verify(rmClient).releaseAssignedContainer(eq(container3.getId())); + eventHandler.verifyInvocation(AMContainerEventStopRequest.class); + eventHandler.reset(); + + taskScheduler.shutdown(); + taskSchedulerManager.close(); + } + private Container createContainer(int id, String host, Resource resource, Priority priority) { @SuppressWarnings("deprecation") ContainerId containerID = ContainerId.newInstance( @@ -1400,6 +1713,14 @@ public class TestContainerReuse { new ContainerContext(localResources, new Credentials(), new HashMap<String, String>(), jvmOpts)); } + private AMSchedulerEventTALaunchRequest createLaunchRequestEvent(TezTaskAttemptID taID, + TaskAttempt ta, Resource capability, String[] hosts, String[] racks, Priority priority, + String jvmOpts, Map<String, String> environment) { + return createLaunchRequestEvent(taID, ta, capability, hosts, racks, priority, + new ContainerContext(new HashMap<String, LocalResource>(), new Credentials(), environment, + jvmOpts)); + } + private static class ChangingDAGIDAnswer implements Answer<TezDAGID> { private TezDAGID dagID;
