http://git-wip-us.apache.org/repos/asf/tez/blob/8b278ea8/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestContainerReuse.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestContainerReuse.java b/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestContainerReuse.java index 0a02f9e..0e90681 100644 --- a/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestContainerReuse.java +++ b/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestContainerReuse.java @@ -68,7 +68,7 @@ import org.apache.tez.dag.app.ClusterInfo; import org.apache.tez.dag.app.ContainerContext; import org.apache.tez.dag.app.ContainerHeartbeatHandler; import org.apache.tez.dag.app.DAGAppMasterState; -import org.apache.tez.dag.app.TaskAttemptListener; +import org.apache.tez.dag.app.TaskCommunicatorManagerInterface; import org.apache.tez.dag.app.dag.TaskAttempt; import org.apache.tez.dag.app.rm.YarnTaskSchedulerService.CookieContainerRequest; import org.apache.tez.dag.app.rm.TestTaskSchedulerHelpers.AMRMClientAsyncForTest; @@ -76,7 +76,7 @@ import org.apache.tez.dag.app.rm.TestTaskSchedulerHelpers.AMRMClientForTest; import org.apache.tez.dag.app.rm.TestTaskSchedulerHelpers.AlwaysMatchesContainerMatcher; import org.apache.tez.dag.app.rm.TestTaskSchedulerHelpers.CapturingEventHandler; import org.apache.tez.dag.app.rm.TestTaskSchedulerHelpers.TaskSchedulerContextDrainable; -import org.apache.tez.dag.app.rm.TestTaskSchedulerHelpers.TaskSchedulerEventHandlerForTest; +import org.apache.tez.dag.app.rm.TestTaskSchedulerHelpers.TaskSchedulerManagerForTest; import org.apache.tez.dag.app.rm.TestTaskSchedulerHelpers.TaskSchedulerWithDrainableContext; import org.apache.tez.dag.app.rm.container.AMContainerEventAssignTA; import org.apache.tez.dag.app.rm.container.AMContainerEventStopRequest; @@ -136,7 +136,7 @@ public class TestContainerReuse { doReturn(conf).when(appContext).getAMConf(); AMContainerMap amContainerMap = new AMContainerMap( mock(ContainerHeartbeatHandler.class), - mock(TaskAttemptListener.class), + mock(TaskCommunicatorManagerInterface.class), new ContainerContextMatcher(), appContext); AMNodeTracker amNodeTracker = new AMNodeTracker(eventHandler, appContext); doReturn(amContainerMap).when(appContext).getAllContainers(); @@ -145,18 +145,18 @@ public class TestContainerReuse { doReturn(dagID).when(appContext).getCurrentDAGID(); doReturn(mock(ClusterInfo.class)).when(appContext).getClusterInfo(); - TaskSchedulerEventHandler taskSchedulerEventHandlerReal = - new TaskSchedulerEventHandlerForTest( + TaskSchedulerManager taskSchedulerManagerReal = + new TaskSchedulerManagerForTest( appContext, eventHandler, rmClient, new AlwaysMatchesContainerMatcher(), TezUtils.createUserPayloadFromConf(conf)); - TaskSchedulerEventHandler taskSchedulerEventHandler = - spy(taskSchedulerEventHandlerReal); - taskSchedulerEventHandler.init(conf); - taskSchedulerEventHandler.start(); + TaskSchedulerManager taskSchedulerManager = + spy(taskSchedulerManagerReal); + taskSchedulerManager.init(conf); + taskSchedulerManager.start(); TaskSchedulerWithDrainableContext taskScheduler = (TaskSchedulerWithDrainableContext) - ((TaskSchedulerEventHandlerForTest) taskSchedulerEventHandler) + ((TaskSchedulerManagerForTest) taskSchedulerManager) .getSpyTaskScheduler(); TaskSchedulerContextDrainable drainableAppCallback = taskScheduler.getDrainableAppCallback(); @@ -192,10 +192,10 @@ public class TestContainerReuse { defaultRack, priority); drainNotifier.set(false); - taskSchedulerEventHandler.handleEvent(lrTa11); + taskSchedulerManager.handleEvent(lrTa11); TestTaskSchedulerHelpers.waitForDelayedDrainNotify(drainNotifier); drainNotifier.set(false); - taskSchedulerEventHandler.handleEvent(lrTa21); + taskSchedulerManager.handleEvent(lrTa21); TestTaskSchedulerHelpers.waitForDelayedDrainNotify(drainNotifier); Container containerHost1 = createContainer(1, host1[0], resource, priority); @@ -206,28 +206,28 @@ public class TestContainerReuse { Lists.newArrayList(containerHost1, containerHost2)); TestTaskSchedulerHelpers.waitForDelayedDrainNotify(drainNotifier); drainableAppCallback.drain(); - verify(taskSchedulerEventHandler).taskAllocated( + verify(taskSchedulerManager).taskAllocated( eq(0), eq(ta11), any(Object.class), eq(containerHost1)); - verify(taskSchedulerEventHandler).taskAllocated( + verify(taskSchedulerManager).taskAllocated( eq(0), eq(ta21), any(Object.class), eq(containerHost2)); // Adding the event later so that task1 assigned to containerHost1 // is deterministic. - taskSchedulerEventHandler.handleEvent(lrTa31); + taskSchedulerManager.handleEvent(lrTa31); - taskSchedulerEventHandler.handleEvent( + taskSchedulerManager.handleEvent( new AMSchedulerEventTAEnded( ta11, containerHost1.getId(), TaskAttemptState.SUCCEEDED, null, null, 0)); drainableAppCallback.drain(); verifyDeAllocateTask(taskScheduler, ta11, true, null, null); - verify(taskSchedulerEventHandler, times(1)).taskAllocated( + verify(taskSchedulerManager, times(1)).taskAllocated( eq(0), eq(ta31), any(Object.class), eq(containerHost1)); verify(rmClient, times(0)).releaseAssignedContainer( eq(containerHost1.getId())); eventHandler.verifyNoInvocations(AMContainerEventStopRequest.class); eventHandler.reset(); - taskSchedulerEventHandler.handleEvent( + taskSchedulerManager.handleEvent( new AMSchedulerEventTAEnded(ta21, containerHost2.getId(), TaskAttemptState.SUCCEEDED, null, null, 0)); @@ -235,7 +235,7 @@ public class TestContainerReuse { Throwable exception = null; while (System.currentTimeMillis() < currentTs + 5000l) { try { - verify(taskSchedulerEventHandler, + verify(taskSchedulerManager, times(1)).containerBeingReleased(eq(0), eq(containerHost2.getId())); exception = null; break; @@ -245,7 +245,7 @@ public class TestContainerReuse { } assertTrue("containerHost2 was not released", exception == null); taskScheduler.shutdown(); - taskSchedulerEventHandler.close(); + taskSchedulerManager.close(); } @Test(timeout = 15000l) @@ -272,7 +272,7 @@ public class TestContainerReuse { doReturn(new Configuration(false)).when(appContext).getAMConf(); AMContainerMap amContainerMap = new AMContainerMap( mock(ContainerHeartbeatHandler.class), - mock(TaskAttemptListener.class), + mock(TaskCommunicatorManagerInterface.class), new ContainerContextMatcher(), appContext); AMNodeTracker amNodeTracker = new AMNodeTracker(eventHandler, appContext); doReturn(amContainerMap).when(appContext).getAllContainers(); @@ -281,17 +281,17 @@ public class TestContainerReuse { doReturn(dagID).when(appContext).getCurrentDAGID(); doReturn(mock(ClusterInfo.class)).when(appContext).getClusterInfo(); - TaskSchedulerEventHandler taskSchedulerEventHandlerReal = - new TaskSchedulerEventHandlerForTest(appContext, eventHandler, rmClient, + TaskSchedulerManager taskSchedulerManagerReal = + new TaskSchedulerManagerForTest(appContext, eventHandler, rmClient, new AlwaysMatchesContainerMatcher(), TezUtils.createUserPayloadFromConf(conf)); - TaskSchedulerEventHandler taskSchedulerEventHandler = - spy(taskSchedulerEventHandlerReal); - taskSchedulerEventHandler.init(conf); - taskSchedulerEventHandler.start(); + TaskSchedulerManager taskSchedulerManager = + spy(taskSchedulerManagerReal); + taskSchedulerManager.init(conf); + taskSchedulerManager.start(); TaskSchedulerWithDrainableContext taskScheduler = (TaskSchedulerWithDrainableContext) - ((TaskSchedulerEventHandlerForTest) taskSchedulerEventHandler) + ((TaskSchedulerManagerForTest) taskSchedulerManager) .getSpyTaskScheduler(); TaskSchedulerContextDrainable drainableAppCallback = taskScheduler.getDrainableAppCallback(); @@ -321,10 +321,10 @@ public class TestContainerReuse { taID31, ta31, resource, host1, defaultRack, priority); drainNotifier.set(false); - taskSchedulerEventHandler.handleEvent(lrTa11); + taskSchedulerManager.handleEvent(lrTa11); TestTaskSchedulerHelpers.waitForDelayedDrainNotify(drainNotifier); drainNotifier.set(false); - taskSchedulerEventHandler.handleEvent(lrTa21); + taskSchedulerManager.handleEvent(lrTa21); TestTaskSchedulerHelpers.waitForDelayedDrainNotify(drainNotifier); Container containerHost1 = createContainer(1, host1[0], resource, priority); @@ -334,26 +334,26 @@ public class TestContainerReuse { taskScheduler.onContainersAllocated(Lists.newArrayList(containerHost1, containerHost2)); TestTaskSchedulerHelpers.waitForDelayedDrainNotify(drainNotifier); drainableAppCallback.drain(); - verify(taskSchedulerEventHandler).taskAllocated(eq(0), eq(ta11), any(Object.class), eq(containerHost1)); - verify(taskSchedulerEventHandler).taskAllocated(eq(0), eq(ta21), any(Object.class), + verify(taskSchedulerManager).taskAllocated(eq(0), eq(ta11), any(Object.class), eq(containerHost1)); + verify(taskSchedulerManager).taskAllocated(eq(0), eq(ta21), any(Object.class), eq(containerHost2)); // Adding the event later so that task1 assigned to containerHost1 is deterministic. - taskSchedulerEventHandler.handleEvent(lrTa31); + taskSchedulerManager.handleEvent(lrTa31); - taskSchedulerEventHandler.handleEvent( + taskSchedulerManager.handleEvent( new AMSchedulerEventTAEnded(ta21, containerHost2.getId(), TaskAttemptState.SUCCEEDED, null, null, 0)); drainableAppCallback.drain(); verifyDeAllocateTask(taskScheduler, ta21, true, null, null); - verify(taskSchedulerEventHandler, times(0)).taskAllocated( + verify(taskSchedulerManager, times(0)).taskAllocated( eq(0), eq(ta31), any(Object.class), eq(containerHost2)); verify(rmClient, times(1)).releaseAssignedContainer( eq(containerHost2.getId())); eventHandler.verifyInvocation(AMContainerEventStopRequest.class); taskScheduler.shutdown(); - taskSchedulerEventHandler.close(); + taskSchedulerManager.close(); } @Test(timeout = 10000l) @@ -375,7 +375,7 @@ public class TestContainerReuse { AppContext appContext = mock(AppContext.class); doReturn(new Configuration(false)).when(appContext).getAMConf(); AMContainerMap amContainerMap = new AMContainerMap(mock(ContainerHeartbeatHandler.class), - mock(TaskAttemptListener.class), new ContainerContextMatcher(), appContext); + mock(TaskCommunicatorManagerInterface.class), new ContainerContextMatcher(), appContext); AMNodeTracker amNodeTracker = new AMNodeTracker(eventHandler, appContext); doReturn(amContainerMap).when(appContext).getAllContainers(); doReturn(amNodeTracker).when(appContext).getNodeTracker(); @@ -383,12 +383,13 @@ public class TestContainerReuse { doReturn(dagID).when(appContext).getCurrentDAGID(); doReturn(mock(ClusterInfo.class)).when(appContext).getClusterInfo(); - TaskSchedulerEventHandler taskSchedulerEventHandlerReal = new TaskSchedulerEventHandlerForTest(appContext, eventHandler, rmClient, new AlwaysMatchesContainerMatcher(), TezUtils.createUserPayloadFromConf(tezConf)); - TaskSchedulerEventHandler taskSchedulerEventHandler = spy(taskSchedulerEventHandlerReal); - taskSchedulerEventHandler.init(tezConf); - taskSchedulerEventHandler.start(); + TaskSchedulerManager + taskSchedulerManagerReal = new TaskSchedulerManagerForTest(appContext, eventHandler, rmClient, new AlwaysMatchesContainerMatcher(), TezUtils.createUserPayloadFromConf(tezConf)); + TaskSchedulerManager taskSchedulerManager = spy(taskSchedulerManagerReal); + taskSchedulerManager.init(tezConf); + taskSchedulerManager.start(); - TaskSchedulerWithDrainableContext taskScheduler = (TaskSchedulerWithDrainableContext) ((TaskSchedulerEventHandlerForTest) taskSchedulerEventHandler) + TaskSchedulerWithDrainableContext taskScheduler = (TaskSchedulerWithDrainableContext) ((TaskSchedulerManagerForTest) taskSchedulerManager) .getSpyTaskScheduler(); TaskSchedulerContextDrainable drainableAppCallback = taskScheduler.getDrainableAppCallback(); AtomicBoolean drainNotifier = new AtomicBoolean(false); @@ -423,10 +424,10 @@ public class TestContainerReuse { TaskAttempt ta14 = mock(TaskAttempt.class); AMSchedulerEventTALaunchRequest lrEvent4 = createLaunchRequestEvent(taID14, ta14, resource1, host2, racks, priority1); - taskSchedulerEventHandler.handleEvent(lrEvent1); - taskSchedulerEventHandler.handleEvent(lrEvent2); - taskSchedulerEventHandler.handleEvent(lrEvent3); - taskSchedulerEventHandler.handleEvent(lrEvent4); + taskSchedulerManager.handleEvent(lrEvent1); + taskSchedulerManager.handleEvent(lrEvent2); + taskSchedulerManager.handleEvent(lrEvent3); + taskSchedulerManager.handleEvent(lrEvent4); Container container1 = createContainer(1, "host1", resource1, priority1); @@ -435,39 +436,39 @@ public class TestContainerReuse { taskScheduler.onContainersAllocated(Collections.singletonList(container1)); TestTaskSchedulerHelpers.waitForDelayedDrainNotify(drainNotifier); drainableAppCallback.drain(); - verify(taskSchedulerEventHandler).taskAllocated(eq(0), eq(ta11), any(Object.class), + verify(taskSchedulerManager).taskAllocated(eq(0), eq(ta11), any(Object.class), eq(container1)); // Task assigned to container completed successfully. Container should be re-used. - taskSchedulerEventHandler.handleEvent( + taskSchedulerManager.handleEvent( new AMSchedulerEventTAEnded(ta11, container1.getId(), TaskAttemptState.SUCCEEDED, null, null, 0)); drainableAppCallback.drain(); verifyDeAllocateTask(taskScheduler, ta11, true, null, null); - verify(taskSchedulerEventHandler).taskAllocated(eq(0), eq(ta12), any(Object.class), eq(container1)); + verify(taskSchedulerManager).taskAllocated(eq(0), eq(ta12), any(Object.class), eq(container1)); verify(rmClient, times(0)).releaseAssignedContainer(eq(container1.getId())); eventHandler.verifyNoInvocations(AMContainerEventStopRequest.class); eventHandler.reset(); // Task assigned to container completed successfully. // Verify reuse across hosts. - taskSchedulerEventHandler.handleEvent( + taskSchedulerManager.handleEvent( new AMSchedulerEventTAEnded(ta12, container1.getId(), TaskAttemptState.SUCCEEDED, null, null, 0)); drainableAppCallback.drain(); verifyDeAllocateTask(taskScheduler, ta12, true, null, null); - verify(taskSchedulerEventHandler).taskAllocated(eq(0), eq(ta13), any(Object.class), + verify(taskSchedulerManager).taskAllocated(eq(0), eq(ta13), any(Object.class), eq(container1)); verify(rmClient, times(0)).releaseAssignedContainer(eq(container1.getId())); eventHandler.verifyNoInvocations(AMContainerEventStopRequest.class); eventHandler.reset(); // Verify no re-use if a previous task fails. - taskSchedulerEventHandler.handleEvent( + taskSchedulerManager.handleEvent( new AMSchedulerEventTAEnded(ta13, container1.getId(), TaskAttemptState.FAILED, null, "TIMEOUT", 0)); drainableAppCallback.drain(); - verify(taskSchedulerEventHandler, times(0)).taskAllocated(eq(0), eq(ta14), any(Object.class), + verify(taskSchedulerManager, times(0)).taskAllocated(eq(0), eq(ta14), any(Object.class), eq(container1)); verifyDeAllocateTask(taskScheduler, ta13, false, null, "TIMEOUT"); verify(rmClient).releaseAssignedContainer(eq(container1.getId())); @@ -481,11 +482,11 @@ public class TestContainerReuse { taskScheduler.onContainersAllocated(Collections.singletonList(container2)); TestTaskSchedulerHelpers.waitForDelayedDrainNotify(drainNotifier); drainableAppCallback.drain(); - verify(taskSchedulerEventHandler).taskAllocated(eq(0), eq(ta14), any(Object.class), + verify(taskSchedulerManager).taskAllocated(eq(0), eq(ta14), any(Object.class), eq(container2)); // Task assigned to container completed successfully. No pending requests. Container should be released. - taskSchedulerEventHandler.handleEvent( + taskSchedulerManager.handleEvent( new AMSchedulerEventTAEnded(ta14, container2.getId(), TaskAttemptState.SUCCEEDED, null, null, 0)); drainableAppCallback.drain(); @@ -495,7 +496,7 @@ public class TestContainerReuse { eventHandler.reset(); taskScheduler.shutdown(); - taskSchedulerEventHandler.close(); + taskSchedulerManager.close(); } @Test(timeout = 10000l) @@ -521,7 +522,7 @@ public class TestContainerReuse { AppContext appContext = mock(AppContext.class); doReturn(new Configuration(false)).when(appContext).getAMConf(); AMContainerMap amContainerMap = new AMContainerMap(mock(ContainerHeartbeatHandler.class), - mock(TaskAttemptListener.class), new ContainerContextMatcher(), appContext); + mock(TaskCommunicatorManagerInterface.class), new ContainerContextMatcher(), appContext); AMNodeTracker amNodeTracker = new AMNodeTracker(eventHandler, appContext); doReturn(amContainerMap).when(appContext).getAllContainers(); doReturn(amNodeTracker).when(appContext).getNodeTracker(); @@ -530,14 +531,14 @@ public class TestContainerReuse { doReturn(mock(ClusterInfo.class)).when(appContext).getClusterInfo(); //Use ContainerContextMatcher here. Otherwise it would not match the JVM options - TaskSchedulerEventHandler taskSchedulerEventHandlerReal = - new TaskSchedulerEventHandlerForTest(appContext, eventHandler, rmClient, new ContainerContextMatcher(), TezUtils.createUserPayloadFromConf(tezConf)); - TaskSchedulerEventHandler taskSchedulerEventHandler = spy(taskSchedulerEventHandlerReal); - taskSchedulerEventHandler.init(tezConf); - taskSchedulerEventHandler.start(); + TaskSchedulerManager taskSchedulerManagerReal = + new TaskSchedulerManagerForTest(appContext, eventHandler, rmClient, new ContainerContextMatcher(), TezUtils.createUserPayloadFromConf(tezConf)); + TaskSchedulerManager taskSchedulerManager = spy(taskSchedulerManagerReal); + taskSchedulerManager.init(tezConf); + taskSchedulerManager.start(); TaskSchedulerWithDrainableContext taskScheduler = - (TaskSchedulerWithDrainableContext) ((TaskSchedulerEventHandlerForTest) taskSchedulerEventHandler) + (TaskSchedulerWithDrainableContext) ((TaskSchedulerManagerForTest) taskSchedulerManager) .getSpyTaskScheduler(); TaskSchedulerContextDrainable drainableAppCallback = taskScheduler.getDrainableAppCallback(); AtomicBoolean drainNotifier = new AtomicBoolean(false); @@ -573,10 +574,10 @@ public class TestContainerReuse { createLaunchRequestEvent(taID12, ta12, resource1, host1, racks, priority1); drainNotifier.set(false); - taskSchedulerEventHandler.handleEvent(lrEvent1); + taskSchedulerManager.handleEvent(lrEvent1); TestTaskSchedulerHelpers.waitForDelayedDrainNotify(drainNotifier); drainNotifier.set(false); - taskSchedulerEventHandler.handleEvent(lrEvent2); + taskSchedulerManager.handleEvent(lrEvent2); TestTaskSchedulerHelpers.waitForDelayedDrainNotify(drainNotifier); Container container1 = createContainer(1, "host1", resource1, priority1); @@ -586,16 +587,16 @@ public class TestContainerReuse { taskScheduler.onContainersAllocated(Collections.singletonList(container1)); TestTaskSchedulerHelpers.waitForDelayedDrainNotify(drainNotifier); drainableAppCallback.drain(); - verify(taskSchedulerEventHandler).taskAllocated(eq(0), eq(ta11), any(Object.class), + verify(taskSchedulerManager).taskAllocated(eq(0), eq(ta11), any(Object.class), eq(container1)); // First task had profiling on. This container can not be reused further. - taskSchedulerEventHandler.handleEvent( + taskSchedulerManager.handleEvent( new AMSchedulerEventTAEnded(ta11, container1.getId(), TaskAttemptState.SUCCEEDED, null, null, 0)); drainableAppCallback.drain(); verifyDeAllocateTask(taskScheduler, ta11, true, null, null); - verify(taskSchedulerEventHandler, times(0)).taskAllocated(eq(0), eq(ta12), any(Object.class), + verify(taskSchedulerManager, times(0)).taskAllocated(eq(0), eq(ta12), any(Object.class), eq(container1)); verify(rmClient, times(1)).releaseAssignedContainer(eq(container1.getId())); eventHandler.verifyInvocation(AMContainerEventStopRequest.class); @@ -623,10 +624,10 @@ public class TestContainerReuse { Container container2 = createContainer(2, "host2", resource1, priority1); drainNotifier.set(false); - taskSchedulerEventHandler.handleEvent(lrEvent3); + taskSchedulerManager.handleEvent(lrEvent3); TestTaskSchedulerHelpers.waitForDelayedDrainNotify(drainNotifier); drainNotifier.set(false); - taskSchedulerEventHandler.handleEvent(lrEvent4); + taskSchedulerManager.handleEvent(lrEvent4); TestTaskSchedulerHelpers.waitForDelayedDrainNotify(drainNotifier); // Container started @@ -634,16 +635,16 @@ public class TestContainerReuse { taskScheduler.onContainersAllocated(Collections.singletonList(container2)); TestTaskSchedulerHelpers.waitForDelayedDrainNotify(drainNotifier); drainableAppCallback.drain(); - verify(taskSchedulerEventHandler).taskAllocated(eq(0), eq(ta13), any(Object.class), eq(container2)); + verify(taskSchedulerManager).taskAllocated(eq(0), eq(ta13), any(Object.class), eq(container2)); // Verify that the container can not be reused when profiling option is turned on // Even for 2 tasks having same profiling option can have container reusability. - taskSchedulerEventHandler.handleEvent( + taskSchedulerManager.handleEvent( new AMSchedulerEventTAEnded(ta13, container2.getId(), TaskAttemptState.SUCCEEDED, null, null, 0)); drainableAppCallback.drain(); verifyDeAllocateTask(taskScheduler, ta13, true, null, null); - verify(taskSchedulerEventHandler, times(0)).taskAllocated(eq(0), eq(ta14), any(Object.class), + verify(taskSchedulerManager, times(0)).taskAllocated(eq(0), eq(ta14), any(Object.class), eq(container2)); verify(rmClient, times(1)).releaseAssignedContainer(eq(container2.getId())); eventHandler.verifyInvocation(AMContainerEventStopRequest.class); @@ -675,27 +676,27 @@ public class TestContainerReuse { // Container started Container container3 = createContainer(2, "host3", resource1, priority1); - taskSchedulerEventHandler.handleEvent(lrEvent5); - taskSchedulerEventHandler.handleEvent(lrEvent6); + taskSchedulerManager.handleEvent(lrEvent5); + taskSchedulerManager.handleEvent(lrEvent6); drainNotifier.set(false); taskScheduler.onContainersAllocated(Collections.singletonList(container3)); TestTaskSchedulerHelpers.waitForDelayedDrainNotify(drainNotifier); drainableAppCallback.drain(); - verify(taskSchedulerEventHandler).taskAllocated(eq(0), eq(ta15), any(Object.class), + verify(taskSchedulerManager).taskAllocated(eq(0), eq(ta15), any(Object.class), eq(container3)); //Ensure task 6 (of vertex 1) is allocated to same container - taskSchedulerEventHandler.handleEvent( + taskSchedulerManager.handleEvent( new AMSchedulerEventTAEnded(ta15, container3.getId(), TaskAttemptState.SUCCEEDED, null, null, 0)); drainableAppCallback.drain(); verifyDeAllocateTask(taskScheduler, ta15, true, null, null); - verify(taskSchedulerEventHandler).taskAllocated(eq(0), eq(ta16), any(Object.class), eq(container3)); + verify(taskSchedulerManager).taskAllocated(eq(0), eq(ta16), any(Object.class), eq(container3)); eventHandler.reset(); taskScheduler.shutdown(); - taskSchedulerEventHandler.close(); + taskSchedulerManager.close(); } @Test(timeout = 30000l) @@ -721,7 +722,7 @@ public class TestContainerReuse { doReturn(new Configuration(false)).when(appContext).getAMConf(); AMContainerMap amContainerMap = new AMContainerMap( mock(ContainerHeartbeatHandler.class), - mock(TaskAttemptListener.class), + mock(TaskCommunicatorManagerInterface.class), new ContainerContextMatcher(), appContext); AMNodeTracker amNodeTracker = new AMNodeTracker(eventHandler, appContext); doReturn(amContainerMap).when(appContext).getAllContainers(); @@ -730,18 +731,18 @@ public class TestContainerReuse { doReturn(dagID).when(appContext).getCurrentDAGID(); doReturn(mock(ClusterInfo.class)).when(appContext).getClusterInfo(); - TaskSchedulerEventHandler taskSchedulerEventHandlerReal = - new TaskSchedulerEventHandlerForTest( + TaskSchedulerManager taskSchedulerManagerReal = + new TaskSchedulerManagerForTest( appContext, eventHandler, rmClient, new AlwaysMatchesContainerMatcher(), TezUtils.createUserPayloadFromConf(tezConf)); - TaskSchedulerEventHandler taskSchedulerEventHandler = - spy(taskSchedulerEventHandlerReal); - taskSchedulerEventHandler.init(tezConf); - taskSchedulerEventHandler.start(); + TaskSchedulerManager taskSchedulerManager = + spy(taskSchedulerManagerReal); + taskSchedulerManager.init(tezConf); + taskSchedulerManager.start(); TaskSchedulerWithDrainableContext taskScheduler = (TaskSchedulerWithDrainableContext) - ((TaskSchedulerEventHandlerForTest) taskSchedulerEventHandler) + ((TaskSchedulerManagerForTest) taskSchedulerManager) .getSpyTaskScheduler(); TaskSchedulerContextDrainable drainableAppCallback = taskScheduler.getDrainableAppCallback(); @@ -772,7 +773,7 @@ public class TestContainerReuse { // Send launch request for task 1 only, deterministic assignment to this task. drainNotifier.set(false); - taskSchedulerEventHandler.handleEvent(lrEvent11); + taskSchedulerManager.handleEvent(lrEvent11); TestTaskSchedulerHelpers.waitForDelayedDrainNotify(drainNotifier); Container container1 = createContainer(1, "randomHost", resource1, priority); @@ -782,21 +783,21 @@ public class TestContainerReuse { taskScheduler.onContainersAllocated(Collections.singletonList(container1)); TestTaskSchedulerHelpers.waitForDelayedDrainNotify(drainNotifier); drainableAppCallback.drain(); - verify(taskSchedulerEventHandler).taskAllocated( + verify(taskSchedulerManager).taskAllocated( eq(0), eq(ta11), any(Object.class), eq(container1)); // Send launch request for task2 (vertex2) - taskSchedulerEventHandler.handleEvent(lrEvent12); + taskSchedulerManager.handleEvent(lrEvent12); // Task assigned to container completed successfully. // Container should not be immediately assigned to task 2 // until delay expires. - taskSchedulerEventHandler.handleEvent( + taskSchedulerManager.handleEvent( new AMSchedulerEventTAEnded(ta11, container1.getId(), TaskAttemptState.SUCCEEDED, null, null, 0)); drainableAppCallback.drain(); verifyDeAllocateTask(taskScheduler, ta11, true, null, null); - verify(taskSchedulerEventHandler, times(0)).taskAllocated( + verify(taskSchedulerManager, times(0)).taskAllocated( eq(0), eq(ta12), any(Object.class), eq(container1)); verify(rmClient, times(0)).releaseAssignedContainer(eq(container1.getId())); eventHandler.verifyNoInvocations(AMContainerEventStopRequest.class); @@ -804,11 +805,11 @@ public class TestContainerReuse { LOG.info("Sleeping to ensure that the scheduling loop runs"); Thread.sleep(3000l); - verify(taskSchedulerEventHandler).taskAllocated( + verify(taskSchedulerManager).taskAllocated( eq(0), eq(ta12), any(Object.class), eq(container1)); // TA12 completed. - taskSchedulerEventHandler.handleEvent( + taskSchedulerManager.handleEvent( new AMSchedulerEventTAEnded(ta12, container1.getId(), TaskAttemptState.SUCCEEDED, null, null, 0)); drainableAppCallback.drain(); @@ -818,7 +819,7 @@ public class TestContainerReuse { eventHandler.verifyInvocation(AMContainerEventStopRequest.class); taskScheduler.shutdown(); - taskSchedulerEventHandler.close(); + taskSchedulerManager.close(); } @Test(timeout = 30000l) @@ -845,7 +846,7 @@ public class TestContainerReuse { doReturn(new Configuration(false)).when(appContext).getAMConf(); AMContainerMap amContainerMap = new AMContainerMap( mock(ContainerHeartbeatHandler.class), - mock(TaskAttemptListener.class), + mock(TaskCommunicatorManagerInterface.class), new ContainerContextMatcher(), appContext); AMNodeTracker amNodeTracker = new AMNodeTracker(eventHandler, appContext); doReturn(amContainerMap).when(appContext).getAllContainers(); @@ -855,17 +856,17 @@ public class TestContainerReuse { doReturn(dagID).when(appContext).getCurrentDAGID(); doReturn(mock(ClusterInfo.class)).when(appContext).getClusterInfo(); - TaskSchedulerEventHandler taskSchedulerEventHandlerReal = - new TaskSchedulerEventHandlerForTest(appContext, eventHandler, rmClient, + TaskSchedulerManager taskSchedulerManagerReal = + new TaskSchedulerManagerForTest(appContext, eventHandler, rmClient, new AlwaysMatchesContainerMatcher(), TezUtils.createUserPayloadFromConf(tezConf)); - TaskSchedulerEventHandler taskSchedulerEventHandler = - spy(taskSchedulerEventHandlerReal); - taskSchedulerEventHandler.init(tezConf); - taskSchedulerEventHandler.start(); + TaskSchedulerManager taskSchedulerManager = + spy(taskSchedulerManagerReal); + taskSchedulerManager.init(tezConf); + taskSchedulerManager.start(); TaskSchedulerWithDrainableContext taskScheduler = (TaskSchedulerWithDrainableContext) - ((TaskSchedulerEventHandlerForTest) taskSchedulerEventHandler) + ((TaskSchedulerManagerForTest) taskSchedulerManager) .getSpyTaskScheduler(); TaskSchedulerContextDrainable drainableAppCallback = taskScheduler.getDrainableAppCallback(); @@ -900,7 +901,7 @@ public class TestContainerReuse { // Send launch request for task 1 onle, deterministic assignment to this task. drainNotifier.set(false); - taskSchedulerEventHandler.handleEvent(lrEvent11); + taskSchedulerManager.handleEvent(lrEvent11); TestTaskSchedulerHelpers.waitForDelayedDrainNotify(drainNotifier); Container container1 = createContainer(1, host1[0], resource1, priority1); @@ -910,25 +911,25 @@ public class TestContainerReuse { taskScheduler.onContainersAllocated(Collections.singletonList(container1)); TestTaskSchedulerHelpers.waitForDelayedDrainNotify(drainNotifier); drainableAppCallback.drain(); - verify(taskSchedulerEventHandler).taskAllocated( + verify(taskSchedulerManager).taskAllocated( eq(0), eq(ta11), any(Object.class), eq(container1)); // Send launch request for task2 (vertex2) - taskSchedulerEventHandler.handleEvent(lrEvent21); + taskSchedulerManager.handleEvent(lrEvent21); // Task assigned to container completed successfully. // Container should be assigned to task21. - taskSchedulerEventHandler.handleEvent( + taskSchedulerManager.handleEvent( new AMSchedulerEventTAEnded(ta11, container1.getId(), TaskAttemptState.SUCCEEDED, null, null, 0)); drainableAppCallback.drain(); verifyDeAllocateTask(taskScheduler, ta11, true, null, null); - verify(taskSchedulerEventHandler).taskAllocated( + verify(taskSchedulerManager).taskAllocated( eq(0), eq(ta21), any(Object.class), eq(container1)); verify(rmClient, times(0)).releaseAssignedContainer(eq(container1.getId())); // Task 2 completes. - taskSchedulerEventHandler.handleEvent( + taskSchedulerManager.handleEvent( new AMSchedulerEventTAEnded(ta21, container1.getId(), TaskAttemptState.SUCCEEDED, null, null, 0)); verify(rmClient, times(0)).releaseAssignedContainer(eq(container1.getId())); @@ -939,7 +940,7 @@ public class TestContainerReuse { verify(rmClient, times(0)).releaseAssignedContainer(eq(container1.getId())); taskScheduler.shutdown(); - taskSchedulerEventHandler.close(); + taskSchedulerManager.close(); } @Test(timeout = 30000l) @@ -962,7 +963,7 @@ public class TestContainerReuse { doReturn(new Configuration(false)).when(appContext).getAMConf(); ChangingDAGIDAnswer dagIDAnswer = new ChangingDAGIDAnswer(dagID1); AMContainerMap amContainerMap = new AMContainerMap(mock(ContainerHeartbeatHandler.class), - mock(TaskAttemptListener.class), new ContainerContextMatcher(), appContext); + mock(TaskCommunicatorManagerInterface.class), new ContainerContextMatcher(), appContext); AMNodeTracker amNodeTracker = new AMNodeTracker(eventHandler, appContext); doReturn(amContainerMap).when(appContext).getAllContainers(); doReturn(amNodeTracker).when(appContext).getNodeTracker(); @@ -971,12 +972,13 @@ public class TestContainerReuse { doAnswer(dagIDAnswer).when(appContext).getCurrentDAGID(); doReturn(mock(ClusterInfo.class)).when(appContext).getClusterInfo(); - TaskSchedulerEventHandler taskSchedulerEventHandlerReal = new TaskSchedulerEventHandlerForTest(appContext, eventHandler, rmClient, new AlwaysMatchesContainerMatcher(), TezUtils.createUserPayloadFromConf(tezConf)); - TaskSchedulerEventHandler taskSchedulerEventHandler = spy(taskSchedulerEventHandlerReal); - taskSchedulerEventHandler.init(tezConf); - taskSchedulerEventHandler.start(); + TaskSchedulerManager + taskSchedulerManagerReal = new TaskSchedulerManagerForTest(appContext, eventHandler, rmClient, new AlwaysMatchesContainerMatcher(), TezUtils.createUserPayloadFromConf(tezConf)); + TaskSchedulerManager taskSchedulerManager = spy(taskSchedulerManagerReal); + taskSchedulerManager.init(tezConf); + taskSchedulerManager.start(); - TaskSchedulerWithDrainableContext taskScheduler = (TaskSchedulerWithDrainableContext) ((TaskSchedulerEventHandlerForTest) taskSchedulerEventHandler) + TaskSchedulerWithDrainableContext taskScheduler = (TaskSchedulerWithDrainableContext) ((TaskSchedulerManagerForTest) taskSchedulerManager) .getSpyTaskScheduler(); TaskSchedulerContextDrainable drainableAppCallback = taskScheduler.getDrainableAppCallback(); AtomicBoolean drainNotifier = new AtomicBoolean(false); @@ -1013,10 +1015,10 @@ public class TestContainerReuse { AMSchedulerEventTALaunchRequest lrEvent12 = createLaunchRequestEvent(taID112, ta112, resource1, host1, racks, priority1, dag1LRs); drainNotifier.set(false); - taskSchedulerEventHandler.handleEvent(lrEvent11); + taskSchedulerManager.handleEvent(lrEvent11); TestTaskSchedulerHelpers.waitForDelayedDrainNotify(drainNotifier); drainNotifier.set(false); - taskSchedulerEventHandler.handleEvent(lrEvent12); + taskSchedulerManager.handleEvent(lrEvent12); TestTaskSchedulerHelpers.waitForDelayedDrainNotify(drainNotifier); Container container1 = createContainer(1, "host1", resource1, priority1); @@ -1026,17 +1028,17 @@ public class TestContainerReuse { taskScheduler.onContainersAllocated(Collections.singletonList(container1)); TestTaskSchedulerHelpers.waitForDelayedDrainNotify(drainNotifier); drainableAppCallback.drain(); - verify(taskSchedulerEventHandler).taskAllocated(eq(0), eq(ta111), any(Object.class), eq(container1)); + verify(taskSchedulerManager).taskAllocated(eq(0), eq(ta111), any(Object.class), eq(container1)); assignEvent = (AMContainerEventAssignTA) eventHandler.verifyInvocation(AMContainerEventAssignTA.class); assertEquals(1, assignEvent.getRemoteTaskLocalResources().size()); // Task assigned to container completed successfully. Container should be re-used. - taskSchedulerEventHandler.handleEvent( + taskSchedulerManager.handleEvent( new AMSchedulerEventTAEnded(ta111, container1.getId(), TaskAttemptState.SUCCEEDED, null, null, 0)); drainableAppCallback.drain(); verifyDeAllocateTask(taskScheduler, ta111, true, null, null); - verify(taskSchedulerEventHandler).taskAllocated(eq(0), eq(ta112), any(Object.class), eq(container1)); + verify(taskSchedulerManager).taskAllocated(eq(0), eq(ta112), any(Object.class), eq(container1)); verify(rmClient, times(0)).releaseAssignedContainer(eq(container1.getId())); eventHandler.verifyNoInvocations(AMContainerEventStopRequest.class); assignEvent = (AMContainerEventAssignTA) eventHandler.verifyInvocation(AMContainerEventAssignTA.class); @@ -1045,7 +1047,7 @@ public class TestContainerReuse { // Task assigned to container completed successfully. // Verify reuse across hosts. - taskSchedulerEventHandler.handleEvent( + taskSchedulerManager.handleEvent( new AMSchedulerEventTAEnded(ta112, container1.getId(), TaskAttemptState.SUCCEEDED, null, null, 0)); drainableAppCallback.drain(); @@ -1074,14 +1076,14 @@ public class TestContainerReuse { TaskAttempt ta212 = mock(TaskAttempt.class); AMSchedulerEventTALaunchRequest lrEvent22 = createLaunchRequestEvent(taID212, ta212, resource1, host1, racks, priority1, dag2LRs); - taskSchedulerEventHandler.handleEvent(lrEvent21); - taskSchedulerEventHandler.handleEvent(lrEvent22); + taskSchedulerManager.handleEvent(lrEvent21); + taskSchedulerManager.handleEvent(lrEvent22); drainableAppCallback.drain(); // TODO This is terrible, need a better way to ensure the scheduling loop has run LOG.info("Sleeping to ensure that the scheduling loop runs"); Thread.sleep(6000l); - verify(taskSchedulerEventHandler).taskAllocated(eq(0), eq(ta211), any(Object.class), + verify(taskSchedulerManager).taskAllocated(eq(0), eq(ta211), any(Object.class), eq(container1)); verify(rmClient, times(0)).releaseAssignedContainer(eq(container1.getId())); eventHandler.verifyNoInvocations(AMContainerEventStopRequest.class); @@ -1089,12 +1091,12 @@ public class TestContainerReuse { assertEquals(2, assignEvent.getRemoteTaskLocalResources().size()); eventHandler.reset(); - taskSchedulerEventHandler.handleEvent( + taskSchedulerManager.handleEvent( new AMSchedulerEventTAEnded(ta211, container1.getId(), TaskAttemptState.SUCCEEDED, null, null, 0)); drainableAppCallback.drain(); verifyDeAllocateTask(taskScheduler, ta211, true, null, null); - verify(taskSchedulerEventHandler).taskAllocated(eq(0), eq(ta212), any(Object.class), eq(container1)); + verify(taskSchedulerManager).taskAllocated(eq(0), eq(ta212), any(Object.class), eq(container1)); verify(rmClient, times(0)).releaseAssignedContainer(eq(container1.getId())); eventHandler.verifyNoInvocations(AMContainerEventStopRequest.class); assignEvent = (AMContainerEventAssignTA) eventHandler.verifyInvocation(AMContainerEventAssignTA.class); @@ -1102,7 +1104,7 @@ public class TestContainerReuse { eventHandler.reset(); taskScheduler.shutdown(); - taskSchedulerEventHandler.close(); + taskSchedulerManager.close(); } @Test(timeout = 30000l) @@ -1125,7 +1127,7 @@ public class TestContainerReuse { doReturn(new Configuration(false)).when(appContext).getAMConf(); ChangingDAGIDAnswer dagIDAnswer = new ChangingDAGIDAnswer(dagID1); AMContainerMap amContainerMap = new AMContainerMap(mock(ContainerHeartbeatHandler.class), - mock(TaskAttemptListener.class), new ContainerContextMatcher(), appContext); + mock(TaskCommunicatorManagerInterface.class), new ContainerContextMatcher(), appContext); AMNodeTracker amNodeTracker = new AMNodeTracker(eventHandler, appContext); doReturn(amContainerMap).when(appContext).getAllContainers(); doReturn(amNodeTracker).when(appContext).getNodeTracker(); @@ -1134,14 +1136,14 @@ public class TestContainerReuse { doAnswer(dagIDAnswer).when(appContext).getCurrentDAGID(); doReturn(mock(ClusterInfo.class)).when(appContext).getClusterInfo(); - TaskSchedulerEventHandler taskSchedulerEventHandlerReal = - new TaskSchedulerEventHandlerForTest(appContext, eventHandler, rmClient, + TaskSchedulerManager taskSchedulerManagerReal = + new TaskSchedulerManagerForTest(appContext, eventHandler, rmClient, new ContainerContextMatcher(), TezUtils.createUserPayloadFromConf(tezConf)); - TaskSchedulerEventHandler taskSchedulerEventHandler = spy(taskSchedulerEventHandlerReal); - taskSchedulerEventHandler.init(tezConf); - taskSchedulerEventHandler.start(); + TaskSchedulerManager taskSchedulerManager = spy(taskSchedulerManagerReal); + taskSchedulerManager.init(tezConf); + taskSchedulerManager.start(); - TaskSchedulerWithDrainableContext taskScheduler = (TaskSchedulerWithDrainableContext) ((TaskSchedulerEventHandlerForTest) taskSchedulerEventHandler) + TaskSchedulerWithDrainableContext taskScheduler = (TaskSchedulerWithDrainableContext) ((TaskSchedulerManagerForTest) taskSchedulerManager) .getSpyTaskScheduler(); TaskSchedulerContextDrainable drainableAppCallback = taskScheduler.getDrainableAppCallback(); AtomicBoolean drainNotifier = new AtomicBoolean(false); @@ -1185,10 +1187,10 @@ public class TestContainerReuse { AMSchedulerEventTALaunchRequest lrEvent12 = createLaunchRequestEvent(taID112, ta112, resource1, host1, racks, priority1, v12LR); drainNotifier.set(false); - taskSchedulerEventHandler.handleEvent(lrEvent11); + taskSchedulerManager.handleEvent(lrEvent11); TestTaskSchedulerHelpers.waitForDelayedDrainNotify(drainNotifier); drainNotifier.set(false); - taskSchedulerEventHandler.handleEvent(lrEvent12); + taskSchedulerManager.handleEvent(lrEvent12); TestTaskSchedulerHelpers.waitForDelayedDrainNotify(drainNotifier); Container container1 = createContainer(1, "host1", resource1, priority1); @@ -1199,17 +1201,17 @@ public class TestContainerReuse { taskScheduler.onContainersAllocated(Collections.singletonList(container1)); TestTaskSchedulerHelpers.waitForDelayedDrainNotify(drainNotifier); drainableAppCallback.drain(); - verify(taskSchedulerEventHandler).taskAllocated(eq(0), eq(ta111), any(Object.class), eq(container1)); + verify(taskSchedulerManager).taskAllocated(eq(0), eq(ta111), any(Object.class), eq(container1)); assignEvent = (AMContainerEventAssignTA) eventHandler.verifyInvocation(AMContainerEventAssignTA.class); assertEquals(1, assignEvent.getRemoteTaskLocalResources().size()); // Task assigned to container completed successfully. Container should be re-used. - taskSchedulerEventHandler.handleEvent( + taskSchedulerManager.handleEvent( new AMSchedulerEventTAEnded(ta111, container1.getId(), TaskAttemptState.SUCCEEDED, null, null, 0)); drainableAppCallback.drain(); verifyDeAllocateTask(taskScheduler, ta111, true, null, null); - verify(taskSchedulerEventHandler).taskAllocated(eq(0), eq(ta112), any(Object.class), eq(container1)); + verify(taskSchedulerManager).taskAllocated(eq(0), eq(ta112), any(Object.class), eq(container1)); verify(rmClient, times(0)).releaseAssignedContainer(eq(container1.getId())); eventHandler.verifyNoInvocations(AMContainerEventStopRequest.class); assignEvent = (AMContainerEventAssignTA) eventHandler.verifyInvocation(AMContainerEventAssignTA.class); @@ -1218,7 +1220,7 @@ public class TestContainerReuse { // Task assigned to container completed successfully. // Verify reuse across hosts. - taskSchedulerEventHandler.handleEvent(new AMSchedulerEventTAEnded(ta112, container1.getId(), TaskAttemptState.SUCCEEDED, null, null, 0)); + taskSchedulerManager.handleEvent(new AMSchedulerEventTAEnded(ta112, container1.getId(), TaskAttemptState.SUCCEEDED, null, null, 0)); drainableAppCallback.drain(); verifyDeAllocateTask(taskScheduler, ta112, true, null, null); verify(rmClient, times(0)).releaseAssignedContainer(eq(container1.getId())); @@ -1243,7 +1245,7 @@ public class TestContainerReuse { AMSchedulerEventTALaunchRequest lrEvent21 = createLaunchRequestEvent(taID211, ta211, resource1, host1, racks, priority1, v21LR); - taskSchedulerEventHandler.handleEvent(lrEvent21); + taskSchedulerManager.handleEvent(lrEvent21); drainableAppCallback.drain(); // TODO This is terrible, need a better way to ensure the scheduling loop has run @@ -1254,11 +1256,11 @@ public class TestContainerReuse { Thread.sleep(6000l); verify(rmClient, times(1)).releaseAssignedContainer(eq(container1.getId())); - verify(taskSchedulerEventHandler).taskAllocated(eq(0), eq(ta211), any(Object.class), eq(container2)); + verify(taskSchedulerManager).taskAllocated(eq(0), eq(ta211), any(Object.class), eq(container2)); eventHandler.reset(); taskScheduler.shutdown(); - taskSchedulerEventHandler.close(); + taskSchedulerManager.close(); } @Test(timeout = 10000l) @@ -1281,7 +1283,7 @@ public class TestContainerReuse { AppContext appContext = mock(AppContext.class); doReturn(new Configuration(false)).when(appContext).getAMConf(); AMContainerMap amContainerMap = new AMContainerMap(mock(ContainerHeartbeatHandler.class), - mock(TaskAttemptListener.class), new ContainerContextMatcher(), appContext); + mock(TaskCommunicatorManagerInterface.class), new ContainerContextMatcher(), appContext); AMNodeTracker amNodeTracker = new AMNodeTracker(eventHandler, appContext); doReturn(amContainerMap).when(appContext).getAllContainers(); doReturn(amNodeTracker).when(appContext).getNodeTracker(); @@ -1290,15 +1292,15 @@ public class TestContainerReuse { doReturn(dagID).when(appContext).getCurrentDAGID(); doReturn(mock(ClusterInfo.class)).when(appContext).getClusterInfo(); - TaskSchedulerEventHandler taskSchedulerEventHandlerReal = - new TaskSchedulerEventHandlerForTest(appContext, eventHandler, rmClient, + TaskSchedulerManager taskSchedulerManagerReal = + new TaskSchedulerManagerForTest(appContext, eventHandler, rmClient, new AlwaysMatchesContainerMatcher(), TezUtils.createUserPayloadFromConf(tezConf)); - TaskSchedulerEventHandler taskSchedulerEventHandler = spy(taskSchedulerEventHandlerReal); - taskSchedulerEventHandler.init(tezConf); - taskSchedulerEventHandler.start(); + TaskSchedulerManager taskSchedulerManager = spy(taskSchedulerManagerReal); + taskSchedulerManager.init(tezConf); + taskSchedulerManager.start(); TaskSchedulerWithDrainableContext taskScheduler = (TaskSchedulerWithDrainableContext) - ((TaskSchedulerEventHandlerForTest) taskSchedulerEventHandler) + ((TaskSchedulerManagerForTest) taskSchedulerManager) .getSpyTaskScheduler(); TaskSchedulerContextDrainable drainableAppCallback = taskScheduler.getDrainableAppCallback(); AtomicBoolean drainNotifier = new AtomicBoolean(false); @@ -1318,7 +1320,7 @@ public class TestContainerReuse { TaskAttempt ta11 = mock(TaskAttempt.class); AMSchedulerEventTALaunchRequest lrEvent1 = createLaunchRequestEvent(taID11, ta11, resource1, host1, racks, priority1); - taskSchedulerEventHandler.handleEvent(lrEvent1); + taskSchedulerManager.handleEvent(lrEvent1); Container container1 = createContainer(1, "host1", resource1, priority1); @@ -1326,10 +1328,10 @@ public class TestContainerReuse { drainNotifier.set(false); taskScheduler.onContainersAllocated(Collections.singletonList(container1)); drainableAppCallback.drain(); - verify(taskSchedulerEventHandler, times(0)).taskAllocated(eq(0), eq(ta11), + verify(taskSchedulerManager, times(0)).taskAllocated(eq(0), eq(ta11), any(Object.class), eq(container1)); taskScheduler.shutdown(); - taskSchedulerEventHandler.close(); + taskSchedulerManager.close(); } private Container createContainer(int id, String host, Resource resource, Priority priority) {
http://git-wip-us.apache.org/repos/asf/tez/blob/8b278ea8/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskSchedulerEventHandler.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskSchedulerEventHandler.java b/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskSchedulerEventHandler.java deleted file mode 100644 index c85be6c..0000000 --- a/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskSchedulerEventHandler.java +++ /dev/null @@ -1,707 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.tez.dag.app.rm; - -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertFalse; -import static org.junit.Assert.assertNotNull; -import static org.junit.Assert.assertTrue; -import static org.junit.Assert.fail; -import static org.mockito.Matchers.any; -import static org.mockito.Matchers.eq; -import static org.mockito.Mockito.RETURNS_DEEP_STUBS; -import static org.mockito.Mockito.doReturn; -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.spy; -import static org.mockito.Mockito.times; -import static org.mockito.Mockito.verify; -import static org.mockito.Mockito.when; - -import java.io.IOException; -import java.net.InetSocketAddress; -import java.nio.ByteBuffer; -import java.util.HashMap; -import java.util.HashSet; -import java.util.LinkedList; -import java.util.List; -import java.util.Set; -import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.atomic.AtomicInteger; - -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.security.Credentials; -import org.apache.hadoop.yarn.api.records.ApplicationId; -import org.apache.hadoop.yarn.api.records.Container; -import org.apache.hadoop.yarn.api.records.ContainerExitStatus; -import org.apache.hadoop.yarn.api.records.ContainerId; -import org.apache.hadoop.yarn.api.records.ContainerStatus; -import org.apache.hadoop.yarn.api.records.LocalResource; -import org.apache.hadoop.yarn.api.records.NodeId; -import org.apache.hadoop.yarn.api.records.Priority; -import org.apache.hadoop.yarn.api.records.Resource; -import org.apache.hadoop.yarn.event.Event; -import org.apache.hadoop.yarn.event.EventHandler; -import org.apache.tez.common.ContainerSignatureMatcher; -import org.apache.tez.common.TezUtils; -import org.apache.tez.dag.api.NamedEntityDescriptor; -import org.apache.tez.dag.api.TaskLocationHint; -import org.apache.tez.dag.api.TezConfiguration; -import org.apache.tez.dag.api.TezConstants; -import org.apache.tez.dag.api.UserPayload; -import org.apache.tez.dag.api.client.DAGClientServer; -import org.apache.tez.dag.app.AppContext; -import org.apache.tez.dag.app.ContainerContext; -import org.apache.tez.dag.app.ServicePluginLifecycleAbstractService; -import org.apache.tez.dag.app.dag.TaskAttempt; -import org.apache.tez.dag.app.dag.impl.TaskAttemptImpl; -import org.apache.tez.dag.app.dag.impl.TaskImpl; -import org.apache.tez.dag.app.dag.impl.VertexImpl; -import org.apache.tez.dag.app.rm.container.AMContainer; -import org.apache.tez.dag.app.rm.container.AMContainerEventAssignTA; -import org.apache.tez.dag.app.rm.container.AMContainerEventCompleted; -import org.apache.tez.dag.app.rm.container.AMContainerEventType; -import org.apache.tez.dag.app.rm.container.AMContainerMap; -import org.apache.tez.dag.app.rm.container.AMContainerState; -import org.apache.tez.dag.app.web.WebUIService; -import org.apache.tez.dag.records.TaskAttemptTerminationCause; -import org.apache.tez.dag.records.TezDAGID; -import org.apache.tez.dag.records.TezTaskAttemptID; -import org.apache.tez.dag.records.TezTaskID; -import org.apache.tez.dag.records.TezVertexID; -import org.apache.tez.runtime.api.impl.TaskSpec; -import org.apache.tez.serviceplugins.api.TaskAttemptEndReason; -import org.apache.tez.serviceplugins.api.TaskScheduler; -import org.apache.tez.serviceplugins.api.TaskSchedulerContext; -import org.junit.Assert; -import org.junit.Before; -import org.junit.Test; - -import com.google.common.collect.Lists; - -@SuppressWarnings("rawtypes") -public class TestTaskSchedulerEventHandler { - - class TestEventHandler implements EventHandler{ - List<Event> events = Lists.newLinkedList(); - @Override - public void handle(Event event) { - events.add(event); - } - } - - class MockTaskSchedulerEventHandler extends TaskSchedulerEventHandler { - - final AtomicBoolean notify = new AtomicBoolean(false); - - public MockTaskSchedulerEventHandler(AppContext appContext, - DAGClientServer clientService, EventHandler eventHandler, - ContainerSignatureMatcher containerSignatureMatcher, WebUIService webUI) { - super(appContext, clientService, eventHandler, containerSignatureMatcher, webUI, - Lists.newArrayList(new NamedEntityDescriptor("FakeDescriptor", null)), false); - } - - @Override - protected void instantiateSchedulers(String host, int port, String trackingUrl, - AppContext appContext) { - taskSchedulers[0] = mockTaskScheduler; - taskSchedulerServiceWrappers[0] = new ServicePluginLifecycleAbstractService<>(taskSchedulers[0]); - } - - @Override - protected void notifyForTest() { - synchronized (notify) { - notify.set(true); - notify.notifyAll(); - } - } - - } - - AppContext mockAppContext; - DAGClientServer mockClientService; - TestEventHandler mockEventHandler; - ContainerSignatureMatcher mockSigMatcher; - MockTaskSchedulerEventHandler schedulerHandler; - TaskScheduler mockTaskScheduler; - AMContainerMap mockAMContainerMap; - WebUIService mockWebUIService; - - @Before - public void setup() { - mockAppContext = mock(AppContext.class, RETURNS_DEEP_STUBS); - doReturn(new Configuration(false)).when(mockAppContext).getAMConf(); - mockClientService = mock(DAGClientServer.class); - mockEventHandler = new TestEventHandler(); - mockSigMatcher = mock(ContainerSignatureMatcher.class); - mockTaskScheduler = mock(TaskScheduler.class); - mockAMContainerMap = mock(AMContainerMap.class); - mockWebUIService = mock(WebUIService.class); - when(mockAppContext.getAllContainers()).thenReturn(mockAMContainerMap); - when(mockClientService.getBindAddress()).thenReturn(new InetSocketAddress(10000)); - schedulerHandler = new MockTaskSchedulerEventHandler( - mockAppContext, mockClientService, mockEventHandler, mockSigMatcher, mockWebUIService); - } - - @Test(timeout = 5000) - public void testSimpleAllocate() throws Exception { - Configuration conf = new Configuration(false); - schedulerHandler.init(conf); - schedulerHandler.start(); - - TaskAttemptImpl mockTaskAttempt = mock(TaskAttemptImpl.class); - TezTaskAttemptID mockAttemptId = mock(TezTaskAttemptID.class); - when(mockAttemptId.getId()).thenReturn(0); - when(mockTaskAttempt.getID()).thenReturn(mockAttemptId); - Resource resource = Resource.newInstance(1024, 1); - ContainerContext containerContext = - new ContainerContext(new HashMap<String, LocalResource>(), new Credentials(), - new HashMap<String, String>(), ""); - int priority = 10; - TaskLocationHint locHint = TaskLocationHint.createTaskLocationHint(new HashSet<String>(), null); - - ContainerId mockCId = mock(ContainerId.class); - Container container = mock(Container.class); - when(container.getId()).thenReturn(mockCId); - - AMContainer mockAMContainer = mock(AMContainer.class); - when(mockAMContainer.getContainerId()).thenReturn(mockCId); - when(mockAMContainer.getState()).thenReturn(AMContainerState.IDLE); - - when(mockAMContainerMap.get(mockCId)).thenReturn(mockAMContainer); - - AMSchedulerEventTALaunchRequest lr = - new AMSchedulerEventTALaunchRequest(mockAttemptId, resource, null, mockTaskAttempt, locHint, - priority, containerContext, 0, 0, 0); - schedulerHandler.taskAllocated(0, mockTaskAttempt, lr, container); - assertEquals(2, mockEventHandler.events.size()); - assertTrue(mockEventHandler.events.get(1) instanceof AMContainerEventAssignTA); - AMContainerEventAssignTA assignEvent = - (AMContainerEventAssignTA) mockEventHandler.events.get(1); - assertEquals(priority, assignEvent.getPriority()); - assertEquals(mockAttemptId, assignEvent.getTaskAttemptId()); - } - - @Test (timeout = 5000) - public void testTaskBasedAffinity() throws Exception { - Configuration conf = new Configuration(false); - schedulerHandler.init(conf); - schedulerHandler.start(); - - TaskAttemptImpl mockTaskAttempt = mock(TaskAttemptImpl.class); - TezTaskAttemptID taId = mock(TezTaskAttemptID.class); - String affVertexName = "srcVertex"; - int affTaskIndex = 1; - TaskLocationHint locHint = TaskLocationHint.createTaskLocationHint(affVertexName, affTaskIndex); - VertexImpl affVertex = mock(VertexImpl.class); - TaskImpl affTask = mock(TaskImpl.class); - TaskAttemptImpl affAttempt = mock(TaskAttemptImpl.class); - ContainerId affCId = mock(ContainerId.class); - when(affVertex.getTotalTasks()).thenReturn(2); - when(affVertex.getTask(affTaskIndex)).thenReturn(affTask); - when(affTask.getSuccessfulAttempt()).thenReturn(affAttempt); - when(affAttempt.getAssignedContainerID()).thenReturn(affCId); - when(mockAppContext.getCurrentDAG().getVertex(affVertexName)).thenReturn(affVertex); - Resource resource = Resource.newInstance(100, 1); - AMSchedulerEventTALaunchRequest event = new AMSchedulerEventTALaunchRequest - (taId, resource, null, mockTaskAttempt, locHint, 3, null, 0, 0, 0); - schedulerHandler.notify.set(false); - schedulerHandler.handle(event); - synchronized (schedulerHandler.notify) { - while (!schedulerHandler.notify.get()) { - schedulerHandler.notify.wait(); - } - } - - // verify mockTaskAttempt affinitized to expected affCId - verify(mockTaskScheduler, times(1)).allocateTask(mockTaskAttempt, resource, affCId, - Priority.newInstance(3), null, event); - - schedulerHandler.stop(); - schedulerHandler.close(); - } - - @Test (timeout = 5000) - public void testContainerPreempted() throws IOException { - Configuration conf = new Configuration(false); - schedulerHandler.init(conf); - schedulerHandler.start(); - - String diagnostics = "Container preempted by RM."; - TaskAttemptImpl mockTask = mock(TaskAttemptImpl.class); - ContainerStatus mockStatus = mock(ContainerStatus.class); - ContainerId mockCId = mock(ContainerId.class); - AMContainer mockAMContainer = mock(AMContainer.class); - when(mockAMContainerMap.get(mockCId)).thenReturn(mockAMContainer); - when(mockAMContainer.getContainerId()).thenReturn(mockCId); - when(mockStatus.getContainerId()).thenReturn(mockCId); - when(mockStatus.getDiagnostics()).thenReturn(diagnostics); - when(mockStatus.getExitStatus()).thenReturn(ContainerExitStatus.PREEMPTED); - schedulerHandler.containerCompleted(0, mockTask, mockStatus); - assertEquals(1, mockEventHandler.events.size()); - Event event = mockEventHandler.events.get(0); - assertEquals(AMContainerEventType.C_COMPLETED, event.getType()); - AMContainerEventCompleted completedEvent = (AMContainerEventCompleted) event; - assertEquals(mockCId, completedEvent.getContainerId()); - assertEquals("Container preempted externally. Container preempted by RM.", - completedEvent.getDiagnostics()); - assertTrue(completedEvent.isPreempted()); - assertEquals(TaskAttemptTerminationCause.EXTERNAL_PREEMPTION, - completedEvent.getTerminationCause()); - Assert.assertFalse(completedEvent.isDiskFailed()); - - schedulerHandler.stop(); - schedulerHandler.close(); - } - - @Test (timeout = 5000) - public void testContainerInternalPreempted() throws IOException { - Configuration conf = new Configuration(false); - schedulerHandler.init(conf); - schedulerHandler.start(); - - AMContainer mockAmContainer = mock(AMContainer.class); - when(mockAmContainer.getTaskSchedulerIdentifier()).thenReturn(0); - when(mockAmContainer.getContainerLauncherIdentifier()).thenReturn(0); - when(mockAmContainer.getTaskCommunicatorIdentifier()).thenReturn(0); - ContainerId mockCId = mock(ContainerId.class); - verify(mockTaskScheduler, times(0)).deallocateContainer((ContainerId) any()); - when(mockAMContainerMap.get(mockCId)).thenReturn(mockAmContainer); - schedulerHandler.preemptContainer(0, mockCId); - verify(mockTaskScheduler, times(1)).deallocateContainer(mockCId); - assertEquals(1, mockEventHandler.events.size()); - Event event = mockEventHandler.events.get(0); - assertEquals(AMContainerEventType.C_COMPLETED, event.getType()); - AMContainerEventCompleted completedEvent = (AMContainerEventCompleted) event; - assertEquals(mockCId, completedEvent.getContainerId()); - assertEquals("Container preempted internally", completedEvent.getDiagnostics()); - assertTrue(completedEvent.isPreempted()); - Assert.assertFalse(completedEvent.isDiskFailed()); - assertEquals(TaskAttemptTerminationCause.INTERNAL_PREEMPTION, - completedEvent.getTerminationCause()); - - schedulerHandler.stop(); - schedulerHandler.close(); - } - - @Test (timeout = 5000) - public void testContainerDiskFailed() throws IOException { - Configuration conf = new Configuration(false); - schedulerHandler.init(conf); - schedulerHandler.start(); - - String diagnostics = "NM disk failed."; - TaskAttemptImpl mockTask = mock(TaskAttemptImpl.class); - ContainerStatus mockStatus = mock(ContainerStatus.class); - ContainerId mockCId = mock(ContainerId.class); - AMContainer mockAMContainer = mock(AMContainer.class); - when(mockAMContainerMap.get(mockCId)).thenReturn(mockAMContainer); - when(mockAMContainer.getContainerId()).thenReturn(mockCId); - when(mockStatus.getContainerId()).thenReturn(mockCId); - when(mockStatus.getDiagnostics()).thenReturn(diagnostics); - when(mockStatus.getExitStatus()).thenReturn(ContainerExitStatus.DISKS_FAILED); - schedulerHandler.containerCompleted(0, mockTask, mockStatus); - assertEquals(1, mockEventHandler.events.size()); - Event event = mockEventHandler.events.get(0); - assertEquals(AMContainerEventType.C_COMPLETED, event.getType()); - AMContainerEventCompleted completedEvent = (AMContainerEventCompleted) event; - assertEquals(mockCId, completedEvent.getContainerId()); - assertEquals("Container disk failed. NM disk failed.", - completedEvent.getDiagnostics()); - Assert.assertFalse(completedEvent.isPreempted()); - assertTrue(completedEvent.isDiskFailed()); - assertEquals(TaskAttemptTerminationCause.NODE_DISK_ERROR, - completedEvent.getTerminationCause()); - - schedulerHandler.stop(); - schedulerHandler.close(); - } - - @Test (timeout = 5000) - public void testContainerExceededPMem() throws IOException { - Configuration conf = new Configuration(false); - schedulerHandler.init(conf); - schedulerHandler.start(); - - String diagnostics = "Exceeded Physical Memory"; - TaskAttemptImpl mockTask = mock(TaskAttemptImpl.class); - ContainerStatus mockStatus = mock(ContainerStatus.class); - ContainerId mockCId = mock(ContainerId.class); - AMContainer mockAMContainer = mock(AMContainer.class); - when(mockAMContainerMap.get(mockCId)).thenReturn(mockAMContainer); - when(mockAMContainer.getContainerId()).thenReturn(mockCId); - when(mockStatus.getContainerId()).thenReturn(mockCId); - when(mockStatus.getDiagnostics()).thenReturn(diagnostics); - // use -104 rather than ContainerExitStatus.KILLED_EXCEEDED_PMEM because - // ContainerExitStatus.KILLED_EXCEEDED_PMEM is only available after hadoop-2.5 - when(mockStatus.getExitStatus()).thenReturn(-104); - schedulerHandler.containerCompleted(0, mockTask, mockStatus); - assertEquals(1, mockEventHandler.events.size()); - Event event = mockEventHandler.events.get(0); - assertEquals(AMContainerEventType.C_COMPLETED, event.getType()); - AMContainerEventCompleted completedEvent = (AMContainerEventCompleted) event; - assertEquals(mockCId, completedEvent.getContainerId()); - assertEquals("Container failed, exitCode=-104. Exceeded Physical Memory", - completedEvent.getDiagnostics()); - Assert.assertFalse(completedEvent.isPreempted()); - Assert.assertFalse(completedEvent.isDiskFailed()); - assertEquals(TaskAttemptTerminationCause.CONTAINER_EXITED, - completedEvent.getTerminationCause()); - - schedulerHandler.stop(); - schedulerHandler.close(); - } - - @Test (timeout = 5000) - public void testHistoryUrlConf() throws Exception { - Configuration conf = schedulerHandler.appContext.getAMConf(); - - // ensure history url is empty when timeline server is not the logging class - conf.set(TezConfiguration.TEZ_HISTORY_URL_BASE, "http://ui-host:9999"); - assertTrue("".equals(schedulerHandler.getHistoryUrl())); - - // ensure expansion of url happens - conf.set(TezConfiguration.TEZ_HISTORY_LOGGING_SERVICE_CLASS, - "org.apache.tez.dag.history.logging.ats.ATSHistoryLoggingService"); - final ApplicationId mockApplicationId = mock(ApplicationId.class); - doReturn("TEST_APP_ID").when(mockApplicationId).toString(); - doReturn(mockApplicationId).when(mockAppContext).getApplicationID(); - assertTrue("http://ui-host:9999/#/tez-app/TEST_APP_ID" - .equals(schedulerHandler.getHistoryUrl())); - - // ensure the trailing / in history url is handled - conf.set(TezConfiguration.TEZ_HISTORY_URL_BASE, "http://ui-host:9998/"); - assertTrue("http://ui-host:9998/#/tez-app/TEST_APP_ID" - .equals(schedulerHandler.getHistoryUrl())); - - // ensure missing scheme in history url is handled - conf.set(TezConfiguration.TEZ_HISTORY_URL_BASE, "ui-host:9998/"); - Assert.assertTrue("http://ui-host:9998/#/tez-app/TEST_APP_ID" - .equals(schedulerHandler.getHistoryUrl())); - - // handle bad template ex without begining / - conf.set(TezConfiguration.TEZ_AM_TEZ_UI_HISTORY_URL_TEMPLATE, - "__HISTORY_URL_BASE__#/somepath"); - assertTrue("http://ui-host:9998/#/somepath" - .equals(schedulerHandler.getHistoryUrl())); - - conf.set(TezConfiguration.TEZ_AM_TEZ_UI_HISTORY_URL_TEMPLATE, - "__HISTORY_URL_BASE__?viewPath=tez-app/__APPLICATION_ID__"); - conf.set(TezConfiguration.TEZ_HISTORY_URL_BASE, "http://localhost/ui/tez"); - assertTrue("http://localhost/ui/tez?viewPath=tez-app/TEST_APP_ID" - .equals(schedulerHandler.getHistoryUrl())); - - } - - @Test(timeout = 5000) - public void testNoSchedulerSpecified() throws IOException { - try { - new TSEHForMultipleSchedulersTest(mockAppContext, mockClientService, mockEventHandler, - mockSigMatcher, mockWebUIService, null, false); - fail("Expecting an IllegalStateException with no schedulers specified"); - } catch (IllegalArgumentException e) { - } - } - - // Verified via statics - @Test(timeout = 5000) - public void testCustomTaskSchedulerSetup() throws IOException { - Configuration conf = new Configuration(false); - conf.set("testkey", "testval"); - UserPayload defaultPayload = TezUtils.createUserPayloadFromConf(conf); - - String customSchedulerName = "fakeScheduler"; - List<NamedEntityDescriptor> taskSchedulers = new LinkedList<>(); - ByteBuffer bb = ByteBuffer.allocate(4); - bb.putInt(0, 3); - UserPayload userPayload = UserPayload.create(bb); - taskSchedulers.add( - new NamedEntityDescriptor(customSchedulerName, FakeTaskScheduler.class.getName()) - .setUserPayload(userPayload)); - taskSchedulers.add(new NamedEntityDescriptor(TezConstants.getTezYarnServicePluginName(), null) - .setUserPayload(defaultPayload)); - - TSEHForMultipleSchedulersTest tseh = - new TSEHForMultipleSchedulersTest(mockAppContext, mockClientService, mockEventHandler, - mockSigMatcher, mockWebUIService, taskSchedulers, false); - - tseh.init(conf); - tseh.start(); - - // Verify that the YARN task scheduler is installed by default - assertTrue(tseh.getYarnSchedulerCreated()); - assertFalse(tseh.getUberSchedulerCreated()); - assertEquals(2, tseh.getNumCreateInvocations()); - - // Verify the order of the schedulers - assertEquals(customSchedulerName, tseh.getTaskSchedulerName(0)); - assertEquals(TezConstants.getTezYarnServicePluginName(), tseh.getTaskSchedulerName(1)); - - // Verify the payload setup for the custom task scheduler - assertNotNull(tseh.getTaskSchedulerContext(0)); - assertEquals(bb, tseh.getTaskSchedulerContext(0).getInitialUserPayload().getPayload()); - - // Verify the payload on the yarn scheduler - assertNotNull(tseh.getTaskSchedulerContext(1)); - Configuration parsed = TezUtils.createConfFromUserPayload(tseh.getTaskSchedulerContext(1).getInitialUserPayload()); - assertEquals("testval", parsed.get("testkey")); - } - - @Test(timeout = 5000) - public void testTaskSchedulerRouting() throws Exception { - Configuration conf = new Configuration(false); - UserPayload defaultPayload = TezUtils.createUserPayloadFromConf(conf); - - String customSchedulerName = "fakeScheduler"; - List<NamedEntityDescriptor> taskSchedulers = new LinkedList<>(); - ByteBuffer bb = ByteBuffer.allocate(4); - bb.putInt(0, 3); - UserPayload userPayload = UserPayload.create(bb); - taskSchedulers.add( - new NamedEntityDescriptor(customSchedulerName, FakeTaskScheduler.class.getName()) - .setUserPayload(userPayload)); - taskSchedulers.add(new NamedEntityDescriptor(TezConstants.getTezYarnServicePluginName(), null) - .setUserPayload(defaultPayload)); - - TSEHForMultipleSchedulersTest tseh = - new TSEHForMultipleSchedulersTest(mockAppContext, mockClientService, mockEventHandler, - mockSigMatcher, mockWebUIService, taskSchedulers, false); - - tseh.init(conf); - tseh.start(); - - // Verify that the YARN task scheduler is installed by default - assertTrue(tseh.getYarnSchedulerCreated()); - assertFalse(tseh.getUberSchedulerCreated()); - assertEquals(2, tseh.getNumCreateInvocations()); - - // Verify the order of the schedulers - assertEquals(customSchedulerName, tseh.getTaskSchedulerName(0)); - assertEquals(TezConstants.getTezYarnServicePluginName(), tseh.getTaskSchedulerName(1)); - - verify(tseh.getTestTaskScheduler(0)).initialize(); - verify(tseh.getTestTaskScheduler(0)).start(); - - ApplicationId appId = ApplicationId.newInstance(1000, 1); - TezDAGID dagId = TezDAGID.getInstance(appId, 1); - TezVertexID vertexID = TezVertexID.getInstance(dagId, 1); - TezTaskID taskId1 = TezTaskID.getInstance(vertexID, 1); - TezTaskAttemptID attemptId11 = TezTaskAttemptID.getInstance(taskId1, 1); - TezTaskID taskId2 = TezTaskID.getInstance(vertexID, 2); - TezTaskAttemptID attemptId21 = TezTaskAttemptID.getInstance(taskId2, 1); - - Resource resource = Resource.newInstance(1024, 1); - - TaskAttempt mockTaskAttempt1 = mock(TaskAttempt.class); - TaskAttempt mockTaskAttempt2 = mock(TaskAttempt.class); - - AMSchedulerEventTALaunchRequest launchRequest1 = - new AMSchedulerEventTALaunchRequest(attemptId11, resource, mock(TaskSpec.class), - mockTaskAttempt1, mock(TaskLocationHint.class), 1, mock(ContainerContext.class), 0, 0, - 0); - - tseh.handle(launchRequest1); - - verify(tseh.getTestTaskScheduler(0)).allocateTask(eq(mockTaskAttempt1), eq(resource), - any(String[].class), any(String[].class), any(Priority.class), any(Object.class), - eq(launchRequest1)); - - AMSchedulerEventTALaunchRequest launchRequest2 = - new AMSchedulerEventTALaunchRequest(attemptId21, resource, mock(TaskSpec.class), - mockTaskAttempt2, mock(TaskLocationHint.class), 1, mock(ContainerContext.class), 1, 0, - 0); - tseh.handle(launchRequest2); - verify(tseh.getTestTaskScheduler(1)).allocateTask(eq(mockTaskAttempt2), eq(resource), - any(String[].class), any(String[].class), any(Priority.class), any(Object.class), - eq(launchRequest2)); - } - - private static class TSEHForMultipleSchedulersTest extends TaskSchedulerEventHandler { - - private final TaskScheduler yarnTaskScheduler; - private final TaskScheduler uberTaskScheduler; - private final AtomicBoolean uberSchedulerCreated = new AtomicBoolean(false); - private final AtomicBoolean yarnSchedulerCreated = new AtomicBoolean(false); - private final AtomicInteger numCreateInvocations = new AtomicInteger(0); - private final Set<Integer> seenSchedulers = new HashSet<>(); - private final List<TaskSchedulerContext> taskSchedulerContexts = new LinkedList<>(); - private final List<String> taskSchedulerNames = new LinkedList<>(); - private final List<TaskScheduler> testTaskSchedulers = new LinkedList<>(); - - public TSEHForMultipleSchedulersTest(AppContext appContext, - DAGClientServer clientService, - EventHandler eventHandler, - ContainerSignatureMatcher containerSignatureMatcher, - WebUIService webUI, - List<NamedEntityDescriptor> schedulerDescriptors, - boolean isPureLocalMode) { - super(appContext, clientService, eventHandler, containerSignatureMatcher, webUI, - schedulerDescriptors, isPureLocalMode); - yarnTaskScheduler = mock(TaskScheduler.class); - uberTaskScheduler = mock(TaskScheduler.class); - } - - @Override - TaskScheduler createTaskScheduler(String host, int port, String trackingUrl, - AppContext appContext, - NamedEntityDescriptor taskSchedulerDescriptor, - long customAppIdIdentifier, - int schedulerId) { - - numCreateInvocations.incrementAndGet(); - boolean added = seenSchedulers.add(schedulerId); - assertTrue("Cannot add multiple schedulers with the same schedulerId", added); - taskSchedulerNames.add(taskSchedulerDescriptor.getEntityName()); - return super.createTaskScheduler(host, port, trackingUrl, appContext, taskSchedulerDescriptor, - customAppIdIdentifier, schedulerId); - } - - @Override - TaskSchedulerContext wrapTaskSchedulerContext(TaskSchedulerContext rawContext) { - // Avoid wrapping in threads - return rawContext; - } - - @Override - TaskScheduler createYarnTaskScheduler(TaskSchedulerContext taskSchedulerContext, int schedulerId) { - taskSchedulerContexts.add(taskSchedulerContext); - testTaskSchedulers.add(yarnTaskScheduler); - yarnSchedulerCreated.set(true); - return yarnTaskScheduler; - } - - @Override - TaskScheduler createUberTaskScheduler(TaskSchedulerContext taskSchedulerContext, int schedulerId) { - taskSchedulerContexts.add(taskSchedulerContext); - uberSchedulerCreated.set(true); - testTaskSchedulers.add(yarnTaskScheduler); - return uberTaskScheduler; - } - - @Override - TaskScheduler createCustomTaskScheduler(TaskSchedulerContext taskSchedulerContext, - NamedEntityDescriptor taskSchedulerDescriptor, int schedulerId) { - taskSchedulerContexts.add(taskSchedulerContext); - TaskScheduler taskScheduler = spy(super.createCustomTaskScheduler(taskSchedulerContext, taskSchedulerDescriptor, schedulerId)); - testTaskSchedulers.add(taskScheduler); - return taskScheduler; - } - - @Override - // Inline handling of events. - public void handle(AMSchedulerEvent event) { - handleEvent(event); - } - - public boolean getUberSchedulerCreated() { - return uberSchedulerCreated.get(); - } - - public boolean getYarnSchedulerCreated() { - return yarnSchedulerCreated.get(); - } - - public int getNumCreateInvocations() { - return numCreateInvocations.get(); - } - - public TaskSchedulerContext getTaskSchedulerContext(int schedulerId) { - return taskSchedulerContexts.get(schedulerId); - } - - public String getTaskSchedulerName(int schedulerId) { - return taskSchedulerNames.get(schedulerId); - } - - public TaskScheduler getTestTaskScheduler(int schedulerId) { - return testTaskSchedulers.get(schedulerId); - } - } - - public static class FakeTaskScheduler extends TaskScheduler { - - public FakeTaskScheduler( - TaskSchedulerContext taskSchedulerContext) { - super(taskSchedulerContext); - } - - @Override - public Resource getAvailableResources() { - return null; - } - - @Override - public int getClusterNodeCount() { - return 0; - } - - @Override - public void dagComplete() { - - } - - @Override - public Resource getTotalResources() { - return null; - } - - @Override - public void blacklistNode(NodeId nodeId) { - - } - - @Override - public void unblacklistNode(NodeId nodeId) { - - } - - @Override - public void allocateTask(Object task, Resource capability, String[] hosts, String[] racks, - Priority priority, Object containerSignature, Object clientCookie) { - - } - - @Override - public void allocateTask(Object task, Resource capability, ContainerId containerId, - Priority priority, Object containerSignature, Object clientCookie) { - - } - - @Override - public boolean deallocateTask(Object task, boolean taskSucceeded, - TaskAttemptEndReason endReason, - String diagnostics) { - return false; - } - - @Override - public Object deallocateContainer(ContainerId containerId) { - return null; - } - - @Override - public void setShouldUnregister() { - - } - - @Override - public boolean hasUnregistered() { - return false; - } - } -} http://git-wip-us.apache.org/repos/asf/tez/blob/8b278ea8/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskSchedulerHelpers.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskSchedulerHelpers.java b/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskSchedulerHelpers.java index c13ca5a..b1bc491 100644 --- a/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskSchedulerHelpers.java +++ b/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskSchedulerHelpers.java @@ -125,19 +125,19 @@ class TestTaskSchedulerHelpers { } // Overrides start / stop. Will be controlled without the extra event handling thread. - static class TaskSchedulerEventHandlerForTest extends - TaskSchedulerEventHandler { + static class TaskSchedulerManagerForTest extends + TaskSchedulerManager { private TezAMRMClientAsync<CookieContainerRequest> amrmClientAsync; private ContainerSignatureMatcher containerSignatureMatcher; private UserPayload defaultPayload; @SuppressWarnings("rawtypes") - public TaskSchedulerEventHandlerForTest(AppContext appContext, - EventHandler eventHandler, - TezAMRMClientAsync<CookieContainerRequest> amrmClientAsync, - ContainerSignatureMatcher containerSignatureMatcher, - UserPayload defaultPayload) { + public TaskSchedulerManagerForTest(AppContext appContext, + EventHandler eventHandler, + TezAMRMClientAsync<CookieContainerRequest> amrmClientAsync, + ContainerSignatureMatcher containerSignatureMatcher, + UserPayload defaultPayload) { super(appContext, null, eventHandler, containerSignatureMatcher, null, Lists.newArrayList(new NamedEntityDescriptor("FakeScheduler", null)), false);
