http://git-wip-us.apache.org/repos/asf/tez/blob/359eba29/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestContainerReuse.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestContainerReuse.java b/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestContainerReuse.java index 6ea1388..656bca1 100644 --- a/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestContainerReuse.java +++ b/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestContainerReuse.java @@ -223,7 +223,7 @@ public class TestContainerReuse { taskSchedulerEventHandler.handleEvent( new AMSchedulerEventTAEnded( - ta11, containerHost1.getId(), TaskAttemptState.SUCCEEDED)); + ta11, containerHost1.getId(), TaskAttemptState.SUCCEEDED, 0)); drainableAppCallback.drain(); verify(taskScheduler).deallocateTask(eq(ta11), eq(true)); verify(taskSchedulerEventHandler, times(1)).taskAllocated( @@ -235,7 +235,7 @@ public class TestContainerReuse { taskSchedulerEventHandler.handleEvent( new AMSchedulerEventTAEnded(ta21, containerHost2.getId(), - TaskAttemptState.SUCCEEDED)); + TaskAttemptState.SUCCEEDED, 0)); long currentTs = System.currentTimeMillis(); Throwable exception = null; @@ -356,7 +356,7 @@ public class TestContainerReuse { taskSchedulerEventHandler.handleEvent( new AMSchedulerEventTAEnded(ta21, containerHost2.getId(), - TaskAttemptState.SUCCEEDED)); + TaskAttemptState.SUCCEEDED, 0)); drainableAppCallback.drain(); verify(taskScheduler).deallocateTask(eq(ta21), eq(true)); verify(taskSchedulerEventHandler, times(0)).taskAllocated( @@ -459,7 +459,7 @@ public class TestContainerReuse { verify(taskSchedulerEventHandler).taskAllocated(eq(ta11), any(Object.class), eq(container1)); // Task assigned to container completed successfully. Container should be re-used. - taskSchedulerEventHandler.handleEvent(new AMSchedulerEventTAEnded(ta11, container1.getId(), TaskAttemptState.SUCCEEDED)); + taskSchedulerEventHandler.handleEvent(new AMSchedulerEventTAEnded(ta11, container1.getId(), TaskAttemptState.SUCCEEDED, 0)); drainableAppCallback.drain(); verify(taskScheduler).deallocateTask(eq(ta11), eq(true)); verify(taskSchedulerEventHandler).taskAllocated(eq(ta12), any(Object.class), eq(container1)); @@ -469,7 +469,7 @@ public class TestContainerReuse { // Task assigned to container completed successfully. // Verify reuse across hosts. - taskSchedulerEventHandler.handleEvent(new AMSchedulerEventTAEnded(ta12, container1.getId(), TaskAttemptState.SUCCEEDED)); + taskSchedulerEventHandler.handleEvent(new AMSchedulerEventTAEnded(ta12, container1.getId(), TaskAttemptState.SUCCEEDED, 0)); drainableAppCallback.drain(); verify(taskScheduler).deallocateTask(eq(ta12), eq(true)); verify(taskSchedulerEventHandler).taskAllocated(eq(ta13), any(Object.class), eq(container1)); @@ -478,7 +478,7 @@ public class TestContainerReuse { eventHandler.reset(); // Verify no re-use if a previous task fails. - taskSchedulerEventHandler.handleEvent(new AMSchedulerEventTAEnded(ta13, container1.getId(), TaskAttemptState.FAILED)); + taskSchedulerEventHandler.handleEvent(new AMSchedulerEventTAEnded(ta13, container1.getId(), TaskAttemptState.FAILED, 0)); drainableAppCallback.drain(); verify(taskSchedulerEventHandler, times(0)).taskAllocated(eq(ta14), any(Object.class), eq(container1)); verify(taskScheduler).deallocateTask(eq(ta13), eq(false)); @@ -496,7 +496,7 @@ public class TestContainerReuse { verify(taskSchedulerEventHandler).taskAllocated(eq(ta14), any(Object.class), eq(container2)); // Task assigned to container completed successfully. No pending requests. Container should be released. - taskSchedulerEventHandler.handleEvent(new AMSchedulerEventTAEnded(ta14, container2.getId(), TaskAttemptState.SUCCEEDED)); + taskSchedulerEventHandler.handleEvent(new AMSchedulerEventTAEnded(ta14, container2.getId(), TaskAttemptState.SUCCEEDED, 0)); drainableAppCallback.drain(); verify(taskScheduler).deallocateTask(eq(ta14), eq(true)); verify(rmClient).releaseAssignedContainer(eq(container2.getId())); @@ -607,7 +607,7 @@ public class TestContainerReuse { // First task had profiling on. This container can not be reused further. taskSchedulerEventHandler.handleEvent( - new AMSchedulerEventTAEnded(ta11, container1.getId(), TaskAttemptState.SUCCEEDED)); + new AMSchedulerEventTAEnded(ta11, container1.getId(), TaskAttemptState.SUCCEEDED, 0)); drainableAppCallback.drain(); verify(taskScheduler).deallocateTask(eq(ta11), eq(true)); verify(taskSchedulerEventHandler, times(0)).taskAllocated(eq(ta12), any(Object.class), @@ -653,7 +653,7 @@ public class TestContainerReuse { // Verify that the container can not be reused when profiling option is turned on // Even for 2 tasks having same profiling option can have container reusability. - taskSchedulerEventHandler.handleEvent(new AMSchedulerEventTAEnded(ta13, container2.getId(), TaskAttemptState.SUCCEEDED)); + taskSchedulerEventHandler.handleEvent(new AMSchedulerEventTAEnded(ta13, container2.getId(), TaskAttemptState.SUCCEEDED, 0)); drainableAppCallback.drain(); verify(taskScheduler).deallocateTask(eq(ta13), eq(true)); verify(taskSchedulerEventHandler, times(0)).taskAllocated(eq(ta14), any(Object.class), @@ -698,7 +698,7 @@ public class TestContainerReuse { verify(taskSchedulerEventHandler).taskAllocated(eq(ta15), any(Object.class), eq(container3)); //Ensure task 6 (of vertex 1) is allocated to same container - taskSchedulerEventHandler.handleEvent(new AMSchedulerEventTAEnded(ta15, container3.getId(), TaskAttemptState.SUCCEEDED)); + taskSchedulerEventHandler.handleEvent(new AMSchedulerEventTAEnded(ta15, container3.getId(), TaskAttemptState.SUCCEEDED, 0)); drainableAppCallback.drain(); verify(taskScheduler).deallocateTask(eq(ta15), eq(true)); verify(taskSchedulerEventHandler).taskAllocated(eq(ta16), any(Object.class), eq(container3)); @@ -811,7 +811,7 @@ public class TestContainerReuse { // until delay expires. taskSchedulerEventHandler.handleEvent( new AMSchedulerEventTAEnded(ta11, container1.getId(), - TaskAttemptState.SUCCEEDED)); + TaskAttemptState.SUCCEEDED, 0)); drainableAppCallback.drain(); verify(taskScheduler).deallocateTask(eq(ta11), eq(true)); verify(taskSchedulerEventHandler, times(0)).taskAllocated( @@ -828,7 +828,7 @@ public class TestContainerReuse { // TA12 completed. taskSchedulerEventHandler.handleEvent( new AMSchedulerEventTAEnded(ta12, container1.getId(), - TaskAttemptState.SUCCEEDED)); + TaskAttemptState.SUCCEEDED, 0)); drainableAppCallback.drain(); LOG.info("Sleeping to ensure that the scheduling loop runs"); Thread.sleep(3000l); @@ -946,7 +946,7 @@ public class TestContainerReuse { // Container should be assigned to task21. taskSchedulerEventHandler.handleEvent( new AMSchedulerEventTAEnded(ta11, container1.getId(), - TaskAttemptState.SUCCEEDED)); + TaskAttemptState.SUCCEEDED, 0)); drainableAppCallback.drain(); verify(taskScheduler).deallocateTask(eq(ta11), eq(true)); verify(taskSchedulerEventHandler).taskAllocated( @@ -956,7 +956,7 @@ public class TestContainerReuse { // Task 2 completes. taskSchedulerEventHandler.handleEvent( new AMSchedulerEventTAEnded(ta21, container1.getId(), - TaskAttemptState.SUCCEEDED)); + TaskAttemptState.SUCCEEDED, 0)); verify(rmClient, times(0)).releaseAssignedContainer(eq(container1.getId())); LOG.info("Sleeping to ensure that the scheduling loop runs"); @@ -1065,7 +1065,7 @@ public class TestContainerReuse { assertEquals(1, assignEvent.getRemoteTaskLocalResources().size()); // Task assigned to container completed successfully. Container should be re-used. - taskSchedulerEventHandler.handleEvent(new AMSchedulerEventTAEnded(ta111, container1.getId(), TaskAttemptState.SUCCEEDED)); + taskSchedulerEventHandler.handleEvent(new AMSchedulerEventTAEnded(ta111, container1.getId(), TaskAttemptState.SUCCEEDED, 0)); drainableAppCallback.drain(); verify(taskScheduler).deallocateTask(eq(ta111), eq(true)); verify(taskSchedulerEventHandler).taskAllocated(eq(ta112), any(Object.class), eq(container1)); @@ -1077,7 +1077,7 @@ public class TestContainerReuse { // Task assigned to container completed successfully. // Verify reuse across hosts. - taskSchedulerEventHandler.handleEvent(new AMSchedulerEventTAEnded(ta112, container1.getId(), TaskAttemptState.SUCCEEDED)); + taskSchedulerEventHandler.handleEvent(new AMSchedulerEventTAEnded(ta112, container1.getId(), TaskAttemptState.SUCCEEDED, 0)); drainableAppCallback.drain(); verify(taskScheduler).deallocateTask(eq(ta112), eq(true)); verify(rmClient, times(0)).releaseAssignedContainer(eq(container1.getId())); @@ -1118,7 +1118,7 @@ public class TestContainerReuse { assertEquals(2, assignEvent.getRemoteTaskLocalResources().size()); eventHandler.reset(); - taskSchedulerEventHandler.handleEvent(new AMSchedulerEventTAEnded(ta211, container1.getId(), TaskAttemptState.SUCCEEDED)); + taskSchedulerEventHandler.handleEvent(new AMSchedulerEventTAEnded(ta211, container1.getId(), TaskAttemptState.SUCCEEDED, 0)); drainableAppCallback.drain(); verify(taskScheduler).deallocateTask(eq(ta211), eq(true)); verify(taskSchedulerEventHandler).taskAllocated(eq(ta212), any(Object.class), eq(container1));
http://git-wip-us.apache.org/repos/asf/tez/blob/359eba29/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestLocalTaskScheduler.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestLocalTaskScheduler.java b/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestLocalTaskScheduler.java index 60782e6..12390b2 100644 --- a/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestLocalTaskScheduler.java +++ b/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestLocalTaskScheduler.java @@ -59,7 +59,7 @@ public class TestLocalTaskScheduler { TezConfiguration tezConf = new TezConfiguration(); tezConf.setInt(TezConfiguration.TEZ_AM_INLINE_TASK_EXECUTION_MAX_TASKS, MAX_TASKS); - LocalContainerFactory containerFactory = new LocalContainerFactory(createMockAppContext()); + LocalContainerFactory containerFactory = new LocalContainerFactory(createMockAppContext(), 1000); HashMap<Object, Container> taskAllocations = new LinkedHashMap<Object, Container>(); PriorityBlockingQueue<TaskRequest> taskRequestQueue = new PriorityBlockingQueue<TaskRequest>(); TaskSchedulerAppCallback appClientDelegate = mock(TaskSchedulerAppCallback.class); http://git-wip-us.apache.org/repos/asf/tez/blob/359eba29/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestLocalTaskSchedulerService.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestLocalTaskSchedulerService.java b/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestLocalTaskSchedulerService.java index 3cf4f6c..25cf4b5 100644 --- a/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestLocalTaskSchedulerService.java +++ b/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestLocalTaskSchedulerService.java @@ -22,6 +22,8 @@ import java.util.HashMap; import java.util.concurrent.BlockingQueue; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; +import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.Container; import org.apache.hadoop.yarn.api.records.Priority; import org.apache.hadoop.yarn.api.records.Resource; @@ -81,8 +83,12 @@ public class TestLocalTaskSchedulerService { */ @Test(timeout = 5000) public void testDeallocationBeforeAllocation() { + AppContext appContext = mock(AppContext.class); + ApplicationAttemptId appAttemptId = + ApplicationAttemptId.newInstance(ApplicationId.newInstance(10000l, 1), 1); + doReturn(appAttemptId).when(appContext).getApplicationAttemptId(); MockLocalTaskSchedulerSerivce taskSchedulerService = new MockLocalTaskSchedulerSerivce - (mock(TaskSchedulerAppCallback.class), mock(ContainerSignatureMatcher.class), "", 0, "", mock(AppContext.class)); + (mock(TaskSchedulerAppCallback.class), mock(ContainerSignatureMatcher.class), "", 0, "", appContext); taskSchedulerService.init(new Configuration()); taskSchedulerService.start(); @@ -105,8 +111,12 @@ public class TestLocalTaskSchedulerService { */ @Test(timeout = 5000) public void testDeallocationAfterAllocation() { + AppContext appContext = mock(AppContext.class); + ApplicationAttemptId appAttemptId = + ApplicationAttemptId.newInstance(ApplicationId.newInstance(10000l, 1), 1); + doReturn(appAttemptId).when(appContext).getApplicationAttemptId(); MockLocalTaskSchedulerSerivce taskSchedulerService = new MockLocalTaskSchedulerSerivce - (mock(TaskSchedulerAppCallback.class), mock(ContainerSignatureMatcher.class), "", 0, "", mock(AppContext.class)); + (mock(TaskSchedulerAppCallback.class), mock(ContainerSignatureMatcher.class), "", 0, "", appContext); taskSchedulerService.init(new Configuration()); taskSchedulerService.start(); @@ -132,13 +142,13 @@ public class TestLocalTaskSchedulerService { String appHostName, int appHostPort, String appTrackingUrl, AppContext appContext) { super(appClient, containerSignatureMatcher, appHostName, appHostPort, - appTrackingUrl, appContext); + appTrackingUrl, 10000l, appContext); } @Override public AsyncDelegateRequestHandler createRequestHandler(Configuration conf) { requestHandler = new MockAsyncDelegateRequestHandler(taskRequestQueue, - new LocalContainerFactory(appContext), + new LocalContainerFactory(appContext, customContainerAppId), taskAllocations, appClientDelegate, conf); http://git-wip-us.apache.org/repos/asf/tez/blob/359eba29/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskSchedulerEventHandler.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskSchedulerEventHandler.java b/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskSchedulerEventHandler.java index b7a3a87..daf1db6 100644 --- a/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskSchedulerEventHandler.java +++ b/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskSchedulerEventHandler.java @@ -89,7 +89,7 @@ public class TestTaskSchedulerEventHandler { public MockTaskSchedulerEventHandler(AppContext appContext, DAGClientServer clientService, EventHandler eventHandler, ContainerSignatureMatcher containerSignatureMatcher, WebUIService webUI) { - super(appContext, clientService, eventHandler, containerSignatureMatcher, webUI, new String[] {}); + super(appContext, clientService, eventHandler, containerSignatureMatcher, webUI, new String[] {}, false); } @Override @@ -162,7 +162,7 @@ public class TestTaskSchedulerEventHandler { AMSchedulerEventTALaunchRequest lr = new AMSchedulerEventTALaunchRequest(mockAttemptId, resource, null, mockTaskAttempt, locHint, - priority, containerContext); + priority, containerContext, 0, 0, 0); schedulerHandler.taskAllocated(mockTaskAttempt, lr, container); assertEquals(2, mockEventHandler.events.size()); assertTrue(mockEventHandler.events.get(1) instanceof AMContainerEventAssignTA); @@ -249,9 +249,14 @@ public class TestTaskSchedulerEventHandler { Configuration conf = new Configuration(false); schedulerHandler.init(conf); schedulerHandler.start(); - + + AMContainer mockAmContainer = mock(AMContainer.class); + when(mockAmContainer.getTaskSchedulerIdentifier()).thenReturn(0); + when(mockAmContainer.getContainerLauncherIdentifier()).thenReturn(0); + when(mockAmContainer.getTaskCommunicatorIdentifier()).thenReturn(0); ContainerId mockCId = mock(ContainerId.class); verify(mockTaskScheduler, times(0)).deallocateContainer((ContainerId)any()); + when(mockAMContainerMap.get(mockCId)).thenReturn(mockAmContainer); schedulerHandler.preemptContainer(mockCId); verify(mockTaskScheduler, times(1)).deallocateContainer(mockCId); assertEquals(1, mockEventHandler.events.size()); http://git-wip-us.apache.org/repos/asf/tez/blob/359eba29/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskSchedulerHelpers.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskSchedulerHelpers.java b/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskSchedulerHelpers.java index d775300..ffab769 100644 --- a/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskSchedulerHelpers.java +++ b/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskSchedulerHelpers.java @@ -127,7 +127,7 @@ class TestTaskSchedulerHelpers { EventHandler eventHandler, TezAMRMClientAsync<CookieContainerRequest> amrmClientAsync, ContainerSignatureMatcher containerSignatureMatcher) { - super(appContext, null, eventHandler, containerSignatureMatcher, null, new String[]{}); + super(appContext, null, eventHandler, containerSignatureMatcher, null, new String[]{}, false); this.amrmClientAsync = amrmClientAsync; this.containerSignatureMatcher = containerSignatureMatcher; } http://git-wip-us.apache.org/repos/asf/tez/blob/359eba29/tez-dag/src/test/java/org/apache/tez/dag/app/rm/container/TestAMContainer.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/rm/container/TestAMContainer.java b/tez-dag/src/test/java/org/apache/tez/dag/app/rm/container/TestAMContainer.java index fafbba6..bdd0f61 100644 --- a/tez-dag/src/test/java/org/apache/tez/dag/app/rm/container/TestAMContainer.java +++ b/tez-dag/src/test/java/org/apache/tez/dag/app/rm/container/TestAMContainer.java @@ -63,6 +63,7 @@ import org.apache.hadoop.yarn.event.EventHandler; import org.apache.hadoop.yarn.util.SystemClock; import org.apache.tez.common.security.JobTokenIdentifier; import org.apache.tez.common.security.TokenCache; +import org.apache.tez.dag.api.TaskCommunicator; import org.apache.tez.dag.app.AppContext; import org.apache.tez.dag.app.ContainerHeartbeatHandler; import org.apache.tez.dag.app.ContainerContext; @@ -104,7 +105,7 @@ public class TestAMContainer { wc.verifyState(AMContainerState.LAUNCHING); // 1 Launch request. wc.verifyCountAndGetOutgoingEvents(1); - verify(wc.tal).registerRunningContainer(wc.containerID); + verify(wc.tal).registerRunningContainer(wc.containerID, 0); assertNull(wc.amContainer.getCurrentTaskAttempt()); // Assign task. @@ -121,7 +122,7 @@ public class TestAMContainer { // Once for the previous NO_TASKS, one for the actual task. verify(wc.chh).register(wc.containerID); ArgumentCaptor<AMContainerTask> argumentCaptor = ArgumentCaptor.forClass(AMContainerTask.class); - verify(wc.tal, times(1)).registerTaskAttempt(argumentCaptor.capture(), eq(wc.containerID)); + verify(wc.tal, times(1)).registerTaskAttempt(argumentCaptor.capture(), eq(wc.containerID), eq(0)); assertEquals(1, argumentCaptor.getAllValues().size()); assertEquals(wc.taskAttemptID, argumentCaptor.getAllValues().get(0).getTask().getTaskAttemptID()); assertEquals(WrappedContainer.taskPriority, argumentCaptor.getAllValues().get(0).getPriority()); @@ -131,14 +132,14 @@ public class TestAMContainer { wc.verifyState(AMContainerState.IDLE); wc.verifyNoOutgoingEvents(); assertNull(wc.amContainer.getCurrentTaskAttempt()); - verify(wc.tal).unregisterTaskAttempt(wc.taskAttemptID); + verify(wc.tal).unregisterTaskAttempt(wc.taskAttemptID, 0); // Container completed wc.containerCompleted(); wc.verifyHistoryStopEvent(); wc.verifyState(AMContainerState.COMPLETED); wc.verifyNoOutgoingEvents(); - verify(wc.tal).unregisterRunningContainer(wc.containerID); + verify(wc.tal).unregisterRunningContainer(wc.containerID, 0); verify(wc.chh).unregister(wc.containerID); assertEquals(1, wc.amContainer.getAllTaskAttempts().size()); @@ -157,7 +158,7 @@ public class TestAMContainer { wc.verifyState(AMContainerState.LAUNCHING); // 1 Launch request. wc.verifyCountAndGetOutgoingEvents(1); - verify(wc.tal).registerRunningContainer(wc.containerID); + verify(wc.tal).registerRunningContainer(wc.containerID, 0); // Container Launched wc.containerLaunched(); @@ -172,7 +173,7 @@ public class TestAMContainer { wc.verifyNoOutgoingEvents(); assertEquals(wc.taskAttemptID, wc.amContainer.getCurrentTaskAttempt()); ArgumentCaptor<AMContainerTask> argumentCaptor = ArgumentCaptor.forClass(AMContainerTask.class); - verify(wc.tal, times(1)).registerTaskAttempt(argumentCaptor.capture(), eq(wc.containerID)); + verify(wc.tal, times(1)).registerTaskAttempt(argumentCaptor.capture(), eq(wc.containerID), eq(0)); assertEquals(1, argumentCaptor.getAllValues().size()); assertEquals(wc.taskAttemptID, argumentCaptor.getAllValues().get(0).getTask().getTaskAttemptID()); @@ -180,13 +181,13 @@ public class TestAMContainer { wc.verifyState(AMContainerState.IDLE); wc.verifyNoOutgoingEvents(); assertNull(wc.amContainer.getCurrentTaskAttempt()); - verify(wc.tal).unregisterTaskAttempt(wc.taskAttemptID); + verify(wc.tal).unregisterTaskAttempt(wc.taskAttemptID, 0); wc.containerCompleted(); wc.verifyHistoryStopEvent(); wc.verifyState(AMContainerState.COMPLETED); wc.verifyNoOutgoingEvents(); - verify(wc.tal).unregisterRunningContainer(wc.containerID); + verify(wc.tal).unregisterRunningContainer(wc.containerID, 0); verify(wc.chh).unregister(wc.containerID); assertEquals(1, wc.amContainer.getAllTaskAttempts().size()); @@ -205,7 +206,7 @@ public class TestAMContainer { wc.verifyState(AMContainerState.LAUNCHING); // 1 Launch request. wc.verifyCountAndGetOutgoingEvents(1); - verify(wc.tal).registerRunningContainer(wc.containerID); + verify(wc.tal).registerRunningContainer(wc.containerID, 0); assertNull(wc.amContainer.getCurrentTaskAttempt()); // Assign task. @@ -222,7 +223,7 @@ public class TestAMContainer { // Once for the previous NO_TASKS, one for the actual task. verify(wc.chh).register(wc.containerID); ArgumentCaptor<AMContainerTask> argumentCaptor = ArgumentCaptor.forClass(AMContainerTask.class); - verify(wc.tal, times(1)).registerTaskAttempt(argumentCaptor.capture(), eq(wc.containerID)); + verify(wc.tal, times(1)).registerTaskAttempt(argumentCaptor.capture(), eq(wc.containerID), eq(0)); assertEquals(1, argumentCaptor.getAllValues().size()); assertEquals(wc.taskAttemptID, argumentCaptor.getAllValues().get(0).getTask().getTaskAttemptID()); @@ -231,13 +232,13 @@ public class TestAMContainer { wc.verifyState(AMContainerState.IDLE); wc.verifyNoOutgoingEvents(); assertNull(wc.amContainer.getCurrentTaskAttempt()); - verify(wc.tal).unregisterTaskAttempt(wc.taskAttemptID); + verify(wc.tal).unregisterTaskAttempt(wc.taskAttemptID, 0); TezTaskAttemptID taId2 = TezTaskAttemptID.getInstance(wc.taskID, 2); wc.assignTaskAttempt(taId2); wc.verifyState(AMContainerState.RUNNING); argumentCaptor = ArgumentCaptor.forClass(AMContainerTask.class); - verify(wc.tal, times(2)).registerTaskAttempt(argumentCaptor.capture(), eq(wc.containerID)); + verify(wc.tal, times(2)).registerTaskAttempt(argumentCaptor.capture(), eq(wc.containerID), eq(0)); assertEquals(2, argumentCaptor.getAllValues().size()); assertEquals(taId2, argumentCaptor.getAllValues().get(1).getTask().getTaskAttemptID()); @@ -246,14 +247,14 @@ public class TestAMContainer { wc.verifyState(AMContainerState.IDLE); wc.verifyNoOutgoingEvents(); assertNull(wc.amContainer.getCurrentTaskAttempt()); - verify(wc.tal).unregisterTaskAttempt(taId2); + verify(wc.tal).unregisterTaskAttempt(taId2, 0); // Container completed wc.containerCompleted(); wc.verifyHistoryStopEvent(); wc.verifyState(AMContainerState.COMPLETED); wc.verifyNoOutgoingEvents(); - verify(wc.tal).unregisterRunningContainer(wc.containerID); + verify(wc.tal).unregisterRunningContainer(wc.containerID, 0); verify(wc.chh).unregister(wc.containerID); assertEquals(2, wc.amContainer.getAllTaskAttempts().size()); @@ -286,7 +287,7 @@ public class TestAMContainer { wc.verifyHistoryStopEvent(); wc.verifyState(AMContainerState.COMPLETED); wc.verifyNoOutgoingEvents(); - verify(wc.tal).unregisterRunningContainer(wc.containerID); + verify(wc.tal).unregisterRunningContainer(wc.containerID, 0); verify(wc.chh).unregister(wc.containerID); assertNull(wc.amContainer.getCurrentTaskAttempt()); @@ -323,7 +324,7 @@ public class TestAMContainer { wc.verifyHistoryStopEvent(); wc.verifyState(AMContainerState.COMPLETED); wc.verifyNoOutgoingEvents(); - verify(wc.tal).unregisterRunningContainer(wc.containerID); + verify(wc.tal).unregisterRunningContainer(wc.containerID, 0); verify(wc.chh).unregister(wc.containerID); assertNull(wc.amContainer.getCurrentTaskAttempt()); @@ -346,7 +347,7 @@ public class TestAMContainer { wc.assignTaskAttempt(taID2); wc.verifyState(AMContainerState.STOP_REQUESTED); - verify(wc.tal).unregisterRunningContainer(wc.containerID); + verify(wc.tal).unregisterRunningContainer(wc.containerID, 0); verify(wc.chh).unregister(wc.containerID); // 1 for NM stop request. 2 TERMINATING to TaskAttempt. outgoingEvents = wc.verifyCountAndGetOutgoingEvents(3); @@ -378,13 +379,13 @@ public class TestAMContainer { wc.launchContainer(); wc.assignTaskAttempt(wc.taskAttemptID); wc.verifyState(AMContainerState.LAUNCHING); - verify(wc.tal).registerRunningContainer(wc.containerID); + verify(wc.tal).registerRunningContainer(wc.containerID, 0); TezTaskAttemptID taID2 = TezTaskAttemptID.getInstance(wc.taskID, 2); wc.assignTaskAttempt(taID2); wc.verifyState(AMContainerState.STOP_REQUESTED); - verify(wc.tal).unregisterRunningContainer(wc.containerID); + verify(wc.tal).unregisterRunningContainer(wc.containerID, 0); verify(wc.chh).unregister(wc.containerID); // 1 for NM stop request. 2 TERMINATING to TaskAttempt. outgoingEvents = wc.verifyCountAndGetOutgoingEvents(3); @@ -420,7 +421,7 @@ public class TestAMContainer { wc.containerTimedOut(); wc.verifyState(AMContainerState.STOP_REQUESTED); - verify(wc.tal).unregisterRunningContainer(wc.containerID); + verify(wc.tal).unregisterRunningContainer(wc.containerID, 0); verify(wc.chh).unregister(wc.containerID); // 1 to TA, 1 for RM de-allocate. outgoingEvents = wc.verifyCountAndGetOutgoingEvents(2); @@ -454,7 +455,7 @@ public class TestAMContainer { wc.stopRequest(); wc.verifyState(AMContainerState.STOP_REQUESTED); - verify(wc.tal).unregisterRunningContainer(wc.containerID); + verify(wc.tal).unregisterRunningContainer(wc.containerID, 0); verify(wc.chh).unregister(wc.containerID); // 1 to TA, 1 for RM de-allocate. outgoingEvents = wc.verifyCountAndGetOutgoingEvents(2); @@ -484,11 +485,11 @@ public class TestAMContainer { wc.launchContainer(); wc.assignTaskAttempt(wc.taskAttemptID); wc.verifyState(AMContainerState.LAUNCHING); - verify(wc.tal).registerRunningContainer(wc.containerID); + verify(wc.tal).registerRunningContainer(wc.containerID, 0); wc.launchFailed(); wc.verifyState(AMContainerState.STOPPING); - verify(wc.tal).registerRunningContainer(wc.containerID); - verify(wc.tal).unregisterRunningContainer(wc.containerID); + verify(wc.tal).registerRunningContainer(wc.containerID, 0); + verify(wc.tal).unregisterRunningContainer(wc.containerID, 0); outgoingEvents = wc.verifyCountAndGetOutgoingEvents(2); verifyUnOrderedOutgoingEventTypes(outgoingEvents, @@ -537,8 +538,8 @@ public class TestAMContainer { wc.containerCompleted(); wc.verifyState(AMContainerState.COMPLETED); - verify(wc.tal).registerRunningContainer(wc.containerID); - verify(wc.tal).unregisterRunningContainer(wc.containerID); + verify(wc.tal).registerRunningContainer(wc.containerID, 0); + verify(wc.tal).unregisterRunningContainer(wc.containerID, 0); outgoingEvents = wc.verifyCountAndGetOutgoingEvents(1); verifyUnOrderedOutgoingEventTypes(outgoingEvents, @@ -567,8 +568,8 @@ public class TestAMContainer { wc.containerCompleted(ContainerExitStatus.DISKS_FAILED, TaskAttemptTerminationCause.NODE_DISK_ERROR); wc.verifyState(AMContainerState.COMPLETED); - verify(wc.tal).registerRunningContainer(wc.containerID); - verify(wc.tal).unregisterRunningContainer(wc.containerID); + verify(wc.tal).registerRunningContainer(wc.containerID, 0); + verify(wc.tal).unregisterRunningContainer(wc.containerID, 0); outgoingEvents = wc.verifyCountAndGetOutgoingEvents(1); verifyUnOrderedOutgoingEventTypes(outgoingEvents, @@ -598,8 +599,8 @@ public class TestAMContainer { wc.containerCompleted(ContainerExitStatus.ABORTED, TaskAttemptTerminationCause.NODE_FAILED); wc.verifyState(AMContainerState.COMPLETED); - verify(wc.tal).registerRunningContainer(wc.containerID); - verify(wc.tal).unregisterRunningContainer(wc.containerID); + verify(wc.tal).registerRunningContainer(wc.containerID, 0); + verify(wc.tal).unregisterRunningContainer(wc.containerID, 0); outgoingEvents = wc.verifyCountAndGetOutgoingEvents(1); verifyUnOrderedOutgoingEventTypes(outgoingEvents, @@ -629,8 +630,8 @@ public class TestAMContainer { wc.containerCompleted(); wc.verifyState(AMContainerState.COMPLETED); - verify(wc.tal).registerRunningContainer(wc.containerID); - verify(wc.tal).unregisterRunningContainer(wc.containerID); + verify(wc.tal).registerRunningContainer(wc.containerID, 0); + verify(wc.tal).unregisterRunningContainer(wc.containerID, 0); verify(wc.chh).register(wc.containerID); verify(wc.chh).unregister(wc.containerID); @@ -658,8 +659,8 @@ public class TestAMContainer { wc.containerCompleted(); wc.verifyState(AMContainerState.COMPLETED); - verify(wc.tal).registerRunningContainer(wc.containerID); - verify(wc.tal).unregisterRunningContainer(wc.containerID); + verify(wc.tal).registerRunningContainer(wc.containerID, 0); + verify(wc.tal).unregisterRunningContainer(wc.containerID, 0); verify(wc.chh).register(wc.containerID); verify(wc.chh).unregister(wc.containerID); @@ -693,8 +694,8 @@ public class TestAMContainer { wc.containerCompleted(ContainerExitStatus.PREEMPTED, TaskAttemptTerminationCause.EXTERNAL_PREEMPTION); wc.verifyState(AMContainerState.COMPLETED); - verify(wc.tal).registerRunningContainer(wc.containerID); - verify(wc.tal).unregisterRunningContainer(wc.containerID); + verify(wc.tal).registerRunningContainer(wc.containerID, 0); + verify(wc.tal).unregisterRunningContainer(wc.containerID, 0); verify(wc.chh).register(wc.containerID); verify(wc.chh).unregister(wc.containerID); @@ -730,8 +731,8 @@ public class TestAMContainer { wc.containerCompleted(ContainerExitStatus.INVALID, TaskAttemptTerminationCause.INTERNAL_PREEMPTION); wc.verifyState(AMContainerState.COMPLETED); - verify(wc.tal).registerRunningContainer(wc.containerID); - verify(wc.tal).unregisterRunningContainer(wc.containerID); + verify(wc.tal).registerRunningContainer(wc.containerID, 0); + verify(wc.tal).unregisterRunningContainer(wc.containerID, 0); verify(wc.chh).register(wc.containerID); verify(wc.chh).unregister(wc.containerID); @@ -767,8 +768,8 @@ public class TestAMContainer { wc.containerCompleted(ContainerExitStatus.DISKS_FAILED, TaskAttemptTerminationCause.NODE_DISK_ERROR); wc.verifyState(AMContainerState.COMPLETED); - verify(wc.tal).registerRunningContainer(wc.containerID); - verify(wc.tal).unregisterRunningContainer(wc.containerID); + verify(wc.tal).registerRunningContainer(wc.containerID, 0); + verify(wc.tal).unregisterRunningContainer(wc.containerID, 0); verify(wc.chh).register(wc.containerID); verify(wc.chh).unregister(wc.containerID); @@ -1011,7 +1012,7 @@ public class TestAMContainer { wc.containerLaunched(); wc.assignTaskAttempt(wc.taskAttemptID); ArgumentCaptor<AMContainerTask> argumentCaptor = ArgumentCaptor.forClass(AMContainerTask.class); - verify(wc.tal, times(1)).registerTaskAttempt(argumentCaptor.capture(), eq(wc.containerID)); + verify(wc.tal, times(1)).registerTaskAttempt(argumentCaptor.capture(), eq(wc.containerID), eq(0)); AMContainerTask task1 = argumentCaptor.getAllValues().get(0); assertEquals(0, task1.getAdditionalResources().size()); wc.taskAttemptSucceeded(wc.taskAttemptID); @@ -1024,7 +1025,7 @@ public class TestAMContainer { TezTaskAttemptID taID2 = TezTaskAttemptID.getInstance(wc.taskID, 2); wc.assignTaskAttempt(taID2, additionalResources, new Credentials()); argumentCaptor = ArgumentCaptor.forClass(AMContainerTask.class); - verify(wc.tal, times(2)).registerTaskAttempt(argumentCaptor.capture(), eq(wc.containerID)); + verify(wc.tal, times(2)).registerTaskAttempt(argumentCaptor.capture(), eq(wc.containerID), eq(0)); AMContainerTask task2 = argumentCaptor.getAllValues().get(1); Map<String, LocalResource> pullTaskAdditionalResources = task2.getAdditionalResources(); assertEquals(2, pullTaskAdditionalResources.size()); @@ -1047,7 +1048,7 @@ public class TestAMContainer { TezTaskAttemptID taID3 = TezTaskAttemptID.getInstance(wc.taskID, 3); wc.assignTaskAttempt(taID3, new HashMap<String, LocalResource>(), new Credentials()); argumentCaptor = ArgumentCaptor.forClass(AMContainerTask.class); - verify(wc.tal, times(3)).registerTaskAttempt(argumentCaptor.capture(), eq(wc.containerID)); + verify(wc.tal, times(3)).registerTaskAttempt(argumentCaptor.capture(), eq(wc.containerID), eq(0)); AMContainerTask task3 = argumentCaptor.getAllValues().get(2); assertEquals(0, task3.getAdditionalResources().size()); wc.taskAttemptSucceeded(taID3); @@ -1100,7 +1101,7 @@ public class TestAMContainer { wc.containerLaunched(); wc.assignTaskAttempt(attempt11, LRs, dag1Credentials); argumentCaptor = ArgumentCaptor.forClass(AMContainerTask.class); - verify(wc.tal, times(1)).registerTaskAttempt(argumentCaptor.capture(), eq(wc.containerID)); + verify(wc.tal, times(1)).registerTaskAttempt(argumentCaptor.capture(), eq(wc.containerID), eq(0)); fetchedTask = argumentCaptor.getAllValues().get(0); assertTrue(fetchedTask.haveCredentialsChanged()); assertNotNull(fetchedTask.getCredentials()); @@ -1109,7 +1110,7 @@ public class TestAMContainer { wc.assignTaskAttempt(attempt12, LRs, dag1Credentials); argumentCaptor = ArgumentCaptor.forClass(AMContainerTask.class); - verify(wc.tal, times(2)).registerTaskAttempt(argumentCaptor.capture(), eq(wc.containerID)); + verify(wc.tal, times(2)).registerTaskAttempt(argumentCaptor.capture(), eq(wc.containerID), eq(0)); fetchedTask = argumentCaptor.getAllValues().get(1); assertFalse(fetchedTask.haveCredentialsChanged()); assertNull(fetchedTask.getCredentials()); @@ -1119,7 +1120,7 @@ public class TestAMContainer { wc.setNewDAGID(dagID2); wc.assignTaskAttempt(attempt21, LRs, null); argumentCaptor = ArgumentCaptor.forClass(AMContainerTask.class); - verify(wc.tal, times(3)).registerTaskAttempt(argumentCaptor.capture(), eq(wc.containerID)); + verify(wc.tal, times(3)).registerTaskAttempt(argumentCaptor.capture(), eq(wc.containerID), eq(0)); fetchedTask = argumentCaptor.getAllValues().get(2); assertTrue(fetchedTask.haveCredentialsChanged()); assertNull(fetchedTask.getCredentials()); @@ -1127,7 +1128,7 @@ public class TestAMContainer { wc.assignTaskAttempt(attempt22, LRs, null); argumentCaptor = ArgumentCaptor.forClass(AMContainerTask.class); - verify(wc.tal, times(4)).registerTaskAttempt(argumentCaptor.capture(), eq(wc.containerID)); + verify(wc.tal, times(4)).registerTaskAttempt(argumentCaptor.capture(), eq(wc.containerID), eq(0)); fetchedTask = argumentCaptor.getAllValues().get(3); assertFalse(fetchedTask.haveCredentialsChanged()); assertNull(fetchedTask.getCredentials()); @@ -1137,7 +1138,7 @@ public class TestAMContainer { wc.setNewDAGID(dagID3); wc.assignTaskAttempt(attempt31, LRs , dag3Credentials); argumentCaptor = ArgumentCaptor.forClass(AMContainerTask.class); - verify(wc.tal, times(5)).registerTaskAttempt(argumentCaptor.capture(), eq(wc.containerID)); + verify(wc.tal, times(5)).registerTaskAttempt(argumentCaptor.capture(), eq(wc.containerID), eq(0)); fetchedTask = argumentCaptor.getAllValues().get(4); assertTrue(fetchedTask.haveCredentialsChanged()); assertNotNull(fetchedTask.getCredentials()); @@ -1147,7 +1148,7 @@ public class TestAMContainer { wc.assignTaskAttempt(attempt32, LRs, dag1Credentials); argumentCaptor = ArgumentCaptor.forClass(AMContainerTask.class); - verify(wc.tal, times(6)).registerTaskAttempt(argumentCaptor.capture(), eq(wc.containerID)); + verify(wc.tal, times(6)).registerTaskAttempt(argumentCaptor.capture(), eq(wc.containerID), eq(0)); fetchedTask = argumentCaptor.getAllValues().get(5); assertFalse(fetchedTask.haveCredentialsChanged()); assertNull(fetchedTask.getCredentials()); @@ -1200,9 +1201,10 @@ public class TestAMContainer { chh = mock(ContainerHeartbeatHandler.class); - InetSocketAddress addr = new InetSocketAddress("localhost", 0); tal = mock(TaskAttemptListener.class); - doReturn(addr).when(tal).getAddress(); + TaskCommunicator taskComm = mock(TaskCommunicator.class); + doReturn(new InetSocketAddress("localhost", 0)).when(taskComm).getAddress(); + doReturn(taskComm).when(tal).getTaskCommunicator(0); dagID = TezDAGID.getInstance(applicationID, 1); vertexID = TezVertexID.getInstance(dagID, 1); @@ -1228,7 +1230,7 @@ public class TestAMContainer { doReturn(taskAttemptID).when(taskSpec).getTaskAttemptID(); amContainer = new AMContainerImpl(container, chh, tal, - new ContainerContextMatcher(), appContext); + new ContainerContextMatcher(), appContext, 0, 0, 0); } public WrappedContainer() { @@ -1278,7 +1280,7 @@ public class TestAMContainer { Token<JobTokenIdentifier> jobToken = mock(Token.class); TokenCache.setSessionToken(jobToken, credentials); amContainer.handle(new AMContainerEventLaunchRequest(containerID, vertexID, - new ContainerContext(localResources, credentials, new HashMap<String, String>(), ""))); + new ContainerContext(localResources, credentials, new HashMap<String, String>(), ""), 0, 0)); } public void assignTaskAttempt(TezTaskAttemptID taID) { http://git-wip-us.apache.org/repos/asf/tez/blob/359eba29/tez-dag/src/test/java/org/apache/tez/dag/app/rm/container/TestAMContainerMap.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/rm/container/TestAMContainerMap.java b/tez-dag/src/test/java/org/apache/tez/dag/app/rm/container/TestAMContainerMap.java index 61371e8..dee4541 100644 --- a/tez-dag/src/test/java/org/apache/tez/dag/app/rm/container/TestAMContainerMap.java +++ b/tez-dag/src/test/java/org/apache/tez/dag/app/rm/container/TestAMContainerMap.java @@ -31,6 +31,7 @@ import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.api.records.Priority; import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.api.records.Token; +import org.apache.tez.dag.api.TaskCommunicator; import org.apache.tez.dag.app.AppContext; import org.apache.tez.dag.app.ContainerHeartbeatHandler; import org.apache.tez.dag.app.TaskAttemptListener; @@ -43,8 +44,9 @@ public class TestAMContainerMap { private TaskAttemptListener mockTaskAttemptListener() { TaskAttemptListener tal = mock(TaskAttemptListener.class); - InetSocketAddress socketAddr = new InetSocketAddress("localhost", 21000); - doReturn(socketAddr).when(tal).getAddress(); + TaskCommunicator taskComm = mock(TaskCommunicator.class); + doReturn(new InetSocketAddress("localhost", 21000)).when(taskComm).getAddress(); + doReturn(taskComm).when(tal).getTaskCommunicator(0); return tal; } http://git-wip-us.apache.org/repos/asf/tez/blob/359eba29/tez-examples/src/main/java/org/apache/tez/examples/JoinValidate.java ---------------------------------------------------------------------- diff --git a/tez-examples/src/main/java/org/apache/tez/examples/JoinValidate.java b/tez-examples/src/main/java/org/apache/tez/examples/JoinValidate.java index 1d124a6..ba17ba0 100644 --- a/tez-examples/src/main/java/org/apache/tez/examples/JoinValidate.java +++ b/tez-examples/src/main/java/org/apache/tez/examples/JoinValidate.java @@ -20,6 +20,7 @@ package org.apache.tez.examples; import java.io.IOException; import java.util.Set; +import java.util.Map; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -135,7 +136,7 @@ public class JoinValidate extends TezExampleBase { private DAG createDag(TezConfiguration tezConf, Path lhs, Path rhs, int numPartitions) throws IOException { - DAG dag = DAG.create("JoinValidate"); + DAG dag = DAG.create(getDagName()); // Configuration for intermediate output - shared by Vertex1 and Vertex2 // This should only be setting selective keys from the underlying conf. Fix after there's a @@ -152,15 +153,18 @@ public class JoinValidate extends TezExampleBase { MRInput .createConfigBuilder(new Configuration(tezConf), TextInputFormat.class, lhs.toUri().toString()).groupSplits(!isDisableSplitGrouping()).build()); + setVertexProperties(lhsVertex, getLhsVertexProperties()); Vertex rhsVertex = Vertex.create(RHS_INPUT_NAME, ProcessorDescriptor.create( ForwardingProcessor.class.getName())).addDataSource("rhs", MRInput .createConfigBuilder(new Configuration(tezConf), TextInputFormat.class, rhs.toUri().toString()).groupSplits(!isDisableSplitGrouping()).build()); + setVertexProperties(rhsVertex, getRhsVertexProperties()); Vertex joinValidateVertex = Vertex.create("joinvalidate", ProcessorDescriptor.create( JoinValidateProcessor.class.getName()), numPartitions); + setVertexProperties(joinValidateVertex, getValidateVertexProperties()); Edge e1 = Edge.create(lhsVertex, joinValidateVertex, edgeConf.createDefaultEdgeProperty()); Edge e2 = Edge.create(rhsVertex, joinValidateVertex, edgeConf.createDefaultEdgeProperty()); @@ -170,6 +174,30 @@ public class JoinValidate extends TezExampleBase { return dag; } + private void setVertexProperties(Vertex vertex, Map<String, String> properties) { + if (properties != null) { + for (Map.Entry<String, String> entry : properties.entrySet()) { + vertex.setConf(entry.getKey(), entry.getValue()); + } + } + } + + protected Map<String, String> getLhsVertexProperties() { + return null; + } + + protected Map<String, String> getRhsVertexProperties() { + return null; + } + + protected Map<String, String> getValidateVertexProperties() { + return null; + } + + protected String getDagName() { + return "JoinValidate"; + } + public static class JoinValidateProcessor extends SimpleProcessor { private static final Logger LOG = LoggerFactory.getLogger(JoinValidateProcessor.class); http://git-wip-us.apache.org/repos/asf/tez/blob/359eba29/tez-ext-service-tests/src/test/java/org/apache/tez/dag/app/launcher/TezTestServiceContainerLauncher.java ---------------------------------------------------------------------- diff --git a/tez-ext-service-tests/src/test/java/org/apache/tez/dag/app/launcher/TezTestServiceContainerLauncher.java b/tez-ext-service-tests/src/test/java/org/apache/tez/dag/app/launcher/TezTestServiceContainerLauncher.java index e83165b..27356bc 100644 --- a/tez-ext-service-tests/src/test/java/org/apache/tez/dag/app/launcher/TezTestServiceContainerLauncher.java +++ b/tez-ext-service-tests/src/test/java/org/apache/tez/dag/app/launcher/TezTestServiceContainerLauncher.java @@ -14,6 +14,8 @@ package org.apache.tez.dag.app.launcher; +import java.net.InetSocketAddress; + import com.google.common.base.Preconditions; import com.google.protobuf.ByteString; import org.apache.commons.logging.Log; @@ -124,7 +126,8 @@ public class TezTestServiceContainerLauncher extends AbstractService implements private RunContainerRequestProto constructRunContainerRequest(NMCommunicatorLaunchRequestEvent event) { RunContainerRequestProto.Builder builder = RunContainerRequestProto.newBuilder(); - builder.setAmHost(tal.getAddress().getHostName()).setAmPort(tal.getAddress().getPort()); + InetSocketAddress address = tal.getTaskCommunicator(event.getTaskCommId()).getAddress(); + builder.setAmHost(address.getHostName()).setAmPort(address.getPort()); builder.setAppAttemptNumber(event.getContainer().getId().getApplicationAttemptId().getAttemptId()); builder.setApplicationIdString( event.getContainer().getId().getApplicationAttemptId().getApplicationId().toString()); http://git-wip-us.apache.org/repos/asf/tez/blob/359eba29/tez-ext-service-tests/src/test/java/org/apache/tez/dag/app/rm/TezTestServiceTaskSchedulerService.java ---------------------------------------------------------------------- diff --git a/tez-ext-service-tests/src/test/java/org/apache/tez/dag/app/rm/TezTestServiceTaskSchedulerService.java b/tez-ext-service-tests/src/test/java/org/apache/tez/dag/app/rm/TezTestServiceTaskSchedulerService.java index e3c18bf..5657f86 100644 --- a/tez-ext-service-tests/src/test/java/org/apache/tez/dag/app/rm/TezTestServiceTaskSchedulerService.java +++ b/tez-ext-service-tests/src/test/java/org/apache/tez/dag/app/rm/TezTestServiceTaskSchedulerService.java @@ -14,7 +14,6 @@ package org.apache.tez.dag.app.rm; -import java.io.IOException; import java.util.Arrays; import java.util.LinkedList; import java.util.List; @@ -32,25 +31,17 @@ import com.google.common.util.concurrent.ThreadFactoryBuilder; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; +import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.Container; import org.apache.hadoop.yarn.api.records.ContainerId; -import org.apache.hadoop.yarn.api.records.ContainerStatus; import org.apache.hadoop.yarn.api.records.NodeId; -import org.apache.hadoop.yarn.api.records.NodeReport; import org.apache.hadoop.yarn.api.records.Priority; import org.apache.hadoop.yarn.api.records.Resource; -import org.apache.hadoop.yarn.client.api.AMRMClient; -import org.apache.hadoop.yarn.client.api.async.AMRMClientAsync; -import org.apache.hadoop.yarn.exceptions.YarnException; -import org.apache.tez.dag.api.TezUncheckedException; import org.apache.tez.dag.app.AppContext; import org.apache.tez.service.TezTestServiceConfConstants; -// TODO Registration with RM - so that the AM is considered dead and restarted in the expiry interval - 10 minutes. - public class TezTestServiceTaskSchedulerService extends TaskSchedulerService { private static final Log LOG = LogFactory.getLog(TezTestServiceTaskSchedulerService.class); @@ -71,7 +62,7 @@ public class TezTestServiceTaskSchedulerService extends TaskSchedulerService { private final ConcurrentMap<Object, ContainerId> runningTasks = new ConcurrentHashMap<Object, ContainerId>(); - private final AMRMClientAsync<AMRMClient.ContainerRequest> amRmClient; + // AppIdIdentifier to avoid conflicts with other containres in the system. // Per instance private final int memoryPerInstance; @@ -82,10 +73,13 @@ public class TezTestServiceTaskSchedulerService extends TaskSchedulerService { private final Resource resourcePerContainer; + // Not registering with the RM. Assuming the main TezScheduler will always run (except local mode), + // and take care of YARN registration. public TezTestServiceTaskSchedulerService(TaskSchedulerAppCallback appClient, AppContext appContext, String clientHostname, int clientPort, String trackingUrl, + long customAppIdIdentifier, Configuration conf) { // Accepting configuration here to allow setting up fields as final super(TezTestServiceTaskSchedulerService.class.getName()); @@ -93,7 +87,7 @@ public class TezTestServiceTaskSchedulerService extends TaskSchedulerService { this.appClientDelegate = createAppCallbackDelegate(appClient); this.appContext = appContext; this.serviceHosts = new LinkedList<String>(); - this.containerFactory = new ContainerFactory(appContext); + this.containerFactory = new ContainerFactory(appContext, customAppIdIdentifier); this.memoryPerInstance = conf .getInt(TezTestServiceConfConstants.TEZ_TEST_SERVICE_MEMORY_PER_INSTANCE_MB, -1); @@ -123,7 +117,6 @@ public class TezTestServiceTaskSchedulerService extends TaskSchedulerService { int memoryPerContainer = (int) (memoryPerInstance / (float) executorsPerInstance); int coresPerContainer = (int) (coresPerInstance / (float) executorsPerInstance); this.resourcePerContainer = Resource.newInstance(memoryPerContainer, coresPerContainer); - this.amRmClient = TezAMRMClientAsync.createAMRMClientAsync(5000, new FakeAmRmCallbackHandler()); String[] hosts = conf.getTrimmedStrings(TezTestServiceConfConstants.TEZ_TEST_SERVICE_HOSTS); if (hosts == null || hosts.length == 0) { @@ -143,36 +136,8 @@ public class TezTestServiceTaskSchedulerService extends TaskSchedulerService { } @Override - public void serviceInit(Configuration conf) { - amRmClient.init(conf); - } - - @Override - public void serviceStart() { - amRmClient.start(); - RegisterApplicationMasterResponse response; - try { - amRmClient.registerApplicationMaster(clientHostname, clientPort, trackingUrl); - } catch (YarnException e) { - throw new TezUncheckedException(e); - } catch (IOException e) { - throw new TezUncheckedException(e); - } - } - - @Override public void serviceStop() { if (!this.isStopped.getAndSet(true)) { - - try { - TaskSchedulerAppCallback.AppFinalStatus status = appClientDelegate.getFinalAppStatus(); - amRmClient.unregisterApplicationMaster(status.exitStatus, status.exitMessage, - status.postCompletionTrackingUrl); - } catch (YarnException e) { - throw new TezUncheckedException(e); - } catch (IOException e) { - throw new TezUncheckedException(e); - } appCallbackExecutor.shutdownNow(); } } @@ -264,7 +229,7 @@ public class TezTestServiceTaskSchedulerService extends TaskSchedulerService { private ExecutorService createAppCallbackExecutorService() { return Executors.newSingleThreadExecutor(new ThreadFactoryBuilder() - .setNameFormat("TaskSchedulerAppCaller #%d").setDaemon(true).build()); + .setNameFormat("TezTestTaskSchedulerAppCaller").setDaemon(true).build()); } private TaskSchedulerAppCallback createAppCallbackDelegate( @@ -274,7 +239,7 @@ public class TezTestServiceTaskSchedulerService extends TaskSchedulerService { } private String selectHost(String[] requestedHosts) { - String host = null; + String host; if (requestedHosts != null && requestedHosts.length > 0) { Arrays.sort(requestedHosts); host = requestedHosts[0]; @@ -287,17 +252,19 @@ public class TezTestServiceTaskSchedulerService extends TaskSchedulerService { } static class ContainerFactory { - final AppContext appContext; AtomicInteger nextId; - - public ContainerFactory(AppContext appContext) { - this.appContext = appContext; - this.nextId = new AtomicInteger(2); + final ApplicationAttemptId customAppAttemptId; + + public ContainerFactory(AppContext appContext, long appIdLong) { + this.nextId = new AtomicInteger(1); + ApplicationId appId = ApplicationId + .newInstance(appIdLong, appContext.getApplicationAttemptId().getApplicationId().getId()); + this.customAppAttemptId = ApplicationAttemptId + .newInstance(appId, appContext.getApplicationAttemptId().getAttemptId()); } public Container createContainer(Resource capability, Priority priority, String hostname, int port) { - ApplicationAttemptId appAttemptId = appContext.getApplicationAttemptId(); - ContainerId containerId = ContainerId.newInstance(appAttemptId, nextId.getAndIncrement()); + ContainerId containerId = ContainerId.newInstance(customAppAttemptId, nextId.getAndIncrement()); NodeId nodeId = NodeId.newInstance(hostname, port); String nodeHttpAddress = "hostname:0"; @@ -311,37 +278,4 @@ public class TezTestServiceTaskSchedulerService extends TaskSchedulerService { return container; } } - - private static class FakeAmRmCallbackHandler implements AMRMClientAsync.CallbackHandler { - - @Override - public void onContainersCompleted(List<ContainerStatus> statuses) { - - } - - @Override - public void onContainersAllocated(List<Container> containers) { - - } - - @Override - public void onShutdownRequest() { - - } - - @Override - public void onNodesUpdated(List<NodeReport> updatedNodes) { - - } - - @Override - public float getProgress() { - return 0; - } - - @Override - public void onError(Throwable e) { - - } - } } http://git-wip-us.apache.org/repos/asf/tez/blob/359eba29/tez-ext-service-tests/src/test/java/org/apache/tez/examples/JoinValidateConfigured.java ---------------------------------------------------------------------- diff --git a/tez-ext-service-tests/src/test/java/org/apache/tez/examples/JoinValidateConfigured.java b/tez-ext-service-tests/src/test/java/org/apache/tez/examples/JoinValidateConfigured.java new file mode 100644 index 0000000..e5d2e3b --- /dev/null +++ b/tez-ext-service-tests/src/test/java/org/apache/tez/examples/JoinValidateConfigured.java @@ -0,0 +1,53 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.tez.examples; + +import java.util.Map; + +public class JoinValidateConfigured extends JoinValidate { + + private final Map<String, String> lhsProps; + private final Map<String, String> rhsProps; + private final Map<String, String> validateProps; + private final String dagNameSuffix; + + public JoinValidateConfigured(Map<String, String> lhsProps, Map<String, String> rhsProps, + Map<String, String> validateProps, String dagNameSuffix) { + this.lhsProps = lhsProps; + this.rhsProps = rhsProps; + this.validateProps = validateProps; + this.dagNameSuffix = dagNameSuffix; + } + + @Override + protected Map<String, String> getLhsVertexProperties() { + return this.lhsProps; + } + + @Override + protected Map<String, String> getRhsVertexProperties() { + return this.rhsProps; + } + + @Override + protected Map<String, String> getValidateVertexProperties() { + return this.validateProps; + } + + @Override + protected String getDagName() { + return "JoinValidate_" + dagNameSuffix; + } +} http://git-wip-us.apache.org/repos/asf/tez/blob/359eba29/tez-ext-service-tests/src/test/java/org/apache/tez/tests/TestExternalTezServices.java ---------------------------------------------------------------------- diff --git a/tez-ext-service-tests/src/test/java/org/apache/tez/tests/TestExternalTezServices.java b/tez-ext-service-tests/src/test/java/org/apache/tez/tests/TestExternalTezServices.java index ae7e7f8..9c149c6 100644 --- a/tez-ext-service-tests/src/test/java/org/apache/tez/tests/TestExternalTezServices.java +++ b/tez-ext-service-tests/src/test/java/org/apache/tez/tests/TestExternalTezServices.java @@ -15,11 +15,11 @@ package org.apache.tez.tests; import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertTrue; import java.io.IOException; import java.util.Map; +import com.google.common.collect.Maps; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; @@ -28,13 +28,14 @@ import org.apache.hadoop.fs.Path; import org.apache.hadoop.hdfs.MiniDFSCluster; import org.apache.tez.client.TezClient; import org.apache.tez.dag.api.TezConfiguration; +import org.apache.tez.dag.api.TezConstants; import org.apache.tez.dag.api.TezException; import org.apache.tez.dag.app.launcher.TezTestServiceNoOpContainerLauncher; import org.apache.tez.dag.app.rm.TezTestServiceTaskSchedulerService; import org.apache.tez.dag.app.taskcomm.TezTestServiceTaskCommunicatorImpl; import org.apache.tez.examples.HashJoinExample; import org.apache.tez.examples.JoinDataGen; -import org.apache.tez.examples.JoinValidate; +import org.apache.tez.examples.JoinValidateConfigured; import org.apache.tez.service.MiniTezTestServiceCluster; import org.apache.tez.test.MiniTezCluster; import org.junit.AfterClass; @@ -47,23 +48,31 @@ public class TestExternalTezServices { private static final String EXT_PUSH_ENTITY_NAME = "ExtServiceTestPush"; - private static MiniTezCluster tezCluster; - private static MiniDFSCluster dfsCluster; - private static MiniTezTestServiceCluster tezTestServiceCluster; + private static volatile MiniTezCluster tezCluster; + private static volatile MiniDFSCluster dfsCluster; + private static volatile MiniTezTestServiceCluster tezTestServiceCluster; - private static Configuration clusterConf = new Configuration(); - private static Configuration confForJobs; + private static volatile Configuration clusterConf = new Configuration(); + private static volatile Configuration confForJobs; - private static FileSystem remoteFs; - private static FileSystem localFs; + private static volatile FileSystem remoteFs; + private static volatile FileSystem localFs; - private static TezClient sharedTezClient; + private static volatile TezClient sharedTezClient; + + private static final Path SRC_DATA_DIR = new Path("/tmp/" + TestExternalTezServices.class.getSimpleName()); + private static final Path HASH_JOIN_EXPECTED_RESULT_PATH = new Path(SRC_DATA_DIR, "expectedOutputPath"); + private static final Path HASH_JOIN_OUTPUT_PATH = new Path(SRC_DATA_DIR, "outPath"); + + private static final Map<String, String> PROPS_EXT_SERVICE_PUSH = Maps.newHashMap(); + private static final Map<String, String> PROPS_REGULAR_CONTAINERS = Maps.newHashMap(); + private static final Map<String, String> PROPS_IN_AM = Maps.newHashMap(); private static String TEST_ROOT_DIR = "target" + Path.SEPARATOR + TestExternalTezServices.class.getName() + "-tmpDir"; @BeforeClass - public static void setup() throws IOException, TezException, InterruptedException { + public static void setup() throws Exception { localFs = FileSystem.getLocal(clusterConf); @@ -108,27 +117,79 @@ public class TestExternalTezServices { remoteFs.mkdirs(stagingDirPath); // This is currently configured to push tasks into the Service, and then use the standard RPC confForJobs.set(TezConfiguration.TEZ_AM_STAGING_DIR, stagingDirPath.toString()); - confForJobs.set(TezConfiguration.TEZ_AM_TASK_SCHEDULERS, + + confForJobs.setStrings(TezConfiguration.TEZ_AM_TASK_SCHEDULERS, + TezConstants.TEZ_AM_SERVICE_PLUGINS_NAME_DEFAULT, +// TezConstants.TEZ_AM_SERVICE_PLUGINS_LOCAL_MODE_NAME_DEFAULT, EXT_PUSH_ENTITY_NAME + ":" + TezTestServiceTaskSchedulerService.class.getName()); - confForJobs.set(TezConfiguration.TEZ_AM_CONTAINER_LAUNCHERS, - EXT_PUSH_ENTITY_NAME + ":" + TezTestServiceNoOpContainerLauncher.class.getName()); - confForJobs.set(TezConfiguration.TEZ_AM_TASK_COMMUNICATORS, - EXT_PUSH_ENTITY_NAME + ":" + TezTestServiceTaskCommunicatorImpl.class.getName()); - confForJobs.set(TezConfiguration.TEZ_AM_VERTEX_TASK_SCHEDULER_NAME, EXT_PUSH_ENTITY_NAME); - confForJobs.set(TezConfiguration.TEZ_AM_VERTEX_CONTAINER_LAUNCHER_NAME, EXT_PUSH_ENTITY_NAME); - confForJobs.set(TezConfiguration.TEZ_AM_VERTEX_TASK_COMMUNICATOR_NAME, EXT_PUSH_ENTITY_NAME); + confForJobs.setStrings(TezConfiguration.TEZ_AM_CONTAINER_LAUNCHERS, + TezConstants.TEZ_AM_SERVICE_PLUGINS_NAME_DEFAULT, +// TezConstants.TEZ_AM_SERVICE_PLUGINS_LOCAL_MODE_NAME_DEFAULT, + EXT_PUSH_ENTITY_NAME + ":" + TezTestServiceNoOpContainerLauncher.class.getName()); + confForJobs.setStrings(TezConfiguration.TEZ_AM_TASK_COMMUNICATORS, + TezConstants.TEZ_AM_SERVICE_PLUGINS_NAME_DEFAULT, +// TezConstants.TEZ_AM_SERVICE_PLUGINS_LOCAL_MODE_NAME_DEFAULT, + EXT_PUSH_ENTITY_NAME + ":" + TezTestServiceTaskCommunicatorImpl.class.getName()); - TezConfiguration tezConf = new TezConfiguration(confForJobs); + // Default all jobs to run via the service. Individual tests override this on a per vertex/dag level. + confForJobs.set(TezConfiguration.TEZ_AM_VERTEX_TASK_SCHEDULER_NAME, + TezConstants.TEZ_AM_SERVICE_PLUGINS_NAME_DEFAULT); + confForJobs.set(TezConfiguration.TEZ_AM_VERTEX_CONTAINER_LAUNCHER_NAME, + TezConstants.TEZ_AM_SERVICE_PLUGINS_NAME_DEFAULT); + confForJobs.set(TezConfiguration.TEZ_AM_VERTEX_TASK_COMMUNICATOR_NAME, + TezConstants.TEZ_AM_SERVICE_PLUGINS_NAME_DEFAULT); + + // Setup various executor sets + PROPS_REGULAR_CONTAINERS.put(TezConfiguration.TEZ_AM_VERTEX_TASK_SCHEDULER_NAME, + TezConstants.TEZ_AM_SERVICE_PLUGINS_NAME_DEFAULT); + PROPS_REGULAR_CONTAINERS.put(TezConfiguration.TEZ_AM_VERTEX_CONTAINER_LAUNCHER_NAME, + TezConstants.TEZ_AM_SERVICE_PLUGINS_NAME_DEFAULT); + PROPS_REGULAR_CONTAINERS.put(TezConfiguration.TEZ_AM_VERTEX_TASK_COMMUNICATOR_NAME, + TezConstants.TEZ_AM_SERVICE_PLUGINS_NAME_DEFAULT); + + PROPS_EXT_SERVICE_PUSH.put(TezConfiguration.TEZ_AM_VERTEX_TASK_SCHEDULER_NAME, EXT_PUSH_ENTITY_NAME); + PROPS_EXT_SERVICE_PUSH.put(TezConfiguration.TEZ_AM_VERTEX_CONTAINER_LAUNCHER_NAME, EXT_PUSH_ENTITY_NAME); + PROPS_EXT_SERVICE_PUSH.put(TezConfiguration.TEZ_AM_VERTEX_TASK_COMMUNICATOR_NAME, EXT_PUSH_ENTITY_NAME); + + PROPS_IN_AM.put(TezConfiguration.TEZ_AM_VERTEX_TASK_SCHEDULER_NAME, + TezConstants.TEZ_AM_SERVICE_PLUGINS_LOCAL_MODE_NAME_DEFAULT); + PROPS_IN_AM.put(TezConfiguration.TEZ_AM_VERTEX_CONTAINER_LAUNCHER_NAME, + TezConstants.TEZ_AM_SERVICE_PLUGINS_LOCAL_MODE_NAME_DEFAULT); + PROPS_IN_AM.put(TezConfiguration.TEZ_AM_VERTEX_TASK_COMMUNICATOR_NAME, + TezConstants.TEZ_AM_SERVICE_PLUGINS_LOCAL_MODE_NAME_DEFAULT); + + + // Create a session to use for all tests. + TezConfiguration tezClientConf = new TezConfiguration(confForJobs); sharedTezClient = TezClient.create(TestExternalTezServices.class.getSimpleName() + "_session", - tezConf, true); + tezClientConf, true); sharedTezClient.start(); LOG.info("Shared TezSession started"); sharedTezClient.waitTillReady(); LOG.info("Shared TezSession ready for submission"); + // Generate the join data set used for each run. + // Can a timeout be enforced here ? + remoteFs.mkdirs(SRC_DATA_DIR); + Path dataPath1 = new Path(SRC_DATA_DIR, "inPath1"); + Path dataPath2 = new Path(SRC_DATA_DIR, "inPath2"); + TezConfiguration tezConf = new TezConfiguration(confForJobs); + // Generate join data - with 2 tasks. + JoinDataGen dataGen = new JoinDataGen(); + String[] dataGenArgs = new String[]{ + dataPath1.toString(), "1048576", dataPath2.toString(), "524288", + HASH_JOIN_EXPECTED_RESULT_PATH.toString(), "2"}; + assertEquals(0, dataGen.run(tezConf, dataGenArgs, sharedTezClient)); + // Run the actual join - with 2 reducers + HashJoinExample joinExample = new HashJoinExample(); + String[] args = new String[]{ + dataPath1.toString(), dataPath2.toString(), "2", HASH_JOIN_OUTPUT_PATH.toString()}; + assertEquals(0, joinExample.run(tezConf, args, sharedTezClient)); + + LOG.info("Completed generating Data - Expected Hash Result and Actual Join Result"); } @AfterClass @@ -156,35 +217,50 @@ public class TestExternalTezServices { @Test(timeout = 60000) - public void test1() throws Exception { - Path testDir = new Path("/tmp/testHashJoinExample"); + public void testAllInService() throws Exception { + int expectedExternalSubmissions = 4 + 3; //4 for 4 src files, 3 for num reducers. + runJoinValidate("AllInService", expectedExternalSubmissions, PROPS_EXT_SERVICE_PUSH, + PROPS_EXT_SERVICE_PUSH, PROPS_EXT_SERVICE_PUSH); + } - remoteFs.mkdirs(testDir); + @Test(timeout = 60000) + public void testAllInContainers() throws Exception { + int expectedExternalSubmissions = 0; // All in containers + runJoinValidate("AllInContainers", expectedExternalSubmissions, PROPS_REGULAR_CONTAINERS, + PROPS_REGULAR_CONTAINERS, PROPS_REGULAR_CONTAINERS); + } - Path dataPath1 = new Path(testDir, "inPath1"); - Path dataPath2 = new Path(testDir, "inPath2"); - Path expectedOutputPath = new Path(testDir, "expectedOutputPath"); - Path outPath = new Path(testDir, "outPath"); + @Test(timeout = 60000) + public void testMixed1() throws Exception { // M-ExtService, R-containers + int expectedExternalSubmissions = 4 + 0; //4 for 4 src files, 3 for num reducers. + runJoinValidate("Mixed1", expectedExternalSubmissions, PROPS_EXT_SERVICE_PUSH, + PROPS_EXT_SERVICE_PUSH, PROPS_REGULAR_CONTAINERS); + } - TezConfiguration tezConf = new TezConfiguration(confForJobs); + @Test(timeout = 60000) + public void testMixed2() throws Exception { // M-Containers, R-ExtService + int expectedExternalSubmissions = 0 + 3; //4 for 4 src files, 3 for num reducers. + runJoinValidate("Mixed2", expectedExternalSubmissions, PROPS_REGULAR_CONTAINERS, + PROPS_REGULAR_CONTAINERS, PROPS_EXT_SERVICE_PUSH); + } - JoinDataGen dataGen = new JoinDataGen(); - String[] dataGenArgs = new String[]{ - dataPath1.toString(), "1048576", dataPath2.toString(), "524288", - expectedOutputPath.toString(), "2"}; - assertEquals(0, dataGen.run(tezConf, dataGenArgs, sharedTezClient)); - HashJoinExample joinExample = new HashJoinExample(); - String[] args = new String[]{ - dataPath1.toString(), dataPath2.toString(), "2", outPath.toString()}; - assertEquals(0, joinExample.run(tezConf, args, sharedTezClient)); + private void runJoinValidate(String name, int extExpectedCount, Map<String, String> lhsProps, + Map<String, String> rhsProps, + Map<String, String> validateProps) throws + Exception { + int externalSubmissionCount = tezTestServiceCluster.getNumSubmissions(); - JoinValidate joinValidate = new JoinValidate(); - String[] validateArgs = new String[]{ - expectedOutputPath.toString(), outPath.toString(), "3"}; + TezConfiguration tezConf = new TezConfiguration(confForJobs); + JoinValidateConfigured joinValidate = + new JoinValidateConfigured(lhsProps, rhsProps, + validateProps, name); + String[] validateArgs = new String[]{"-disableSplitGrouping", + HASH_JOIN_EXPECTED_RESULT_PATH.toString(), HASH_JOIN_OUTPUT_PATH.toString(), "3"}; assertEquals(0, joinValidate.run(tezConf, validateArgs, sharedTezClient)); // Ensure this was actually submitted to the external cluster - assertTrue(tezTestServiceCluster.getNumSubmissions() > 0); + assertEquals(extExpectedCount, + (tezTestServiceCluster.getNumSubmissions() - externalSubmissionCount)); } }
