http://git-wip-us.apache.org/repos/asf/tez/blob/8b278ea8/tez-dag/src/test/java/org/apache/tez/dag/app/TestTaskAttemptListenerImplTezDag.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/TestTaskAttemptListenerImplTezDag.java b/tez-dag/src/test/java/org/apache/tez/dag/app/TestTaskAttemptListenerImplTezDag.java deleted file mode 100644 index 5159aff..0000000 --- a/tez-dag/src/test/java/org/apache/tez/dag/app/TestTaskAttemptListenerImplTezDag.java +++ /dev/null @@ -1,426 +0,0 @@ -/* -* Licensed under the Apache License, Version 2.0 (the "License"); -* you may not use this file except in compliance with the License. -* You may obtain a copy of the License at -* -* http://www.apache.org/licenses/LICENSE-2.0 -* -* Unless required by applicable law or agreed to in writing, software -* distributed under the License is distributed on an "AS IS" BASIS, -* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -* See the License for the specific language governing permissions and -* limitations under the License. -*/ - -package org.apache.tez.dag.app; - -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertFalse; -import static org.junit.Assert.assertNull; -import static org.junit.Assert.assertTrue; -import static org.junit.Assert.fail; -import static org.mockito.Matchers.any; -import static org.mockito.Mockito.doReturn; -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.times; -import static org.mockito.Mockito.verify; - -import java.io.IOException; -import java.nio.ByteBuffer; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.Random; - -import com.google.common.collect.Lists; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.io.Text; -import org.apache.hadoop.security.Credentials; -import org.apache.hadoop.security.token.Token; -import org.apache.hadoop.yarn.api.records.ApplicationAccessType; -import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; -import org.apache.hadoop.yarn.api.records.ApplicationId; -import org.apache.hadoop.yarn.api.records.Container; -import org.apache.hadoop.yarn.api.records.ContainerId; -import org.apache.hadoop.yarn.event.Event; -import org.apache.hadoop.yarn.event.EventHandler; -import org.apache.tez.common.ContainerContext; -import org.apache.tez.common.ContainerTask; -import org.apache.tez.common.TezUtils; -import org.apache.tez.common.security.JobTokenIdentifier; -import org.apache.tez.common.security.JobTokenSecretManager; -import org.apache.tez.common.security.TokenCache; -import org.apache.tez.dag.api.NamedEntityDescriptor; -import org.apache.tez.dag.api.TaskCommunicator; -import org.apache.tez.dag.api.TezConfiguration; -import org.apache.tez.dag.api.TezConstants; -import org.apache.tez.dag.api.TezUncheckedException; -import org.apache.tez.dag.api.UserPayload; -import org.apache.tez.serviceplugins.api.ContainerEndReason; -import org.apache.tez.serviceplugins.api.TaskAttemptEndReason; -import org.apache.tez.dag.api.TaskHeartbeatRequest; -import org.apache.tez.dag.api.TaskHeartbeatResponse; -import org.apache.tez.dag.api.TezException; -import org.apache.hadoop.yarn.api.records.NodeId; -import org.apache.tez.common.TezTaskUmbilicalProtocol; -import org.apache.tez.dag.api.TaskCommunicatorContext; -import org.apache.tez.dag.app.dag.DAG; -import org.apache.tez.dag.app.dag.Vertex; -import org.apache.tez.dag.app.dag.event.TaskAttemptEventType; -import org.apache.tez.dag.app.dag.event.VertexEventRouteEvent; -import org.apache.tez.dag.app.dag.event.VertexEventType; -import org.apache.tez.dag.app.rm.container.AMContainer; -import org.apache.tez.dag.app.rm.container.AMContainerMap; -import org.apache.tez.dag.app.rm.container.AMContainerTask; -import org.apache.tez.dag.records.TezDAGID; -import org.apache.tez.dag.records.TezTaskAttemptID; -import org.apache.tez.dag.records.TezTaskID; -import org.apache.tez.dag.records.TezVertexID; -import org.apache.tez.runtime.api.events.DataMovementEvent; -import org.apache.tez.runtime.api.events.TaskAttemptCompletedEvent; -import org.apache.tez.runtime.api.events.TaskStatusUpdateEvent; -import org.apache.tez.runtime.api.impl.EventType; -import org.apache.tez.runtime.api.impl.TaskSpec; -import org.apache.tez.runtime.api.impl.TezEvent; -import org.junit.Before; -import org.junit.Test; -import org.mockito.ArgumentCaptor; - -@SuppressWarnings("unchecked") -// TODO TEZ-2003 (post) TEZ-2696 Rename to TestTezTaskCommunicator -public class TestTaskAttemptListenerImplTezDag { - private ApplicationId appId; - private ApplicationAttemptId appAttemptId; - private AppContext appContext; - Credentials credentials; - AMContainerMap amContainerMap; - EventHandler eventHandler; - DAG dag; - TaskAttemptListenerImpTezDag taskAttemptListener; - ContainerTask containerTask; - AMContainerTask amContainerTask; - TaskSpec taskSpec; - - TezVertexID vertexID; - TezTaskID taskID; - TezTaskAttemptID taskAttemptID; - - @Before - public void setUp() { - appId = ApplicationId.newInstance(1000, 1); - appAttemptId = ApplicationAttemptId.newInstance(appId, 1); - dag = mock(DAG.class); - TezDAGID dagID = TezDAGID.getInstance(appId, 1); - vertexID = TezVertexID.getInstance(dagID, 1); - taskID = TezTaskID.getInstance(vertexID, 1); - taskAttemptID = TezTaskAttemptID.getInstance(taskID, 1); - credentials = new Credentials(); - - amContainerMap = mock(AMContainerMap.class); - Map<ApplicationAccessType, String> appAcls = new HashMap<ApplicationAccessType, String>(); - - eventHandler = mock(EventHandler.class); - - MockClock clock = new MockClock(); - - appContext = mock(AppContext.class); - doReturn(eventHandler).when(appContext).getEventHandler(); - doReturn(dag).when(appContext).getCurrentDAG(); - doReturn(appAcls).when(appContext).getApplicationACLs(); - doReturn(amContainerMap).when(appContext).getAllContainers(); - doReturn(clock).when(appContext).getClock(); - - doReturn(appAttemptId).when(appContext).getApplicationAttemptId(); - doReturn(credentials).when(appContext).getAppCredentials(); - NodeId nodeId = NodeId.newInstance("localhost", 0); - AMContainer amContainer = mock(AMContainer.class); - Container container = mock(Container.class); - doReturn(nodeId).when(container).getNodeId(); - doReturn(amContainer).when(amContainerMap).get(any(ContainerId.class)); - doReturn(container).when(amContainer).getContainer(); - - Configuration conf = new TezConfiguration(); - UserPayload defaultPayload; - try { - defaultPayload = TezUtils.createUserPayloadFromConf(conf); - } catch (IOException e) { - throw new TezUncheckedException(e); - } - taskAttemptListener = new TaskAttemptListenerImplForTest(appContext, - mock(TaskHeartbeatHandler.class), mock(ContainerHeartbeatHandler.class), - Lists.newArrayList( - new NamedEntityDescriptor(TezConstants.getTezYarnServicePluginName(), null) - .setUserPayload(defaultPayload))); - - taskSpec = mock(TaskSpec.class); - doReturn(taskAttemptID).when(taskSpec).getTaskAttemptID(); - amContainerTask = new AMContainerTask(taskSpec, null, null, false, 0); - containerTask = null; - } - - @Test(timeout = 5000) - public void testGetTask() throws IOException { - - TezTaskCommunicatorImpl taskCommunicator = (TezTaskCommunicatorImpl)taskAttemptListener.getTaskCommunicator(0); - TezTaskUmbilicalProtocol tezUmbilical = taskCommunicator.getUmbilical(); - - ContainerId containerId1 = createContainerId(appId, 1); - ContainerContext containerContext1 = new ContainerContext(containerId1.toString()); - containerTask = tezUmbilical.getTask(containerContext1); - assertTrue(containerTask.shouldDie()); - - ContainerId containerId2 = createContainerId(appId, 2); - ContainerContext containerContext2 = new ContainerContext(containerId2.toString()); - taskAttemptListener.registerRunningContainer(containerId2, 0); - containerTask = tezUmbilical.getTask(containerContext2); - assertNull(containerTask); - - // Valid task registered - taskAttemptListener.registerTaskAttempt(amContainerTask, containerId2, 0); - containerTask = tezUmbilical.getTask(containerContext2); - assertFalse(containerTask.shouldDie()); - assertEquals(taskSpec, containerTask.getTaskSpec()); - - // Task unregistered. Should respond to heartbeats - taskAttemptListener.unregisterTaskAttempt(taskAttemptID, 0, TaskAttemptEndReason.OTHER, null); - containerTask = tezUmbilical.getTask(containerContext2); - assertNull(containerTask); - - // Container unregistered. Should send a shouldDie = true - taskAttemptListener.unregisterRunningContainer(containerId2, 0, ContainerEndReason.OTHER, null); - containerTask = tezUmbilical.getTask(containerContext2); - assertTrue(containerTask.shouldDie()); - - ContainerId containerId3 = createContainerId(appId, 3); - ContainerContext containerContext3 = new ContainerContext(containerId3.toString()); - taskAttemptListener.registerRunningContainer(containerId3, 0); - - // Register task to container3, followed by unregistering container 3 all together - TaskSpec taskSpec2 = mock(TaskSpec.class); - TezTaskAttemptID taskAttemptId2 = mock(TezTaskAttemptID.class); - doReturn(taskAttemptId2).when(taskSpec2).getTaskAttemptID(); - AMContainerTask amContainerTask2 = new AMContainerTask(taskSpec, null, null, false, 0); - taskAttemptListener.registerTaskAttempt(amContainerTask2, containerId3, 0); - taskAttemptListener.unregisterRunningContainer(containerId3, 0, ContainerEndReason.OTHER, null); - containerTask = tezUmbilical.getTask(containerContext3); - assertTrue(containerTask.shouldDie()); - } - - @Test(timeout = 5000) - public void testGetTaskMultiplePulls() throws IOException { - TezTaskCommunicatorImpl taskCommunicator = (TezTaskCommunicatorImpl)taskAttemptListener.getTaskCommunicator(0); - TezTaskUmbilicalProtocol tezUmbilical = taskCommunicator.getUmbilical(); - - ContainerId containerId1 = createContainerId(appId, 1); - - ContainerContext containerContext1 = new ContainerContext(containerId1.toString()); - taskAttemptListener.registerRunningContainer(containerId1, 0); - containerTask = tezUmbilical.getTask(containerContext1); - assertNull(containerTask); - - // Register task - taskAttemptListener.registerTaskAttempt(amContainerTask, containerId1, 0); - containerTask = tezUmbilical.getTask(containerContext1); - assertFalse(containerTask.shouldDie()); - assertEquals(taskSpec, containerTask.getTaskSpec()); - - // Try pulling again - simulates re-use pull - containerTask = tezUmbilical.getTask(containerContext1); - assertNull(containerTask); - } - - @Test (timeout = 5000) - public void testTaskEventRouting() throws Exception { - List<TezEvent> events = Arrays.asList( - new TezEvent(new TaskStatusUpdateEvent(null, 0.0f, null), null), - new TezEvent(DataMovementEvent.create(0, ByteBuffer.wrap(new byte[0])), null), - new TezEvent(new TaskAttemptCompletedEvent(), null) - ); - - generateHeartbeat(events, 0, 1, 0, new ArrayList<TezEvent>()); - - ArgumentCaptor<Event> arg = ArgumentCaptor.forClass(Event.class); - verify(eventHandler, times(2)).handle(arg.capture()); - final List<Event> argAllValues = arg.getAllValues(); - - final Event statusUpdateEvent = argAllValues.get(0); - assertEquals("First event should be status update", TaskAttemptEventType.TA_STATUS_UPDATE, - statusUpdateEvent.getType()); - - final Event vertexEvent = argAllValues.get(1); - final VertexEventRouteEvent vertexRouteEvent = (VertexEventRouteEvent)vertexEvent; - assertEquals("First event should be routed to vertex", VertexEventType.V_ROUTE_EVENT, - vertexEvent.getType()); - assertEquals(EventType.DATA_MOVEMENT_EVENT, - vertexRouteEvent.getEvents().get(0).getEventType()); - assertEquals(EventType.TASK_ATTEMPT_COMPLETED_EVENT, - vertexRouteEvent.getEvents().get(1).getEventType()); - - } - - @Test (timeout = 5000) - public void testTaskEventRoutingTaskAttemptOnly() throws Exception { - List<TezEvent> events = Arrays.asList( - new TezEvent(new TaskAttemptCompletedEvent(), null) - ); - generateHeartbeat(events, 0, 1, 0, new ArrayList<TezEvent>()); - - ArgumentCaptor<Event> arg = ArgumentCaptor.forClass(Event.class); - verify(eventHandler, times(1)).handle(arg.capture()); - final List<Event> argAllValues = arg.getAllValues(); - - final Event event = argAllValues.get(0); - assertEquals("only event should be route event", VertexEventType.V_ROUTE_EVENT, - event.getType()); - } - - @Test (timeout = 5000) - public void testTaskHeartbeatResponse() throws Exception { - List<TezEvent> events = new ArrayList<TezEvent>(); - List<TezEvent> eventsToSend = new ArrayList<TezEvent>(); - TaskHeartbeatResponse response = generateHeartbeat(events, 0, 1, 2, eventsToSend); - - assertEquals(2, response.getNextFromEventId()); - assertEquals(eventsToSend, response.getEvents()); - } - - //try 10 times to allocate random port, fail it if no one is succeed. - @Test (timeout = 5000) - public void testPortRange() { - boolean succeedToAllocate = false; - Random rand = new Random(); - for (int i = 0; i < 10; ++i) { - int nextPort = 1024 + rand.nextInt(65535 - 1024); - if (testPortRange(nextPort)) { - succeedToAllocate = true; - break; - } - } - if (!succeedToAllocate) { - fail("Can not allocate free port even in 10 iterations for TaskAttemptListener"); - } - } - - // TODO TEZ-2003 Move this into TestTezTaskCommunicator. Potentially other tests as well. - @Test (timeout= 5000) - public void testPortRange_NotSpecified() throws IOException { - Configuration conf = new Configuration(); - JobTokenIdentifier identifier = new JobTokenIdentifier(new Text( - "fakeIdentifier")); - Token<JobTokenIdentifier> sessionToken = new Token<JobTokenIdentifier>(identifier, - new JobTokenSecretManager()); - sessionToken.setService(identifier.getJobId()); - TokenCache.setSessionToken(sessionToken, credentials); - UserPayload userPayload = TezUtils.createUserPayloadFromConf(conf); - taskAttemptListener = new TaskAttemptListenerImpTezDag(appContext, - mock(TaskHeartbeatHandler.class), mock(ContainerHeartbeatHandler.class), Lists.newArrayList( - new NamedEntityDescriptor(TezConstants.getTezYarnServicePluginName(), null) - .setUserPayload(userPayload))); - // no exception happen, should started properly - taskAttemptListener.init(conf); - taskAttemptListener.start(); - } - - private boolean testPortRange(int port) { - boolean succeedToAllocate = true; - try { - Configuration conf = new Configuration(); - - JobTokenIdentifier identifier = new JobTokenIdentifier(new Text( - "fakeIdentifier")); - Token<JobTokenIdentifier> sessionToken = new Token<JobTokenIdentifier>(identifier, - new JobTokenSecretManager()); - sessionToken.setService(identifier.getJobId()); - TokenCache.setSessionToken(sessionToken, credentials); - - conf.set(TezConfiguration.TEZ_AM_TASK_AM_PORT_RANGE, port + "-" + port); - UserPayload userPayload = TezUtils.createUserPayloadFromConf(conf); - - taskAttemptListener = new TaskAttemptListenerImpTezDag(appContext, - mock(TaskHeartbeatHandler.class), mock(ContainerHeartbeatHandler.class), Lists - .newArrayList(new NamedEntityDescriptor(TezConstants.getTezYarnServicePluginName(), null) - .setUserPayload(userPayload))); - taskAttemptListener.init(conf); - taskAttemptListener.start(); - int resultedPort = taskAttemptListener.getTaskCommunicator(0).getAddress().getPort(); - assertEquals(port, resultedPort); - } catch (Exception e) { - succeedToAllocate = false; - } finally { - if (taskAttemptListener != null) { - try { - taskAttemptListener.close(); - } catch (IOException e) { - e.printStackTrace(); - fail("fail to stop TaskAttemptListener"); - } - } - } - return succeedToAllocate; - } - - private TaskHeartbeatResponse generateHeartbeat(List<TezEvent> events, - int fromEventId, int maxEvents, int nextFromEventId, - List<TezEvent> sendEvents) throws IOException, TezException { - ContainerId containerId = createContainerId(appId, 1); - Vertex vertex = mock(Vertex.class); - - doReturn(vertex).when(dag).getVertex(vertexID); - doReturn("test_vertex").when(vertex).getName(); - TaskAttemptEventInfo eventInfo = new TaskAttemptEventInfo(nextFromEventId, sendEvents, 0); - doReturn(eventInfo).when(vertex).getTaskAttemptTezEvents(taskAttemptID, fromEventId, 0, maxEvents); - - taskAttemptListener.registerRunningContainer(containerId, 0); - taskAttemptListener.registerTaskAttempt(amContainerTask, containerId, 0); - - TaskHeartbeatRequest request = mock(TaskHeartbeatRequest.class); - doReturn(containerId.toString()).when(request).getContainerIdentifier(); - doReturn(containerId.toString()).when(request).getContainerIdentifier(); - doReturn(taskAttemptID).when(request).getTaskAttemptId(); - doReturn(events).when(request).getEvents(); - doReturn(maxEvents).when(request).getMaxEvents(); - doReturn(fromEventId).when(request).getStartIndex(); - - return taskAttemptListener.heartbeat(request); - } - - - private ContainerId createContainerId(ApplicationId applicationId, int containerIdx) { - ApplicationAttemptId appAttemptId = ApplicationAttemptId.newInstance(applicationId, 1); - return ContainerId.newInstance(appAttemptId, containerIdx); - } - - private static class TaskAttemptListenerImplForTest extends TaskAttemptListenerImpTezDag { - - public TaskAttemptListenerImplForTest(AppContext context, - TaskHeartbeatHandler thh, - ContainerHeartbeatHandler chh, - List<NamedEntityDescriptor> taskCommDescriptors) { - super(context, thh, chh, taskCommDescriptors); - } - - @Override - TaskCommunicator createDefaultTaskCommunicator(TaskCommunicatorContext taskCommunicatorContext) { - return new TezTaskCommunicatorImplForTest(taskCommunicatorContext); - } - - } - - private static class TezTaskCommunicatorImplForTest extends TezTaskCommunicatorImpl { - - public TezTaskCommunicatorImplForTest( - TaskCommunicatorContext taskCommunicatorContext) { - super(taskCommunicatorContext); - } - - @Override - protected void startRpcServer() { - } - - @Override - protected void stopRpcServer() { - } - } -}
http://git-wip-us.apache.org/repos/asf/tez/blob/8b278ea8/tez-dag/src/test/java/org/apache/tez/dag/app/TestTaskAttemptListenerImplTezDag2.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/TestTaskAttemptListenerImplTezDag2.java b/tez-dag/src/test/java/org/apache/tez/dag/app/TestTaskAttemptListenerImplTezDag2.java deleted file mode 100644 index 74468f2..0000000 --- a/tez-dag/src/test/java/org/apache/tez/dag/app/TestTaskAttemptListenerImplTezDag2.java +++ /dev/null @@ -1,137 +0,0 @@ -/* - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.tez.dag.app; - -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertTrue; -import static org.mockito.Matchers.any; -import static org.mockito.Mockito.doReturn; -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.times; -import static org.mockito.Mockito.verify; - -import java.io.IOException; -import java.util.HashMap; -import java.util.Map; - -import com.google.common.collect.Lists; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.security.Credentials; -import org.apache.hadoop.yarn.api.records.ApplicationAccessType; -import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; -import org.apache.hadoop.yarn.api.records.ApplicationId; -import org.apache.hadoop.yarn.api.records.Container; -import org.apache.hadoop.yarn.api.records.ContainerId; -import org.apache.hadoop.yarn.api.records.NodeId; -import org.apache.hadoop.yarn.event.Event; -import org.apache.hadoop.yarn.event.EventHandler; -import org.apache.tez.common.TezUtils; -import org.apache.tez.dag.api.NamedEntityDescriptor; -import org.apache.tez.dag.api.TezConfiguration; -import org.apache.tez.dag.api.TezConstants; -import org.apache.tez.dag.api.UserPayload; -import org.apache.tez.serviceplugins.api.TaskAttemptEndReason; -import org.apache.tez.dag.app.dag.DAG; -import org.apache.tez.dag.app.dag.event.TaskAttemptEventAttemptFailed; -import org.apache.tez.dag.app.dag.event.TaskAttemptEventAttemptKilled; -import org.apache.tez.dag.app.rm.container.AMContainer; -import org.apache.tez.dag.app.rm.container.AMContainerMap; -import org.apache.tez.dag.app.rm.container.AMContainerTask; -import org.apache.tez.dag.records.TaskAttemptTerminationCause; -import org.apache.tez.dag.records.TezTaskAttemptID; -import org.apache.tez.runtime.api.impl.TaskSpec; -import org.junit.Test; -import org.mockito.ArgumentCaptor; - -// TODO TEZ-2003. Rename to TestTaskAttemptListener | whatever TaskAttemptListener is renamed to. -public class TestTaskAttemptListenerImplTezDag2 { - - @Test(timeout = 5000) - public void testTaskAttemptFailedKilled() throws IOException { - ApplicationId appId = ApplicationId.newInstance(1000, 1); - ApplicationAttemptId appAttemptId = ApplicationAttemptId.newInstance(appId, 1); - Credentials credentials = new Credentials(); - AppContext appContext = mock(AppContext.class); - EventHandler eventHandler = mock(EventHandler.class); - DAG dag = mock(DAG.class); - AMContainerMap amContainerMap = mock(AMContainerMap.class); - Map<ApplicationAccessType, String> appAcls = new HashMap<ApplicationAccessType, String>(); - doReturn(eventHandler).when(appContext).getEventHandler(); - doReturn(dag).when(appContext).getCurrentDAG(); - doReturn(appAttemptId).when(appContext).getApplicationAttemptId(); - doReturn(credentials).when(appContext).getAppCredentials(); - doReturn(appAcls).when(appContext).getApplicationACLs(); - doReturn(amContainerMap).when(appContext).getAllContainers(); - NodeId nodeId = NodeId.newInstance("localhost", 0); - AMContainer amContainer = mock(AMContainer.class); - Container container = mock(Container.class); - doReturn(nodeId).when(container).getNodeId(); - doReturn(amContainer).when(amContainerMap).get(any(ContainerId.class)); - doReturn(container).when(amContainer).getContainer(); - - Configuration conf = new TezConfiguration(); - UserPayload userPayload = TezUtils.createUserPayloadFromConf(conf); - TaskAttemptListenerImpTezDag taskAttemptListener = - new TaskAttemptListenerImpTezDag(appContext, mock(TaskHeartbeatHandler.class), - mock(ContainerHeartbeatHandler.class), Lists.newArrayList(new NamedEntityDescriptor( - TezConstants.getTezYarnServicePluginName(), null).setUserPayload(userPayload))); - - TaskSpec taskSpec1 = mock(TaskSpec.class); - TezTaskAttemptID taskAttemptId1 = mock(TezTaskAttemptID.class); - doReturn(taskAttemptId1).when(taskSpec1).getTaskAttemptID(); - AMContainerTask amContainerTask1 = new AMContainerTask(taskSpec1, null, null, false, 10); - - TaskSpec taskSpec2 = mock(TaskSpec.class); - TezTaskAttemptID taskAttemptId2 = mock(TezTaskAttemptID.class); - doReturn(taskAttemptId2).when(taskSpec2).getTaskAttemptID(); - AMContainerTask amContainerTask2 = new AMContainerTask(taskSpec2, null, null, false, 10); - - ContainerId containerId1 = createContainerId(appId, 1); - taskAttemptListener.registerRunningContainer(containerId1, 0); - taskAttemptListener.registerTaskAttempt(amContainerTask1, containerId1, 0); - ContainerId containerId2 = createContainerId(appId, 2); - taskAttemptListener.registerRunningContainer(containerId2, 0); - taskAttemptListener.registerTaskAttempt(amContainerTask2, containerId2, 0); - - - taskAttemptListener - .taskFailed(taskAttemptId1, TaskAttemptEndReason.COMMUNICATION_ERROR, "Diagnostics1"); - taskAttemptListener - .taskKilled(taskAttemptId2, TaskAttemptEndReason.EXECUTOR_BUSY, "Diagnostics2"); - - ArgumentCaptor<Event> argumentCaptor = ArgumentCaptor.forClass(Event.class); - verify(eventHandler, times(2)).handle(argumentCaptor.capture()); - assertTrue(argumentCaptor.getAllValues().get(0) instanceof TaskAttemptEventAttemptFailed); - assertTrue(argumentCaptor.getAllValues().get(1) instanceof TaskAttemptEventAttemptKilled); - TaskAttemptEventAttemptFailed failedEvent = - (TaskAttemptEventAttemptFailed) argumentCaptor.getAllValues().get(0); - TaskAttemptEventAttemptKilled killedEvent = - (TaskAttemptEventAttemptKilled) argumentCaptor.getAllValues().get(1); - - assertEquals("Diagnostics1", failedEvent.getDiagnosticInfo()); - assertEquals(TaskAttemptTerminationCause.COMMUNICATION_ERROR, - failedEvent.getTerminationCause()); - - assertEquals("Diagnostics2", killedEvent.getDiagnosticInfo()); - assertEquals(TaskAttemptTerminationCause.SERVICE_BUSY, killedEvent.getTerminationCause()); - // TODO TEZ-2003. Verify unregistration from the registered list - } - - private ContainerId createContainerId(ApplicationId applicationId, int containerIdx) { - ApplicationAttemptId appAttemptId = ApplicationAttemptId.newInstance(applicationId, 1); - ContainerId containerId = ContainerId.newInstance(appAttemptId, containerIdx); - return containerId; - } -} http://git-wip-us.apache.org/repos/asf/tez/blob/8b278ea8/tez-dag/src/test/java/org/apache/tez/dag/app/TestTaskCommunicatorContextImpl.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/TestTaskCommunicatorContextImpl.java b/tez-dag/src/test/java/org/apache/tez/dag/app/TestTaskCommunicatorContextImpl.java index 1545eb4..5222a2d 100644 --- a/tez-dag/src/test/java/org/apache/tez/dag/app/TestTaskCommunicatorContextImpl.java +++ b/tez-dag/src/test/java/org/apache/tez/dag/app/TestTaskCommunicatorContextImpl.java @@ -34,7 +34,7 @@ public class TestTaskCommunicatorContextImpl { @Test(timeout = 5000) public void testIsKnownContainer() { AppContext appContext = mock(AppContext.class); - TaskAttemptListenerImpTezDag tal = mock(TaskAttemptListenerImpTezDag.class); + TaskCommunicatorManager tal = mock(TaskCommunicatorManager.class); AMContainerMap amContainerMap = new AMContainerMap(mock(ContainerHeartbeatHandler.class), tal, mock( ContainerSignatureMatcher.class), appContext); http://git-wip-us.apache.org/repos/asf/tez/blob/8b278ea8/tez-dag/src/test/java/org/apache/tez/dag/app/TestTaskCommunicatorManager.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/TestTaskCommunicatorManager.java b/tez-dag/src/test/java/org/apache/tez/dag/app/TestTaskCommunicatorManager.java index 4f68fab..be7adde 100644 --- a/tez-dag/src/test/java/org/apache/tez/dag/app/TestTaskCommunicatorManager.java +++ b/tez-dag/src/test/java/org/apache/tez/dag/app/TestTaskCommunicatorManager.java @@ -220,7 +220,7 @@ public class TestTaskCommunicatorManager { } - static class TaskCommManagerForMultipleCommTest extends TaskAttemptListenerImpTezDag { + static class TaskCommManagerForMultipleCommTest extends TaskCommunicatorManager { // All variables setup as static since methods being overridden are invoked by the ContainerLauncherRouter ctor, // and regular variables will not be initialized at this point. http://git-wip-us.apache.org/repos/asf/tez/blob/8b278ea8/tez-dag/src/test/java/org/apache/tez/dag/app/TestTaskCommunicatorManager1.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/TestTaskCommunicatorManager1.java b/tez-dag/src/test/java/org/apache/tez/dag/app/TestTaskCommunicatorManager1.java new file mode 100644 index 0000000..e8ce429 --- /dev/null +++ b/tez-dag/src/test/java/org/apache/tez/dag/app/TestTaskCommunicatorManager1.java @@ -0,0 +1,425 @@ +/* +* Licensed under the Apache License, Version 2.0 (the "License"); +* you may not use this file except in compliance with the License. +* You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License. +*/ + +package org.apache.tez.dag.app; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; +import static org.mockito.Matchers.any; +import static org.mockito.Mockito.doReturn; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; + +import java.io.IOException; +import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Random; + +import com.google.common.collect.Lists; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.security.Credentials; +import org.apache.hadoop.security.token.Token; +import org.apache.hadoop.yarn.api.records.ApplicationAccessType; +import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; +import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.api.records.Container; +import org.apache.hadoop.yarn.api.records.ContainerId; +import org.apache.hadoop.yarn.event.Event; +import org.apache.hadoop.yarn.event.EventHandler; +import org.apache.tez.common.ContainerContext; +import org.apache.tez.common.ContainerTask; +import org.apache.tez.common.TezUtils; +import org.apache.tez.common.security.JobTokenIdentifier; +import org.apache.tez.common.security.JobTokenSecretManager; +import org.apache.tez.common.security.TokenCache; +import org.apache.tez.dag.api.NamedEntityDescriptor; +import org.apache.tez.dag.api.TaskCommunicator; +import org.apache.tez.dag.api.TezConfiguration; +import org.apache.tez.dag.api.TezConstants; +import org.apache.tez.dag.api.TezUncheckedException; +import org.apache.tez.dag.api.UserPayload; +import org.apache.tez.serviceplugins.api.ContainerEndReason; +import org.apache.tez.serviceplugins.api.TaskAttemptEndReason; +import org.apache.tez.dag.api.TaskHeartbeatRequest; +import org.apache.tez.dag.api.TaskHeartbeatResponse; +import org.apache.tez.dag.api.TezException; +import org.apache.hadoop.yarn.api.records.NodeId; +import org.apache.tez.common.TezTaskUmbilicalProtocol; +import org.apache.tez.dag.api.TaskCommunicatorContext; +import org.apache.tez.dag.app.dag.DAG; +import org.apache.tez.dag.app.dag.Vertex; +import org.apache.tez.dag.app.dag.event.TaskAttemptEventType; +import org.apache.tez.dag.app.dag.event.VertexEventRouteEvent; +import org.apache.tez.dag.app.dag.event.VertexEventType; +import org.apache.tez.dag.app.rm.container.AMContainer; +import org.apache.tez.dag.app.rm.container.AMContainerMap; +import org.apache.tez.dag.app.rm.container.AMContainerTask; +import org.apache.tez.dag.records.TezDAGID; +import org.apache.tez.dag.records.TezTaskAttemptID; +import org.apache.tez.dag.records.TezTaskID; +import org.apache.tez.dag.records.TezVertexID; +import org.apache.tez.runtime.api.events.DataMovementEvent; +import org.apache.tez.runtime.api.events.TaskAttemptCompletedEvent; +import org.apache.tez.runtime.api.events.TaskStatusUpdateEvent; +import org.apache.tez.runtime.api.impl.EventType; +import org.apache.tez.runtime.api.impl.TaskSpec; +import org.apache.tez.runtime.api.impl.TezEvent; +import org.junit.Before; +import org.junit.Test; +import org.mockito.ArgumentCaptor; + +@SuppressWarnings("unchecked") +public class TestTaskCommunicatorManager1 { + private ApplicationId appId; + private ApplicationAttemptId appAttemptId; + private AppContext appContext; + Credentials credentials; + AMContainerMap amContainerMap; + EventHandler eventHandler; + DAG dag; + TaskCommunicatorManager taskAttemptListener; + ContainerTask containerTask; + AMContainerTask amContainerTask; + TaskSpec taskSpec; + + TezVertexID vertexID; + TezTaskID taskID; + TezTaskAttemptID taskAttemptID; + + @Before + public void setUp() { + appId = ApplicationId.newInstance(1000, 1); + appAttemptId = ApplicationAttemptId.newInstance(appId, 1); + dag = mock(DAG.class); + TezDAGID dagID = TezDAGID.getInstance(appId, 1); + vertexID = TezVertexID.getInstance(dagID, 1); + taskID = TezTaskID.getInstance(vertexID, 1); + taskAttemptID = TezTaskAttemptID.getInstance(taskID, 1); + credentials = new Credentials(); + + amContainerMap = mock(AMContainerMap.class); + Map<ApplicationAccessType, String> appAcls = new HashMap<ApplicationAccessType, String>(); + + eventHandler = mock(EventHandler.class); + + MockClock clock = new MockClock(); + + appContext = mock(AppContext.class); + doReturn(eventHandler).when(appContext).getEventHandler(); + doReturn(dag).when(appContext).getCurrentDAG(); + doReturn(appAcls).when(appContext).getApplicationACLs(); + doReturn(amContainerMap).when(appContext).getAllContainers(); + doReturn(clock).when(appContext).getClock(); + + doReturn(appAttemptId).when(appContext).getApplicationAttemptId(); + doReturn(credentials).when(appContext).getAppCredentials(); + NodeId nodeId = NodeId.newInstance("localhost", 0); + AMContainer amContainer = mock(AMContainer.class); + Container container = mock(Container.class); + doReturn(nodeId).when(container).getNodeId(); + doReturn(amContainer).when(amContainerMap).get(any(ContainerId.class)); + doReturn(container).when(amContainer).getContainer(); + + Configuration conf = new TezConfiguration(); + UserPayload defaultPayload; + try { + defaultPayload = TezUtils.createUserPayloadFromConf(conf); + } catch (IOException e) { + throw new TezUncheckedException(e); + } + taskAttemptListener = new TaskCommunicatorManagerInterfaceImplForTest(appContext, + mock(TaskHeartbeatHandler.class), mock(ContainerHeartbeatHandler.class), + Lists.newArrayList( + new NamedEntityDescriptor(TezConstants.getTezYarnServicePluginName(), null) + .setUserPayload(defaultPayload))); + + taskSpec = mock(TaskSpec.class); + doReturn(taskAttemptID).when(taskSpec).getTaskAttemptID(); + amContainerTask = new AMContainerTask(taskSpec, null, null, false, 0); + containerTask = null; + } + + @Test(timeout = 5000) + public void testGetTask() throws IOException { + + TezTaskCommunicatorImpl taskCommunicator = (TezTaskCommunicatorImpl)taskAttemptListener.getTaskCommunicator(0); + TezTaskUmbilicalProtocol tezUmbilical = taskCommunicator.getUmbilical(); + + ContainerId containerId1 = createContainerId(appId, 1); + ContainerContext containerContext1 = new ContainerContext(containerId1.toString()); + containerTask = tezUmbilical.getTask(containerContext1); + assertTrue(containerTask.shouldDie()); + + ContainerId containerId2 = createContainerId(appId, 2); + ContainerContext containerContext2 = new ContainerContext(containerId2.toString()); + taskAttemptListener.registerRunningContainer(containerId2, 0); + containerTask = tezUmbilical.getTask(containerContext2); + assertNull(containerTask); + + // Valid task registered + taskAttemptListener.registerTaskAttempt(amContainerTask, containerId2, 0); + containerTask = tezUmbilical.getTask(containerContext2); + assertFalse(containerTask.shouldDie()); + assertEquals(taskSpec, containerTask.getTaskSpec()); + + // Task unregistered. Should respond to heartbeats + taskAttemptListener.unregisterTaskAttempt(taskAttemptID, 0, TaskAttemptEndReason.OTHER, null); + containerTask = tezUmbilical.getTask(containerContext2); + assertNull(containerTask); + + // Container unregistered. Should send a shouldDie = true + taskAttemptListener.unregisterRunningContainer(containerId2, 0, ContainerEndReason.OTHER, null); + containerTask = tezUmbilical.getTask(containerContext2); + assertTrue(containerTask.shouldDie()); + + ContainerId containerId3 = createContainerId(appId, 3); + ContainerContext containerContext3 = new ContainerContext(containerId3.toString()); + taskAttemptListener.registerRunningContainer(containerId3, 0); + + // Register task to container3, followed by unregistering container 3 all together + TaskSpec taskSpec2 = mock(TaskSpec.class); + TezTaskAttemptID taskAttemptId2 = mock(TezTaskAttemptID.class); + doReturn(taskAttemptId2).when(taskSpec2).getTaskAttemptID(); + AMContainerTask amContainerTask2 = new AMContainerTask(taskSpec, null, null, false, 0); + taskAttemptListener.registerTaskAttempt(amContainerTask2, containerId3, 0); + taskAttemptListener.unregisterRunningContainer(containerId3, 0, ContainerEndReason.OTHER, null); + containerTask = tezUmbilical.getTask(containerContext3); + assertTrue(containerTask.shouldDie()); + } + + @Test(timeout = 5000) + public void testGetTaskMultiplePulls() throws IOException { + TezTaskCommunicatorImpl taskCommunicator = (TezTaskCommunicatorImpl)taskAttemptListener.getTaskCommunicator(0); + TezTaskUmbilicalProtocol tezUmbilical = taskCommunicator.getUmbilical(); + + ContainerId containerId1 = createContainerId(appId, 1); + + ContainerContext containerContext1 = new ContainerContext(containerId1.toString()); + taskAttemptListener.registerRunningContainer(containerId1, 0); + containerTask = tezUmbilical.getTask(containerContext1); + assertNull(containerTask); + + // Register task + taskAttemptListener.registerTaskAttempt(amContainerTask, containerId1, 0); + containerTask = tezUmbilical.getTask(containerContext1); + assertFalse(containerTask.shouldDie()); + assertEquals(taskSpec, containerTask.getTaskSpec()); + + // Try pulling again - simulates re-use pull + containerTask = tezUmbilical.getTask(containerContext1); + assertNull(containerTask); + } + + @Test (timeout = 5000) + public void testTaskEventRouting() throws Exception { + List<TezEvent> events = Arrays.asList( + new TezEvent(new TaskStatusUpdateEvent(null, 0.0f, null), null), + new TezEvent(DataMovementEvent.create(0, ByteBuffer.wrap(new byte[0])), null), + new TezEvent(new TaskAttemptCompletedEvent(), null) + ); + + generateHeartbeat(events, 0, 1, 0, new ArrayList<TezEvent>()); + + ArgumentCaptor<Event> arg = ArgumentCaptor.forClass(Event.class); + verify(eventHandler, times(2)).handle(arg.capture()); + final List<Event> argAllValues = arg.getAllValues(); + + final Event statusUpdateEvent = argAllValues.get(0); + assertEquals("First event should be status update", TaskAttemptEventType.TA_STATUS_UPDATE, + statusUpdateEvent.getType()); + + final Event vertexEvent = argAllValues.get(1); + final VertexEventRouteEvent vertexRouteEvent = (VertexEventRouteEvent)vertexEvent; + assertEquals("First event should be routed to vertex", VertexEventType.V_ROUTE_EVENT, + vertexEvent.getType()); + assertEquals(EventType.DATA_MOVEMENT_EVENT, + vertexRouteEvent.getEvents().get(0).getEventType()); + assertEquals(EventType.TASK_ATTEMPT_COMPLETED_EVENT, + vertexRouteEvent.getEvents().get(1).getEventType()); + + } + + @Test (timeout = 5000) + public void testTaskEventRoutingTaskAttemptOnly() throws Exception { + List<TezEvent> events = Arrays.asList( + new TezEvent(new TaskAttemptCompletedEvent(), null) + ); + generateHeartbeat(events, 0, 1, 0, new ArrayList<TezEvent>()); + + ArgumentCaptor<Event> arg = ArgumentCaptor.forClass(Event.class); + verify(eventHandler, times(1)).handle(arg.capture()); + final List<Event> argAllValues = arg.getAllValues(); + + final Event event = argAllValues.get(0); + assertEquals("only event should be route event", VertexEventType.V_ROUTE_EVENT, + event.getType()); + } + + @Test (timeout = 5000) + public void testTaskHeartbeatResponse() throws Exception { + List<TezEvent> events = new ArrayList<TezEvent>(); + List<TezEvent> eventsToSend = new ArrayList<TezEvent>(); + TaskHeartbeatResponse response = generateHeartbeat(events, 0, 1, 2, eventsToSend); + + assertEquals(2, response.getNextFromEventId()); + assertEquals(eventsToSend, response.getEvents()); + } + + //try 10 times to allocate random port, fail it if no one is succeed. + @Test (timeout = 5000) + public void testPortRange() { + boolean succeedToAllocate = false; + Random rand = new Random(); + for (int i = 0; i < 10; ++i) { + int nextPort = 1024 + rand.nextInt(65535 - 1024); + if (testPortRange(nextPort)) { + succeedToAllocate = true; + break; + } + } + if (!succeedToAllocate) { + fail("Can not allocate free port even in 10 iterations for TaskAttemptListener"); + } + } + + // TODO TEZ-2003 Move this into TestTezTaskCommunicator. Potentially other tests as well. + @Test (timeout= 5000) + public void testPortRange_NotSpecified() throws IOException { + Configuration conf = new Configuration(); + JobTokenIdentifier identifier = new JobTokenIdentifier(new Text( + "fakeIdentifier")); + Token<JobTokenIdentifier> sessionToken = new Token<JobTokenIdentifier>(identifier, + new JobTokenSecretManager()); + sessionToken.setService(identifier.getJobId()); + TokenCache.setSessionToken(sessionToken, credentials); + UserPayload userPayload = TezUtils.createUserPayloadFromConf(conf); + taskAttemptListener = new TaskCommunicatorManager(appContext, + mock(TaskHeartbeatHandler.class), mock(ContainerHeartbeatHandler.class), Lists.newArrayList( + new NamedEntityDescriptor(TezConstants.getTezYarnServicePluginName(), null) + .setUserPayload(userPayload))); + // no exception happen, should started properly + taskAttemptListener.init(conf); + taskAttemptListener.start(); + } + + private boolean testPortRange(int port) { + boolean succeedToAllocate = true; + try { + Configuration conf = new Configuration(); + + JobTokenIdentifier identifier = new JobTokenIdentifier(new Text( + "fakeIdentifier")); + Token<JobTokenIdentifier> sessionToken = new Token<JobTokenIdentifier>(identifier, + new JobTokenSecretManager()); + sessionToken.setService(identifier.getJobId()); + TokenCache.setSessionToken(sessionToken, credentials); + + conf.set(TezConfiguration.TEZ_AM_TASK_AM_PORT_RANGE, port + "-" + port); + UserPayload userPayload = TezUtils.createUserPayloadFromConf(conf); + + taskAttemptListener = new TaskCommunicatorManager(appContext, + mock(TaskHeartbeatHandler.class), mock(ContainerHeartbeatHandler.class), Lists + .newArrayList(new NamedEntityDescriptor(TezConstants.getTezYarnServicePluginName(), null) + .setUserPayload(userPayload))); + taskAttemptListener.init(conf); + taskAttemptListener.start(); + int resultedPort = taskAttemptListener.getTaskCommunicator(0).getAddress().getPort(); + assertEquals(port, resultedPort); + } catch (Exception e) { + succeedToAllocate = false; + } finally { + if (taskAttemptListener != null) { + try { + taskAttemptListener.close(); + } catch (IOException e) { + e.printStackTrace(); + fail("fail to stop TaskAttemptListener"); + } + } + } + return succeedToAllocate; + } + + private TaskHeartbeatResponse generateHeartbeat(List<TezEvent> events, + int fromEventId, int maxEvents, int nextFromEventId, + List<TezEvent> sendEvents) throws IOException, TezException { + ContainerId containerId = createContainerId(appId, 1); + Vertex vertex = mock(Vertex.class); + + doReturn(vertex).when(dag).getVertex(vertexID); + doReturn("test_vertex").when(vertex).getName(); + TaskAttemptEventInfo eventInfo = new TaskAttemptEventInfo(nextFromEventId, sendEvents, 0); + doReturn(eventInfo).when(vertex).getTaskAttemptTezEvents(taskAttemptID, fromEventId, 0, maxEvents); + + taskAttemptListener.registerRunningContainer(containerId, 0); + taskAttemptListener.registerTaskAttempt(amContainerTask, containerId, 0); + + TaskHeartbeatRequest request = mock(TaskHeartbeatRequest.class); + doReturn(containerId.toString()).when(request).getContainerIdentifier(); + doReturn(containerId.toString()).when(request).getContainerIdentifier(); + doReturn(taskAttemptID).when(request).getTaskAttemptId(); + doReturn(events).when(request).getEvents(); + doReturn(maxEvents).when(request).getMaxEvents(); + doReturn(fromEventId).when(request).getStartIndex(); + + return taskAttemptListener.heartbeat(request); + } + + + private ContainerId createContainerId(ApplicationId applicationId, int containerIdx) { + ApplicationAttemptId appAttemptId = ApplicationAttemptId.newInstance(applicationId, 1); + return ContainerId.newInstance(appAttemptId, containerIdx); + } + + private static class TaskCommunicatorManagerInterfaceImplForTest extends TaskCommunicatorManager { + + public TaskCommunicatorManagerInterfaceImplForTest(AppContext context, + TaskHeartbeatHandler thh, + ContainerHeartbeatHandler chh, + List<NamedEntityDescriptor> taskCommDescriptors) { + super(context, thh, chh, taskCommDescriptors); + } + + @Override + TaskCommunicator createDefaultTaskCommunicator(TaskCommunicatorContext taskCommunicatorContext) { + return new TezTaskCommunicatorImplForTest(taskCommunicatorContext); + } + + } + + private static class TezTaskCommunicatorImplForTest extends TezTaskCommunicatorImpl { + + public TezTaskCommunicatorImplForTest( + TaskCommunicatorContext taskCommunicatorContext) { + super(taskCommunicatorContext); + } + + @Override + protected void startRpcServer() { + } + + @Override + protected void stopRpcServer() { + } + } +} http://git-wip-us.apache.org/repos/asf/tez/blob/8b278ea8/tez-dag/src/test/java/org/apache/tez/dag/app/TestTaskCommunicatorManager2.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/TestTaskCommunicatorManager2.java b/tez-dag/src/test/java/org/apache/tez/dag/app/TestTaskCommunicatorManager2.java new file mode 100644 index 0000000..d75b0e5 --- /dev/null +++ b/tez-dag/src/test/java/org/apache/tez/dag/app/TestTaskCommunicatorManager2.java @@ -0,0 +1,136 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.tez.dag.app; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; +import static org.mockito.Matchers.any; +import static org.mockito.Mockito.doReturn; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; + +import java.io.IOException; +import java.util.HashMap; +import java.util.Map; + +import com.google.common.collect.Lists; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.security.Credentials; +import org.apache.hadoop.yarn.api.records.ApplicationAccessType; +import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; +import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.api.records.Container; +import org.apache.hadoop.yarn.api.records.ContainerId; +import org.apache.hadoop.yarn.api.records.NodeId; +import org.apache.hadoop.yarn.event.Event; +import org.apache.hadoop.yarn.event.EventHandler; +import org.apache.tez.common.TezUtils; +import org.apache.tez.dag.api.NamedEntityDescriptor; +import org.apache.tez.dag.api.TezConfiguration; +import org.apache.tez.dag.api.TezConstants; +import org.apache.tez.dag.api.UserPayload; +import org.apache.tez.serviceplugins.api.TaskAttemptEndReason; +import org.apache.tez.dag.app.dag.DAG; +import org.apache.tez.dag.app.dag.event.TaskAttemptEventAttemptFailed; +import org.apache.tez.dag.app.dag.event.TaskAttemptEventAttemptKilled; +import org.apache.tez.dag.app.rm.container.AMContainer; +import org.apache.tez.dag.app.rm.container.AMContainerMap; +import org.apache.tez.dag.app.rm.container.AMContainerTask; +import org.apache.tez.dag.records.TaskAttemptTerminationCause; +import org.apache.tez.dag.records.TezTaskAttemptID; +import org.apache.tez.runtime.api.impl.TaskSpec; +import org.junit.Test; +import org.mockito.ArgumentCaptor; + +public class TestTaskCommunicatorManager2 { + + @Test(timeout = 5000) + public void testTaskAttemptFailedKilled() throws IOException { + ApplicationId appId = ApplicationId.newInstance(1000, 1); + ApplicationAttemptId appAttemptId = ApplicationAttemptId.newInstance(appId, 1); + Credentials credentials = new Credentials(); + AppContext appContext = mock(AppContext.class); + EventHandler eventHandler = mock(EventHandler.class); + DAG dag = mock(DAG.class); + AMContainerMap amContainerMap = mock(AMContainerMap.class); + Map<ApplicationAccessType, String> appAcls = new HashMap<ApplicationAccessType, String>(); + doReturn(eventHandler).when(appContext).getEventHandler(); + doReturn(dag).when(appContext).getCurrentDAG(); + doReturn(appAttemptId).when(appContext).getApplicationAttemptId(); + doReturn(credentials).when(appContext).getAppCredentials(); + doReturn(appAcls).when(appContext).getApplicationACLs(); + doReturn(amContainerMap).when(appContext).getAllContainers(); + NodeId nodeId = NodeId.newInstance("localhost", 0); + AMContainer amContainer = mock(AMContainer.class); + Container container = mock(Container.class); + doReturn(nodeId).when(container).getNodeId(); + doReturn(amContainer).when(amContainerMap).get(any(ContainerId.class)); + doReturn(container).when(amContainer).getContainer(); + + Configuration conf = new TezConfiguration(); + UserPayload userPayload = TezUtils.createUserPayloadFromConf(conf); + TaskCommunicatorManager taskAttemptListener = + new TaskCommunicatorManager(appContext, mock(TaskHeartbeatHandler.class), + mock(ContainerHeartbeatHandler.class), Lists.newArrayList(new NamedEntityDescriptor( + TezConstants.getTezYarnServicePluginName(), null).setUserPayload(userPayload))); + + TaskSpec taskSpec1 = mock(TaskSpec.class); + TezTaskAttemptID taskAttemptId1 = mock(TezTaskAttemptID.class); + doReturn(taskAttemptId1).when(taskSpec1).getTaskAttemptID(); + AMContainerTask amContainerTask1 = new AMContainerTask(taskSpec1, null, null, false, 10); + + TaskSpec taskSpec2 = mock(TaskSpec.class); + TezTaskAttemptID taskAttemptId2 = mock(TezTaskAttemptID.class); + doReturn(taskAttemptId2).when(taskSpec2).getTaskAttemptID(); + AMContainerTask amContainerTask2 = new AMContainerTask(taskSpec2, null, null, false, 10); + + ContainerId containerId1 = createContainerId(appId, 1); + taskAttemptListener.registerRunningContainer(containerId1, 0); + taskAttemptListener.registerTaskAttempt(amContainerTask1, containerId1, 0); + ContainerId containerId2 = createContainerId(appId, 2); + taskAttemptListener.registerRunningContainer(containerId2, 0); + taskAttemptListener.registerTaskAttempt(amContainerTask2, containerId2, 0); + + + taskAttemptListener + .taskFailed(taskAttemptId1, TaskAttemptEndReason.COMMUNICATION_ERROR, "Diagnostics1"); + taskAttemptListener + .taskKilled(taskAttemptId2, TaskAttemptEndReason.EXECUTOR_BUSY, "Diagnostics2"); + + ArgumentCaptor<Event> argumentCaptor = ArgumentCaptor.forClass(Event.class); + verify(eventHandler, times(2)).handle(argumentCaptor.capture()); + assertTrue(argumentCaptor.getAllValues().get(0) instanceof TaskAttemptEventAttemptFailed); + assertTrue(argumentCaptor.getAllValues().get(1) instanceof TaskAttemptEventAttemptKilled); + TaskAttemptEventAttemptFailed failedEvent = + (TaskAttemptEventAttemptFailed) argumentCaptor.getAllValues().get(0); + TaskAttemptEventAttemptKilled killedEvent = + (TaskAttemptEventAttemptKilled) argumentCaptor.getAllValues().get(1); + + assertEquals("Diagnostics1", failedEvent.getDiagnosticInfo()); + assertEquals(TaskAttemptTerminationCause.COMMUNICATION_ERROR, + failedEvent.getTerminationCause()); + + assertEquals("Diagnostics2", killedEvent.getDiagnosticInfo()); + assertEquals(TaskAttemptTerminationCause.SERVICE_BUSY, killedEvent.getTerminationCause()); + // TODO TEZ-2003. Verify unregistration from the registered list + } + + private ContainerId createContainerId(ApplicationId applicationId, int containerIdx) { + ApplicationAttemptId appAttemptId = ApplicationAttemptId.newInstance(applicationId, 1); + ContainerId containerId = ContainerId.newInstance(appAttemptId, containerIdx); + return containerId; + } +} http://git-wip-us.apache.org/repos/asf/tez/blob/8b278ea8/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestCommit.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestCommit.java b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestCommit.java index 83421a2..f0b89c8 100644 --- a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestCommit.java +++ b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestCommit.java @@ -70,7 +70,7 @@ import org.apache.tez.dag.api.oldrecords.TaskState; import org.apache.tez.dag.api.records.DAGProtos.DAGPlan; import org.apache.tez.dag.app.AppContext; import org.apache.tez.dag.app.ClusterInfo; -import org.apache.tez.dag.app.TaskAttemptListener; +import org.apache.tez.dag.app.TaskCommunicatorManagerInterface; import org.apache.tez.dag.app.TaskHeartbeatHandler; import org.apache.tez.dag.app.dag.DAGState; import org.apache.tez.dag.app.dag.DAGTerminationCause; @@ -142,7 +142,7 @@ public class TestCommit { private TaskEventDispatcher taskEventDispatcher; private VertexEventDispatcher vertexEventDispatcher; private DagEventDispatcher dagEventDispatcher; - private TaskAttemptListener taskAttemptListener; + private TaskCommunicatorManagerInterface taskCommunicatorManagerInterface; private TaskHeartbeatHandler thh; private Clock clock = new SystemClock(); private DAGFinishEventHandler dagFinishEventHandler; @@ -317,7 +317,7 @@ public class TestCommit { doReturn(historyEventHandler).when(appContext).getHistoryHandler(); doReturn(aclManager).when(appContext).getAMACLManager(); dag = new DAGImpl(dagId, conf, dagPlan, dispatcher.getEventHandler(), - taskAttemptListener, fsTokens, clock, "user", thh, appContext); + taskCommunicatorManagerInterface, fsTokens, clock, "user", thh, appContext); doReturn(dag).when(appContext).getCurrentDAG(); doReturn(dispatcher.getEventHandler()).when(appContext).getEventHandler(); ClusterInfo clusterInfo = new ClusterInfo(Resource.newInstance(8192,10)); http://git-wip-us.apache.org/repos/asf/tez/blob/8b278ea8/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestDAGImpl.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestDAGImpl.java b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestDAGImpl.java index e268a99..ac4f61b 100644 --- a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestDAGImpl.java +++ b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestDAGImpl.java @@ -88,7 +88,7 @@ import org.apache.tez.dag.api.records.DAGProtos.TezEntityDescriptorProto; import org.apache.tez.dag.api.records.DAGProtos.VertexPlan; import org.apache.tez.dag.app.AppContext; import org.apache.tez.dag.app.ClusterInfo; -import org.apache.tez.dag.app.TaskAttemptListener; +import org.apache.tez.dag.app.TaskCommunicatorManagerInterface; import org.apache.tez.dag.app.TaskHeartbeatHandler; import org.apache.tez.dag.app.dag.DAGScheduler; import org.apache.tez.dag.app.dag.DAGState; @@ -165,7 +165,7 @@ public class TestDAGImpl { private TaskEventDispatcher taskEventDispatcher; private VertexEventDispatcher vertexEventDispatcher; private DagEventDispatcher dagEventDispatcher; - private TaskAttemptListener taskAttemptListener; + private TaskCommunicatorManagerInterface taskCommunicatorManagerInterface; private TaskHeartbeatHandler thh; private Clock clock = new SystemClock(); private DAGFinishEventHandler dagFinishEventHandler; @@ -784,7 +784,7 @@ public class TestDAGImpl { doReturn(historyEventHandler).when(appContext).getHistoryHandler(); doReturn(aclManager).when(appContext).getAMACLManager(); dag = new DAGImpl(dagId, conf, dagPlan, - dispatcher.getEventHandler(), taskAttemptListener, + dispatcher.getEventHandler(), taskCommunicatorManagerInterface, fsTokens, clock, "user", thh, appContext); dag.entityUpdateTracker = new StateChangeNotifierForTest(dag); doReturn(dag).when(appContext).getCurrentDAG(); @@ -795,7 +795,7 @@ public class TestDAGImpl { mrrDagId = TezDAGID.getInstance(appAttemptId.getApplicationId(), 2); mrrDagPlan = createTestMRRDAGPlan(); mrrDag = new DAGImpl(mrrDagId, conf, mrrDagPlan, - dispatcher.getEventHandler(), taskAttemptListener, + dispatcher.getEventHandler(), taskCommunicatorManagerInterface, fsTokens, clock, "user", thh, mrrAppContext); mrrDag.entityUpdateTracker = new StateChangeNotifierForTest(mrrDag); @@ -811,7 +811,7 @@ public class TestDAGImpl { groupDagId = TezDAGID.getInstance(appAttemptId.getApplicationId(), 3); groupDagPlan = createGroupDAGPlan(); groupDag = new DAGImpl(groupDagId, conf, groupDagPlan, - dispatcher.getEventHandler(), taskAttemptListener, + dispatcher.getEventHandler(), taskCommunicatorManagerInterface, fsTokens, clock, "user", thh, groupAppContext); groupDag.entityUpdateTracker = new StateChangeNotifierForTest(groupDag); @@ -881,7 +881,7 @@ public class TestDAGImpl { dagWithCustomEdgeAppContext = mock(AppContext.class); doReturn(aclManager).when(dagWithCustomEdgeAppContext).getAMACLManager(); dagWithCustomEdge = new DAGImpl(dagWithCustomEdgeId, conf, dagPlanWithCustomEdge, - dispatcher.getEventHandler(), taskAttemptListener, + dispatcher.getEventHandler(), taskCommunicatorManagerInterface, fsTokens, clock, "user", thh, dagWithCustomEdgeAppContext); dagWithCustomEdge.entityUpdateTracker = new StateChangeNotifierForTest(dagWithCustomEdge); doReturn(conf).when(dagWithCustomEdgeAppContext).getAMConf(); http://git-wip-us.apache.org/repos/asf/tez/blob/8b278ea8/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestDAGRecovery.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestDAGRecovery.java b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestDAGRecovery.java index 792fa63..409c506 100644 --- a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestDAGRecovery.java +++ b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestDAGRecovery.java @@ -37,7 +37,7 @@ import org.apache.tez.common.counters.TezCounters; import org.apache.tez.dag.api.records.DAGProtos.DAGPlan; import org.apache.tez.dag.app.AppContext; import org.apache.tez.dag.app.ClusterInfo; -import org.apache.tez.dag.app.TaskAttemptListener; +import org.apache.tez.dag.app.TaskCommunicatorManagerInterface; import org.apache.tez.dag.app.TaskHeartbeatHandler; import org.apache.tez.dag.app.dag.DAGState; import org.apache.tez.dag.app.dag.DAGTerminationCause; @@ -91,7 +91,7 @@ public class TestDAGRecovery { DAGPlan dagPlan = TestDAGImpl.createTestDAGPlan(); dag = new DAGImpl(dagId, new Configuration(), dagPlan, mockEventHandler, - mock(TaskAttemptListener.class), new Credentials(), + mock(TaskCommunicatorManagerInterface.class), new Credentials(), new SystemClock(), user, mock(TaskHeartbeatHandler.class), mockAppContext); } http://git-wip-us.apache.org/repos/asf/tez/blob/8b278ea8/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskAttempt.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskAttempt.java b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskAttempt.java index 04bb2df..13c9202 100644 --- a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskAttempt.java +++ b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskAttempt.java @@ -64,7 +64,7 @@ import org.apache.tez.dag.app.AppContext; import org.apache.tez.dag.app.ClusterInfo; import org.apache.tez.dag.app.ContainerContext; import org.apache.tez.dag.app.ContainerHeartbeatHandler; -import org.apache.tez.dag.app.TaskAttemptListener; +import org.apache.tez.dag.app.TaskCommunicatorManagerInterface; import org.apache.tez.dag.app.TaskHeartbeatHandler; import org.apache.tez.dag.app.dag.Task; import org.apache.tez.dag.app.dag.TaskAttemptStateInternal; @@ -145,7 +145,7 @@ public class TestTaskAttempt { TezTaskID taskID = TezTaskID.getInstance( TezVertexID.getInstance(TezDAGID.getInstance("1", 1, 1), 1), 1); TaskAttemptImpl taImpl = new MockTaskAttemptImpl(taskID, 1, eventHandler, - mock(TaskAttemptListener.class), new Configuration(), new SystemClock(), + mock(TaskCommunicatorManagerInterface.class), new Configuration(), new SystemClock(), mock(TaskHeartbeatHandler.class), mock(AppContext.class), false, Resource.newInstance(1024, 1), createFakeContainerContext(), false); @@ -180,12 +180,12 @@ public class TestTaskAttempt { TezTaskID taskID = TezTaskID.getInstance( TezVertexID.getInstance(TezDAGID.getInstance("1", 1, 1), 1), 1); TaskAttemptImpl taImpl = new MockTaskAttemptImpl(taskID, 1, eventHandler, - mock(TaskAttemptListener.class), new Configuration(), new SystemClock(), + mock(TaskCommunicatorManagerInterface.class), new Configuration(), new SystemClock(), mock(TaskHeartbeatHandler.class), mock(AppContext.class), false, Resource.newInstance(1024, 1), createFakeContainerContext(), false); TaskAttemptImpl taImplReScheduled = new MockTaskAttemptImpl(taskID, 1, eventHandler, - mock(TaskAttemptListener.class), new Configuration(), new SystemClock(), + mock(TaskCommunicatorManagerInterface.class), new Configuration(), new SystemClock(), mock(TaskHeartbeatHandler.class), mock(AppContext.class), true, Resource.newInstance(1024, 1), createFakeContainerContext(), false); @@ -243,7 +243,7 @@ public class TestTaskAttempt { TezTaskID taskID = TezTaskID.getInstance( TezVertexID.getInstance(TezDAGID.getInstance("1", 1, 1), 1), 1); TaskAttemptImpl taImpl = new MockTaskAttemptImpl(taskID, 1, eventHandler, - mock(TaskAttemptListener.class), new Configuration(), + mock(TaskCommunicatorManagerInterface.class), new Configuration(), new SystemClock(), mock(TaskHeartbeatHandler.class), mock(AppContext.class), false, Resource.newInstance(1024, 1), createFakeContainerContext(), false); @@ -285,7 +285,7 @@ public class TestTaskAttempt { TezTaskAttemptID taskAttemptID = TezTaskAttemptID.getInstance(taskID, 0); MockEventHandler eventHandler = new MockEventHandler(); - TaskAttemptListener taListener = createMockTaskAttemptListener(); + TaskCommunicatorManagerInterface taListener = createMockTaskAttemptListener(); Configuration taskConf = new Configuration(); taskConf.setClass("fs.file.impl", StubbedFS.class, FileSystem.class); @@ -334,7 +334,7 @@ public class TestTaskAttempt { TezTaskID taskID = TezTaskID.getInstance(vertexID, 1); MockEventHandler eventHandler = spy(new MockEventHandler()); - TaskAttemptListener taListener = createMockTaskAttemptListener(); + TaskCommunicatorManagerInterface taListener = createMockTaskAttemptListener(); Configuration taskConf = new Configuration(); taskConf.setClass("fs.file.impl", StubbedFS.class, FileSystem.class); @@ -354,7 +354,7 @@ public class TestTaskAttempt { AppContext appCtx = mock(AppContext.class); AMContainerMap containers = new AMContainerMap( - mock(ContainerHeartbeatHandler.class), mock(TaskAttemptListener.class), + mock(ContainerHeartbeatHandler.class), mock(TaskCommunicatorManagerInterface.class), new ContainerContextMatcher(), appCtx); containers.addContainerIfNew(container, 0, 0, 0); @@ -434,7 +434,7 @@ public class TestTaskAttempt { TezTaskID taskID = TezTaskID.getInstance(vertexID, 1); MockEventHandler eventHandler = new MockEventHandler(); - TaskAttemptListener taListener = createMockTaskAttemptListener(); + TaskCommunicatorManagerInterface taListener = createMockTaskAttemptListener(); Configuration taskConf = new Configuration(); taskConf.setClass("fs.file.impl", StubbedFS.class, FileSystem.class); @@ -454,7 +454,7 @@ public class TestTaskAttempt { AppContext appCtx = mock(AppContext.class); AMContainerMap containers = new AMContainerMap( - mock(ContainerHeartbeatHandler.class), mock(TaskAttemptListener.class), + mock(ContainerHeartbeatHandler.class), mock(TaskCommunicatorManagerInterface.class), new ContainerContextMatcher(), appCtx); containers.addContainerIfNew(container, 0, 0, 0); @@ -498,7 +498,7 @@ public class TestTaskAttempt { TezTaskID taskID = TezTaskID.getInstance(vertexID, 1); MockEventHandler eventHandler = spy(new MockEventHandler()); - TaskAttemptListener taListener = createMockTaskAttemptListener(); + TaskCommunicatorManagerInterface taListener = createMockTaskAttemptListener(); Configuration taskConf = new Configuration(); taskConf.setClass("fs.file.impl", StubbedFS.class, FileSystem.class); @@ -518,7 +518,7 @@ public class TestTaskAttempt { AppContext appCtx = mock(AppContext.class); AMContainerMap containers = new AMContainerMap( - mock(ContainerHeartbeatHandler.class), mock(TaskAttemptListener.class), + mock(ContainerHeartbeatHandler.class), mock(TaskCommunicatorManagerInterface.class), new ContainerContextMatcher(), appCtx); containers.addContainerIfNew(container, 0, 0, 0); @@ -589,7 +589,7 @@ public class TestTaskAttempt { TezTaskID taskID = TezTaskID.getInstance(vertexID, 1); MockEventHandler eventHandler = spy(new MockEventHandler()); - TaskAttemptListener taListener = createMockTaskAttemptListener(); + TaskCommunicatorManagerInterface taListener = createMockTaskAttemptListener(); Configuration taskConf = new Configuration(); taskConf.setClass("fs.file.impl", StubbedFS.class, FileSystem.class); @@ -610,7 +610,7 @@ public class TestTaskAttempt { AppContext appCtx = mock(AppContext.class); AMContainerMap containers = new AMContainerMap( - mock(ContainerHeartbeatHandler.class), mock(TaskAttemptListener.class), + mock(ContainerHeartbeatHandler.class), mock(TaskCommunicatorManagerInterface.class), new ContainerContextMatcher(), appCtx); containers.addContainerIfNew(container, 0, 0, 0); @@ -719,7 +719,7 @@ public class TestTaskAttempt { TezTaskID taskID = TezTaskID.getInstance(vertexID, 1); MockEventHandler eventHandler = spy(new MockEventHandler()); - TaskAttemptListener taListener = createMockTaskAttemptListener(); + TaskCommunicatorManagerInterface taListener = createMockTaskAttemptListener(); Configuration taskConf = new Configuration(); taskConf.setClass("fs.file.impl", StubbedFS.class, FileSystem.class); @@ -740,7 +740,7 @@ public class TestTaskAttempt { AppContext appCtx = mock(AppContext.class); AMContainerMap containers = new AMContainerMap( - mock(ContainerHeartbeatHandler.class), mock(TaskAttemptListener.class), + mock(ContainerHeartbeatHandler.class), mock(TaskCommunicatorManagerInterface.class), new ContainerContextMatcher(), appCtx); containers.addContainerIfNew(container, 0, 0, 0); @@ -810,7 +810,7 @@ public class TestTaskAttempt { TezTaskID taskID = TezTaskID.getInstance(vertexID, 1); MockEventHandler eventHandler = spy(new MockEventHandler()); - TaskAttemptListener taListener = createMockTaskAttemptListener(); + TaskCommunicatorManagerInterface taListener = createMockTaskAttemptListener(); Configuration taskConf = new Configuration(); taskConf.setClass("fs.file.impl", StubbedFS.class, FileSystem.class); @@ -830,7 +830,7 @@ public class TestTaskAttempt { AppContext appCtx = mock(AppContext.class); AMContainerMap containers = new AMContainerMap( - mock(ContainerHeartbeatHandler.class), mock(TaskAttemptListener.class), + mock(ContainerHeartbeatHandler.class), mock(TaskCommunicatorManagerInterface.class), new ContainerContextMatcher(), appCtx); containers.addContainerIfNew(container, 0, 0, 0); @@ -904,7 +904,7 @@ public class TestTaskAttempt { TezTaskID taskID = TezTaskID.getInstance(vertexID, 1); MockEventHandler eventHandler = spy(new MockEventHandler()); - TaskAttemptListener taListener = createMockTaskAttemptListener(); + TaskCommunicatorManagerInterface taListener = createMockTaskAttemptListener(); Configuration taskConf = new Configuration(); taskConf.setClass("fs.file.impl", StubbedFS.class, FileSystem.class); @@ -924,7 +924,7 @@ public class TestTaskAttempt { AppContext appCtx = mock(AppContext.class); AMContainerMap containers = new AMContainerMap( - mock(ContainerHeartbeatHandler.class), mock(TaskAttemptListener.class), + mock(ContainerHeartbeatHandler.class), mock(TaskCommunicatorManagerInterface.class), new ContainerContextMatcher(), appCtx); containers.addContainerIfNew(container, 0, 0, 0); @@ -1006,7 +1006,7 @@ public class TestTaskAttempt { TezTaskID taskID = TezTaskID.getInstance(vertexID, 1); MockEventHandler eventHandler = spy(new MockEventHandler()); - TaskAttemptListener taListener = createMockTaskAttemptListener(); + TaskCommunicatorManagerInterface taListener = createMockTaskAttemptListener(); Configuration taskConf = new Configuration(); taskConf.setClass("fs.file.impl", StubbedFS.class, FileSystem.class); @@ -1026,7 +1026,7 @@ public class TestTaskAttempt { AppContext appCtx = mock(AppContext.class); AMContainerMap containers = new AMContainerMap( - mock(ContainerHeartbeatHandler.class), mock(TaskAttemptListener.class), + mock(ContainerHeartbeatHandler.class), mock(TaskCommunicatorManagerInterface.class), new ContainerContextMatcher(), appCtx); containers.addContainerIfNew(container, 0, 0, 0); @@ -1105,7 +1105,7 @@ public class TestTaskAttempt { MockEventHandler mockEh = new MockEventHandler(); MockEventHandler eventHandler = spy(mockEh); - TaskAttemptListener taListener = createMockTaskAttemptListener(); + TaskCommunicatorManagerInterface taListener = createMockTaskAttemptListener(); Configuration taskConf = new Configuration(); taskConf.setClass("fs.file.impl", StubbedFS.class, FileSystem.class); @@ -1125,7 +1125,7 @@ public class TestTaskAttempt { AppContext appCtx = mock(AppContext.class); AMContainerMap containers = new AMContainerMap( - mock(ContainerHeartbeatHandler.class), mock(TaskAttemptListener.class), + mock(ContainerHeartbeatHandler.class), mock(TaskCommunicatorManagerInterface.class), new ContainerContextMatcher(), appCtx); containers.addContainerIfNew(container, 0, 0, 0); @@ -1249,7 +1249,7 @@ public class TestTaskAttempt { TezTaskID taskID = TezTaskID.getInstance(vertexID, 1); MockEventHandler eventHandler = spy(new MockEventHandler()); - TaskAttemptListener taListener = createMockTaskAttemptListener(); + TaskCommunicatorManagerInterface taListener = createMockTaskAttemptListener(); Configuration taskConf = new Configuration(); taskConf.setClass("fs.file.impl", StubbedFS.class, FileSystem.class); @@ -1268,7 +1268,7 @@ public class TestTaskAttempt { AppContext appCtx = mock(AppContext.class); AMContainerMap containers = new AMContainerMap( - mock(ContainerHeartbeatHandler.class), mock(TaskAttemptListener.class), + mock(ContainerHeartbeatHandler.class), mock(TaskCommunicatorManagerInterface.class), new ContainerContextMatcher(), appCtx); containers.addContainerIfNew(container, 0, 0, 0); @@ -1324,7 +1324,7 @@ public class TestTaskAttempt { public int taskAttemptStartedEventLogged = 0; public int taskAttemptFinishedEventLogged = 0; public MockTaskAttemptImpl(TezTaskID taskId, int attemptNumber, - EventHandler eventHandler, TaskAttemptListener tal, + EventHandler eventHandler, TaskCommunicatorManagerInterface tal, Configuration conf, Clock clock, TaskHeartbeatHandler taskHeartbeatHandler, AppContext appContext, boolean isRescheduled, @@ -1378,8 +1378,8 @@ public class TestTaskAttempt { new Credentials(), new HashMap<String, String>(), ""); } - private TaskAttemptListener createMockTaskAttemptListener() { - TaskAttemptListener taListener = mock(TaskAttemptListener.class); + private TaskCommunicatorManagerInterface createMockTaskAttemptListener() { + TaskCommunicatorManagerInterface taListener = mock(TaskCommunicatorManagerInterface.class); TaskCommunicator taskComm = mock(TaskCommunicator.class); doReturn(new InetSocketAddress("localhost", 0)).when(taskComm).getAddress(); doReturn(taskComm).when(taListener).getTaskCommunicator(0); http://git-wip-us.apache.org/repos/asf/tez/blob/8b278ea8/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskAttemptRecovery.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskAttemptRecovery.java b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskAttemptRecovery.java index 4a797e0..6bbfc3d 100644 --- a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskAttemptRecovery.java +++ b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskAttemptRecovery.java @@ -42,7 +42,7 @@ import org.apache.tez.dag.api.oldrecords.TaskAttemptState; import org.apache.tez.dag.api.oldrecords.TaskState; import org.apache.tez.dag.app.AppContext; import org.apache.tez.dag.app.ContainerContext; -import org.apache.tez.dag.app.TaskAttemptListener; +import org.apache.tez.dag.app.TaskCommunicatorManagerInterface; import org.apache.tez.dag.app.TaskHeartbeatHandler; import org.apache.tez.dag.app.dag.Task; import org.apache.tez.dag.app.dag.TaskAttemptStateInternal; @@ -146,7 +146,7 @@ public class TestTaskAttemptRecovery { TezTaskID.fromString("task_1407371892933_0001_1_00_000000"); ta = new TaskAttemptImpl(taskId, 0, mockEventHandler, - mock(TaskAttemptListener.class), new Configuration(), + mock(TaskCommunicatorManagerInterface.class), new Configuration(), new SystemClock(), mock(TaskHeartbeatHandler.class), mockAppContext, false, Resource.newInstance(1, 1), mock(ContainerContext.class), false, mockTask); http://git-wip-us.apache.org/repos/asf/tez/blob/8b278ea8/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskImpl.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskImpl.java b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskImpl.java index 807f277..24c9664 100644 --- a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskImpl.java +++ b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskImpl.java @@ -50,7 +50,7 @@ import org.apache.tez.dag.api.oldrecords.TaskAttemptState; import org.apache.tez.dag.api.oldrecords.TaskState; import org.apache.tez.dag.app.AppContext; import org.apache.tez.dag.app.ContainerContext; -import org.apache.tez.dag.app.TaskAttemptListener; +import org.apache.tez.dag.app.TaskCommunicatorManagerInterface; import org.apache.tez.dag.app.TaskHeartbeatHandler; import org.apache.tez.dag.app.dag.StateChangeNotifier; import org.apache.tez.dag.app.dag.TaskStateInternal; @@ -85,7 +85,7 @@ public class TestTaskImpl { private final int partition = 1; private Configuration conf; - private TaskAttemptListener taskAttemptListener; + private TaskCommunicatorManagerInterface taskCommunicatorManagerInterface; private TaskHeartbeatHandler taskHeartbeatHandler; private Credentials credentials; private Clock clock; @@ -122,7 +122,7 @@ public class TestTaskImpl { @Before public void setup() { conf = new Configuration(); - taskAttemptListener = mock(TaskAttemptListener.class); + taskCommunicatorManagerInterface = mock(TaskCommunicatorManagerInterface.class); taskHeartbeatHandler = mock(TaskHeartbeatHandler.class); credentials = new Credentials(); clock = new SystemClock(); @@ -151,7 +151,7 @@ public class TestTaskImpl { eventHandler = new TestEventHandler(); mockTask = new MockTaskImpl(vertexId, partition, - eventHandler, conf, taskAttemptListener, clock, + eventHandler, conf, taskCommunicatorManagerInterface, clock, taskHeartbeatHandler, appContext, leafVertex, taskResource, containerContext, vertex); mockTaskSpec = mock(TaskSpec.class); @@ -698,11 +698,11 @@ public class TestTaskImpl { public MockTaskImpl(TezVertexID vertexId, int partition, EventHandler eventHandler, Configuration conf, - TaskAttemptListener taskAttemptListener, Clock clock, + TaskCommunicatorManagerInterface taskCommunicatorManagerInterface, Clock clock, TaskHeartbeatHandler thh, AppContext appContext, boolean leafVertex, Resource resource, ContainerContext containerContext, Vertex vertex) { - super(vertexId, partition, eventHandler, conf, taskAttemptListener, + super(vertexId, partition, eventHandler, conf, taskCommunicatorManagerInterface, clock, thh, appContext, leafVertex, resource, containerContext, mock(StateChangeNotifier.class), vertex); this.vertex = vertex; @@ -711,7 +711,7 @@ public class TestTaskImpl { @Override protected TaskAttemptImpl createAttempt(int attemptNumber, TezTaskAttemptID schedCausalTA) { MockTaskAttemptImpl attempt = new MockTaskAttemptImpl(getTaskId(), - attemptNumber, eventHandler, taskAttemptListener, + attemptNumber, eventHandler, taskCommunicatorManagerInterface, conf, clock, taskHeartbeatHandler, appContext, true, taskResource, containerContext, schedCausalTA); taskAttempts.add(attempt); @@ -757,7 +757,7 @@ public class TestTaskImpl { private TaskAttemptState state = TaskAttemptState.NEW; public MockTaskAttemptImpl(TezTaskID taskId, int attemptNumber, - EventHandler eventHandler, TaskAttemptListener tal, Configuration conf, + EventHandler eventHandler, TaskCommunicatorManagerInterface tal, Configuration conf, Clock clock, TaskHeartbeatHandler thh, AppContext appContext, boolean isRescheduled, Resource resource, ContainerContext containerContext, TezTaskAttemptID schedCausalTA) { http://git-wip-us.apache.org/repos/asf/tez/blob/8b278ea8/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskRecovery.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskRecovery.java b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskRecovery.java index 1d22e06..eca8274 100644 --- a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskRecovery.java +++ b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskRecovery.java @@ -46,7 +46,7 @@ import org.apache.tez.dag.api.oldrecords.TaskAttemptState; import org.apache.tez.dag.api.oldrecords.TaskState; import org.apache.tez.dag.app.AppContext; import org.apache.tez.dag.app.ContainerContext; -import org.apache.tez.dag.app.TaskAttemptListener; +import org.apache.tez.dag.app.TaskCommunicatorManagerInterface; import org.apache.tez.dag.app.TaskHeartbeatHandler; import org.apache.tez.dag.app.dag.StateChangeNotifier; import org.apache.tez.dag.app.dag.TaskAttemptStateInternal; @@ -189,7 +189,7 @@ public class TestTaskRecovery { when(mockAppContext.getHistoryHandler()).thenReturn(mockHistoryEventHandler); task = new TaskImpl(vertexId, 0, dispatcher.getEventHandler(), - new Configuration(), mock(TaskAttemptListener.class), + new Configuration(), mock(TaskCommunicatorManagerInterface.class), new SystemClock(), mock(TaskHeartbeatHandler.class), mockAppContext, false, Resource.newInstance(1, 1), mock(ContainerContext.class), mock(StateChangeNotifier.class), vertex);
