http://git-wip-us.apache.org/repos/asf/tez/blob/d799d3b9/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerService.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerService.java b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerService.java index 07dfcd6..25fd13e 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerService.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerService.java @@ -32,7 +32,7 @@ import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.api.records.NodeReport; import org.apache.hadoop.yarn.api.records.Priority; import org.apache.hadoop.yarn.api.records.Resource; -import org.apache.tez.dag.api.TaskAttemptEndReason; +import org.apache.tez.serviceplugins.api.TaskAttemptEndReason; public abstract class TaskSchedulerService extends AbstractService{
http://git-wip-us.apache.org/repos/asf/tez/blob/d799d3b9/tez-dag/src/main/java/org/apache/tez/dag/app/rm/YarnTaskSchedulerService.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/YarnTaskSchedulerService.java b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/YarnTaskSchedulerService.java index 6f897e1..d4cf317 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/YarnTaskSchedulerService.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/YarnTaskSchedulerService.java @@ -56,7 +56,7 @@ import org.apache.hadoop.yarn.client.api.async.AMRMClientAsync; import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.util.RackResolver; import org.apache.hadoop.yarn.util.resource.Resources; -import org.apache.tez.dag.api.TaskAttemptEndReason; +import org.apache.tez.serviceplugins.api.TaskAttemptEndReason; import org.apache.tez.dag.api.TezConfiguration; import org.apache.tez.dag.api.TezUncheckedException; import org.apache.tez.dag.app.AppContext; http://git-wip-us.apache.org/repos/asf/tez/blob/d799d3b9/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerEventCompleted.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerEventCompleted.java b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerEventCompleted.java index 8ef2a83..cecb019 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerEventCompleted.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerEventCompleted.java @@ -20,7 +20,7 @@ package org.apache.tez.dag.app.rm.container; import org.apache.hadoop.yarn.api.records.ContainerExitStatus; import org.apache.hadoop.yarn.api.records.ContainerId; -import org.apache.tez.dag.api.ContainerEndReason; +import org.apache.tez.serviceplugins.api.ContainerEndReason; import org.apache.tez.dag.records.TaskAttemptTerminationCause; public class AMContainerEventCompleted extends AMContainerEvent { http://git-wip-us.apache.org/repos/asf/tez/blob/d799d3b9/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerImpl.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerImpl.java index e9e0f04..e63d86d 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerImpl.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerImpl.java @@ -28,8 +28,8 @@ import java.util.concurrent.locks.ReentrantReadWriteLock.ReadLock; import java.util.concurrent.locks.ReentrantReadWriteLock.WriteLock; import org.apache.tez.common.TezUtilsInternal; -import org.apache.tez.dag.api.ContainerEndReason; -import org.apache.tez.dag.api.TaskAttemptEndReason; +import org.apache.tez.serviceplugins.api.ContainerEndReason; +import org.apache.tez.serviceplugins.api.TaskAttemptEndReason; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.hadoop.security.Credentials; @@ -38,7 +38,6 @@ import org.apache.hadoop.yarn.api.records.ContainerExitStatus; import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.ContainerLaunchContext; import org.apache.hadoop.yarn.api.records.LocalResource; -import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.event.Event; import org.apache.hadoop.yarn.event.EventHandler; import org.apache.hadoop.yarn.state.InvalidStateTransitonException; @@ -1068,12 +1067,12 @@ public class AMContainerImpl implements AMContainer { } protected void sendStartRequestToNM(ContainerLaunchContext clc) { - sendEvent(new NMCommunicatorLaunchRequestEvent(clc, container, launcherId, taskCommId)); + sendEvent(new NMCommunicatorLaunchRequestEvent(clc, container, launcherId, schedulerId, taskCommId)); } protected void sendStopRequestToNM() { sendEvent(new NMCommunicatorStopRequestEvent(containerId, - container.getNodeId(), container.getContainerToken(), launcherId)); + container.getNodeId(), container.getContainerToken(), launcherId, schedulerId, taskCommId)); } protected void unregisterAttemptFromListener(TezTaskAttemptID attemptId, TaskAttemptEndReason endReason) { http://git-wip-us.apache.org/repos/asf/tez/blob/d799d3b9/tez-dag/src/test/java/org/apache/tez/dag/app/MockDAGAppMaster.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/MockDAGAppMaster.java b/tez-dag/src/test/java/org/apache/tez/dag/app/MockDAGAppMaster.java index 0f35bba..3c3c6a7 100644 --- a/tez-dag/src/test/java/org/apache/tez/dag/app/MockDAGAppMaster.java +++ b/tez-dag/src/test/java/org/apache/tez/dag/app/MockDAGAppMaster.java @@ -34,11 +34,14 @@ import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; +import org.apache.tez.serviceplugins.api.ContainerLaunchRequest; +import org.apache.tez.serviceplugins.api.ContainerLauncher; +import org.apache.tez.serviceplugins.api.ContainerLauncherContext; +import org.apache.tez.serviceplugins.api.ContainerStopRequest; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.security.Credentials; -import org.apache.hadoop.service.AbstractService; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.ContainerExitStatus; import org.apache.hadoop.yarn.api.records.ContainerId; @@ -53,23 +56,14 @@ import org.apache.tez.dag.api.TaskHeartbeatRequest; import org.apache.tez.dag.api.TaskHeartbeatResponse; import org.apache.tez.dag.api.TezConfiguration; import org.apache.tez.dag.api.TezUncheckedException; -import org.apache.tez.dag.app.launcher.ContainerLauncher; import org.apache.tez.dag.app.launcher.ContainerLauncherRouter; import org.apache.tez.dag.app.rm.NMCommunicatorEvent; -import org.apache.tez.dag.app.rm.NMCommunicatorLaunchRequestEvent; -import org.apache.tez.dag.app.rm.NMCommunicatorStopRequestEvent; -import org.apache.tez.dag.app.rm.container.AMContainerEvent; -import org.apache.tez.dag.app.rm.container.AMContainerEventLaunched; -import org.apache.tez.dag.app.rm.container.AMContainerEventType; import org.apache.tez.dag.history.HistoryEventHandler; import org.apache.tez.dag.records.TezTaskAttemptID; import org.apache.tez.dag.records.TezTaskID; -import org.apache.tez.runtime.api.events.CompositeDataMovementEvent; -import org.apache.tez.runtime.api.events.DataMovementEvent; import org.apache.tez.runtime.api.events.TaskAttemptCompletedEvent; import org.apache.tez.runtime.api.events.TaskStatusUpdateEvent; import org.apache.tez.runtime.api.impl.EventMetaData; -import org.apache.tez.runtime.api.impl.OutputSpec; import org.apache.tez.runtime.api.impl.TaskSpec; import org.apache.tez.runtime.api.impl.TaskStatistics; import org.apache.tez.runtime.api.impl.TezEvent; @@ -89,6 +83,7 @@ import com.google.common.util.concurrent.ThreadFactoryBuilder; public class MockDAGAppMaster extends DAGAppMaster { private static final Logger LOG = LoggerFactory.getLogger(MockDAGAppMaster.class); + ContainerLauncherContext containerLauncherContext; MockContainerLauncher containerLauncher; boolean initFailFlag; boolean startFailFlag; @@ -121,7 +116,7 @@ public class MockDAGAppMaster extends DAGAppMaster { // Upon, launch of a container is simulates the container asking for tasks // Upon receiving a task it simulates completion of the tasks // It can be used to preempt the container for a given task - public class MockContainerLauncher extends AbstractService implements ContainerLauncher, Runnable { + public class MockContainerLauncher extends ContainerLauncher implements Runnable { BlockingQueue<NMCommunicatorEvent> eventQueue = new LinkedBlockingQueue<NMCommunicatorEvent>(); Thread eventHandlingThread; @@ -141,12 +136,14 @@ public class MockDAGAppMaster extends DAGAppMaster { Map<TezTaskID, Integer> preemptedTasks = Maps.newConcurrentMap(); Map<TezTaskAttemptID, Integer> tasksWithStatusUpdates = Maps.newConcurrentMap(); - - public MockContainerLauncher(AtomicBoolean goFlag) { - super("MockContainerLauncher"); + + public MockContainerLauncher(AtomicBoolean goFlag, + ContainerLauncherContext containerLauncherContext) { + super("MockContainerLauncher", containerLauncherContext); this.goFlag = goFlag; } + public class ContainerData { ContainerId cId; TezTaskAttemptID taId; @@ -211,20 +208,18 @@ public class MockDAGAppMaster extends DAGAppMaster { executorService.shutdownNow(); } } - + + @Override - public void handle(NMCommunicatorEvent event) { - switch (event.getType()) { - case CONTAINER_LAUNCH_REQUEST: - launch((NMCommunicatorLaunchRequestEvent) event); - break; - case CONTAINER_STOP_REQUEST: - stop((NMCommunicatorStopRequestEvent)event); - break; - } + public void launchContainer(ContainerLaunchRequest launchRequest) { + launch(launchRequest); } - - + + @Override + public void stopContainer(ContainerStopRequest stopRequest) { + stop(stopRequest); + } + void waitToGo() { if (goFlag == null) { return; @@ -266,20 +261,19 @@ public class MockDAGAppMaster extends DAGAppMaster { tasksWithStatusUpdates.put(tId, numUpdates); } - void stop(NMCommunicatorStopRequestEvent event) { + void stop(ContainerStopRequest event) { // remove from simulated container list containers.remove(event.getContainerId()); - getContext().getEventHandler().handle( - new AMContainerEvent(event.getContainerId(), AMContainerEventType.C_NM_STOP_SENT)); + getContext().containerStopRequested(event.getContainerId()); } - void launch(NMCommunicatorLaunchRequestEvent event) { + void launch(ContainerLaunchRequest event) { // launch container by putting it in simulated container list ContainerData cData = new ContainerData(event.getContainerId(), event.getContainerLaunchContext()); containers.put(event.getContainerId(), cData); containersToProcess.add(cData); - getContext().getEventHandler().handle(new AMContainerEventLaunched(event.getContainerId())); + getContext().containerLaunched(event.getContainerId()); } public void waitTillContainersLaunched() throws InterruptedException { @@ -289,7 +283,7 @@ public class MockDAGAppMaster extends DAGAppMaster { } void incrementTime(long inc) { - Clock clock = getContext().getClock(); + Clock clock = MockDAGAppMaster.this.getContext().getClock(); if (clock instanceof MockClock) { ((MockClock) clock).incrementTime(inc); } @@ -493,7 +487,8 @@ public class MockDAGAppMaster extends DAGAppMaster { super(applicationAttemptId, containerId, nmHost, nmPort, nmHttpPort, clock, appSubmitTime, isSession, workingDirectory, localDirs, logDirs, new TezApiVersionInfo().getVersion(), 1, credentials, jobUserName); - containerLauncher = new MockContainerLauncher(launcherGoFlag); + containerLauncherContext = new ContainerLauncherContextImpl(getContext(), getTaskAttemptListener()); + containerLauncher = new MockContainerLauncher(launcherGoFlag, containerLauncherContext); shutdownHandler = new MockDAGAppMasterShutdownHandler(); this.initFailFlag = initFailFlag; this.startFailFlag = startFailFlag; @@ -508,7 +503,7 @@ public class MockDAGAppMaster extends DAGAppMaster { String[] containerLaunchers, boolean isLocal) throws UnknownHostException { - return new ContainerLauncherRouter(containerLauncher); + return new ContainerLauncherRouter(containerLauncher, getContext()); } @Override http://git-wip-us.apache.org/repos/asf/tez/blob/d799d3b9/tez-dag/src/test/java/org/apache/tez/dag/app/TestTaskAttemptListenerImplTezDag.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/TestTaskAttemptListenerImplTezDag.java b/tez-dag/src/test/java/org/apache/tez/dag/app/TestTaskAttemptListenerImplTezDag.java index 7f0362d..df643e4 100644 --- a/tez-dag/src/test/java/org/apache/tez/dag/app/TestTaskAttemptListenerImplTezDag.java +++ b/tez-dag/src/test/java/org/apache/tez/dag/app/TestTaskAttemptListenerImplTezDag.java @@ -51,8 +51,8 @@ import org.apache.tez.common.security.JobTokenIdentifier; import org.apache.tez.common.security.JobTokenSecretManager; import org.apache.tez.common.security.TokenCache; import org.apache.tez.dag.api.TezConfiguration; -import org.apache.tez.dag.api.ContainerEndReason; -import org.apache.tez.dag.api.TaskAttemptEndReason; +import org.apache.tez.serviceplugins.api.ContainerEndReason; +import org.apache.tez.serviceplugins.api.TaskAttemptEndReason; import org.apache.tez.dag.api.TaskHeartbeatRequest; import org.apache.tez.dag.api.TaskHeartbeatResponse; import org.apache.tez.dag.api.TezException; http://git-wip-us.apache.org/repos/asf/tez/blob/d799d3b9/tez-dag/src/test/java/org/apache/tez/dag/app/TestTaskAttemptListenerImplTezDag2.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/TestTaskAttemptListenerImplTezDag2.java b/tez-dag/src/test/java/org/apache/tez/dag/app/TestTaskAttemptListenerImplTezDag2.java index 934543f..8d776fb 100644 --- a/tez-dag/src/test/java/org/apache/tez/dag/app/TestTaskAttemptListenerImplTezDag2.java +++ b/tez-dag/src/test/java/org/apache/tez/dag/app/TestTaskAttemptListenerImplTezDag2.java @@ -34,7 +34,7 @@ import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.event.Event; import org.apache.hadoop.yarn.event.EventHandler; -import org.apache.tez.dag.api.TaskAttemptEndReason; +import org.apache.tez.serviceplugins.api.TaskAttemptEndReason; import org.apache.tez.dag.app.dag.DAG; import org.apache.tez.dag.app.dag.event.TaskAttemptEventAttemptFailed; import org.apache.tez.dag.app.dag.event.TaskAttemptEventAttemptKilled; http://git-wip-us.apache.org/repos/asf/tez/blob/d799d3b9/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestContainerReuse.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestContainerReuse.java b/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestContainerReuse.java index 62edac9..e37ab4a 100644 --- a/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestContainerReuse.java +++ b/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestContainerReuse.java @@ -53,12 +53,11 @@ import org.apache.hadoop.yarn.api.records.Priority; import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.client.api.AMRMClient; import org.apache.hadoop.yarn.conf.YarnConfiguration; -import org.apache.hadoop.yarn.util.RackResolver; import org.apache.tez.common.MockDNSToSwitchMapping; import org.apache.tez.dag.api.InputDescriptor; import org.apache.tez.dag.api.OutputDescriptor; import org.apache.tez.dag.api.ProcessorDescriptor; -import org.apache.tez.dag.api.TaskAttemptEndReason; +import org.apache.tez.serviceplugins.api.TaskAttemptEndReason; import org.apache.tez.dag.api.TezConfiguration; import org.apache.tez.dag.api.TaskLocationHint; import org.apache.tez.dag.api.oldrecords.TaskAttemptState; @@ -94,7 +93,6 @@ import org.apache.tez.runtime.api.impl.OutputSpec; import org.apache.tez.runtime.api.impl.TaskSpec; import org.junit.BeforeClass; import org.junit.Test; -import org.mockito.internal.matchers.Null; import org.mockito.invocation.InvocationOnMock; import org.mockito.stubbing.Answer; http://git-wip-us.apache.org/repos/asf/tez/blob/d799d3b9/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestLocalTaskSchedulerService.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestLocalTaskSchedulerService.java b/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestLocalTaskSchedulerService.java index 0a642bb..b555c62 100644 --- a/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestLocalTaskSchedulerService.java +++ b/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestLocalTaskSchedulerService.java @@ -27,7 +27,6 @@ import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.Container; import org.apache.hadoop.yarn.api.records.Priority; import org.apache.hadoop.yarn.api.records.Resource; -import org.apache.tez.dag.api.TaskAttemptEndReason; import org.apache.tez.dag.app.AppContext; import org.apache.tez.dag.app.dag.Task; import org.apache.tez.dag.app.rm.TaskSchedulerService.TaskSchedulerAppCallback; http://git-wip-us.apache.org/repos/asf/tez/blob/d799d3b9/tez-dag/src/test/java/org/apache/tez/dag/app/rm/container/TestAMContainer.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/rm/container/TestAMContainer.java b/tez-dag/src/test/java/org/apache/tez/dag/app/rm/container/TestAMContainer.java index b8b4774..7bcb6d2 100644 --- a/tez-dag/src/test/java/org/apache/tez/dag/app/rm/container/TestAMContainer.java +++ b/tez-dag/src/test/java/org/apache/tez/dag/app/rm/container/TestAMContainer.java @@ -63,8 +63,8 @@ import org.apache.hadoop.yarn.event.EventHandler; import org.apache.hadoop.yarn.util.SystemClock; import org.apache.tez.common.security.JobTokenIdentifier; import org.apache.tez.common.security.TokenCache; -import org.apache.tez.dag.api.ContainerEndReason; -import org.apache.tez.dag.api.TaskAttemptEndReason; +import org.apache.tez.serviceplugins.api.ContainerEndReason; +import org.apache.tez.serviceplugins.api.TaskAttemptEndReason; import org.apache.tez.dag.api.TaskCommunicator; import org.apache.tez.dag.app.AppContext; import org.apache.tez.dag.app.ContainerHeartbeatHandler; http://git-wip-us.apache.org/repos/asf/tez/blob/d799d3b9/tez-ext-service-tests/src/test/java/org/apache/tez/dag/app/launcher/TezTestServiceContainerLauncher.java ---------------------------------------------------------------------- diff --git a/tez-ext-service-tests/src/test/java/org/apache/tez/dag/app/launcher/TezTestServiceContainerLauncher.java b/tez-ext-service-tests/src/test/java/org/apache/tez/dag/app/launcher/TezTestServiceContainerLauncher.java index 9d22196..dbf5054 100644 --- a/tez-ext-service-tests/src/test/java/org/apache/tez/dag/app/launcher/TezTestServiceContainerLauncher.java +++ b/tez-ext-service-tests/src/test/java/org/apache/tez/dag/app/launcher/TezTestServiceContainerLauncher.java @@ -14,34 +14,30 @@ package org.apache.tez.dag.app.launcher; +import java.io.IOException; import java.net.InetSocketAddress; import com.google.common.base.Preconditions; import com.google.protobuf.ByteString; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.service.AbstractService; +import org.apache.hadoop.util.StringUtils; import org.apache.hadoop.yarn.api.ApplicationConstants; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.ContainerId; -import org.apache.hadoop.yarn.util.Clock; -import org.apache.tez.dag.app.AppContext; -import org.apache.tez.dag.app.TaskAttemptListener; +import org.apache.tez.serviceplugins.api.ContainerLaunchRequest; +import org.apache.tez.serviceplugins.api.ContainerLauncher; +import org.apache.tez.serviceplugins.api.ContainerLauncherContext; +import org.apache.tez.serviceplugins.api.ContainerStopRequest; +import org.apache.tez.dag.api.TezConstants; import org.apache.tez.dag.app.TezTestServiceCommunicator; -import org.apache.tez.dag.app.rm.NMCommunicatorEvent; -import org.apache.tez.dag.app.rm.NMCommunicatorLaunchRequestEvent; -import org.apache.tez.dag.app.rm.container.AMContainerEvent; -import org.apache.tez.dag.app.rm.container.AMContainerEventLaunchFailed; -import org.apache.tez.dag.app.rm.container.AMContainerEventLaunched; -import org.apache.tez.dag.app.rm.container.AMContainerEventType; -import org.apache.tez.dag.history.DAGHistoryEvent; -import org.apache.tez.dag.history.events.ContainerLaunchedEvent; import org.apache.tez.service.TezTestServiceConfConstants; import org.apache.tez.test.service.rpc.TezTestServiceProtocolProtos; import org.apache.tez.test.service.rpc.TezTestServiceProtocolProtos.RunContainerRequestProto; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -public class TezTestServiceContainerLauncher extends AbstractService implements ContainerLauncher { +// TODO TEZ-2003 look for all LOG.*(DBG and LOG.*(DEBUG messages + +public class TezTestServiceContainerLauncher extends ContainerLauncher { // TODO Support interruptability of tasks which haven't yet been launched. @@ -49,40 +45,32 @@ public class TezTestServiceContainerLauncher extends AbstractService implements static final Logger LOG = LoggerFactory.getLogger(TezTestServiceContainerLauncher.class); - private final AppContext context; private final String tokenIdentifier; - private final TaskAttemptListener tal; private final int servicePort; private final TezTestServiceCommunicator communicator; - private final Clock clock; private final ApplicationAttemptId appAttemptId; + // private final TaskAttemptListener tal; // Configuration passed in here to set up final parameters - public TezTestServiceContainerLauncher(AppContext appContext, Configuration conf, - TaskAttemptListener tal) { - super(TezTestServiceContainerLauncher.class.getName()); - this.clock = appContext.getClock(); - int numThreads = conf.getInt(TezTestServiceConfConstants.TEZ_TEST_SERVICE_AM_COMMUNICATOR_NUM_THREADS, + public TezTestServiceContainerLauncher(ContainerLauncherContext containerLauncherContext) { + super(TezTestServiceContainerLauncher.class.getName(), containerLauncherContext); + int numThreads = getContext().getInitialConfiguration().getInt( + TezTestServiceConfConstants.TEZ_TEST_SERVICE_AM_COMMUNICATOR_NUM_THREADS, TezTestServiceConfConstants.TEZ_TEST_SERVICE_AM_COMMUNICATOR_NUM_THREADS_DEFAULT); - this.servicePort = conf.getInt(TezTestServiceConfConstants.TEZ_TEST_SERVICE_RPC_PORT, -1); + this.servicePort = getContext().getInitialConfiguration().getInt( + TezTestServiceConfConstants.TEZ_TEST_SERVICE_RPC_PORT, -1); Preconditions.checkArgument(servicePort > 0, TezTestServiceConfConstants.TEZ_TEST_SERVICE_RPC_PORT + " must be set"); this.communicator = new TezTestServiceCommunicator(numThreads); - this.context = appContext; - this.tokenIdentifier = context.getApplicationID().toString(); - this.appAttemptId = appContext.getApplicationAttemptId(); - this.tal = tal; - } - - @Override - public void serviceInit(Configuration conf) { - communicator.init(conf); + this.tokenIdentifier = getContext().getApplicationAttemptId().getApplicationId().toString(); + this.appAttemptId = getContext().getApplicationAttemptId(); } @Override public void serviceStart() { + communicator.init(getContext().getInitialConfiguration()); communicator.start(); } @@ -92,51 +80,56 @@ public class TezTestServiceContainerLauncher extends AbstractService implements } @Override - public void handle(NMCommunicatorEvent event) { - switch (event.getType()) { - case CONTAINER_LAUNCH_REQUEST: - final NMCommunicatorLaunchRequestEvent launchEvent = (NMCommunicatorLaunchRequestEvent) event; - RunContainerRequestProto runRequest = constructRunContainerRequest(launchEvent); - communicator.runContainer(runRequest, launchEvent.getNodeId().getHost(), - launchEvent.getNodeId().getPort(), - new TezTestServiceCommunicator.ExecuteRequestCallback<TezTestServiceProtocolProtos.RunContainerResponseProto>() { - @Override - public void setResponse(TezTestServiceProtocolProtos.RunContainerResponseProto response) { - LOG.info("Container: " + launchEvent.getContainerId() + " launch succeeded on host: " + launchEvent.getNodeId()); - context.getEventHandler().handle(new AMContainerEventLaunched(launchEvent.getContainerId())); - ContainerLaunchedEvent lEvt = new ContainerLaunchedEvent( - launchEvent.getContainerId(), clock.getTime(), context.getApplicationAttemptId()); - context.getHistoryHandler().handle(new DAGHistoryEvent( - null, lEvt)); - } - - @Override - public void indicateError(Throwable t) { - LOG.error("Failed to launch container: " + launchEvent.getContainer() + " on host: " + launchEvent.getNodeId(), t); - sendContainerLaunchFailedMsg(launchEvent.getContainerId(), t); - } - }); - break; - case CONTAINER_STOP_REQUEST: - LOG.info("DEBUG: Ignoring STOP_REQUEST for event: " + event); - // that the container is actually done (normally received from RM) - // TODO Sending this out for an un-launched container is invalid - context.getEventHandler().handle(new AMContainerEvent(event.getContainerId(), - AMContainerEventType.C_NM_STOP_SENT)); - break; + public void launchContainer(final ContainerLaunchRequest launchRequest) { + RunContainerRequestProto runRequest = null; + try { + runRequest = constructRunContainerRequest(launchRequest); + } catch (IOException e) { + getContext().containerLaunchFailed(launchRequest.getContainerId(), + "Failed to construct launch request, " + StringUtils.stringifyException(e)); + return; } + communicator.runContainer(runRequest, launchRequest.getNodeId().getHost(), + launchRequest.getNodeId().getPort(), + new TezTestServiceCommunicator.ExecuteRequestCallback<TezTestServiceProtocolProtos.RunContainerResponseProto>() { + @Override + public void setResponse(TezTestServiceProtocolProtos.RunContainerResponseProto response) { + LOG.info( + "Container: " + launchRequest.getContainerId() + " launch succeeded on host: " + + launchRequest.getNodeId()); + getContext().containerLaunched(launchRequest.getContainerId()); + } + + @Override + public void indicateError(Throwable t) { + LOG.error( + "Failed to launch container: " + launchRequest.getContainerId() + " on host: " + + launchRequest.getNodeId(), t); + sendContainerLaunchFailedMsg(launchRequest.getContainerId(), t); + } + }); } - private RunContainerRequestProto constructRunContainerRequest(NMCommunicatorLaunchRequestEvent event) { + @Override + public void stopContainer(ContainerStopRequest stopRequest) { + LOG.info("DEBUG: Ignoring STOP_REQUEST for event: " + stopRequest); + // that the container is actually done (normally received from RM) + // TODO Sending this out for an un-launched container is invalid + getContext().containerStopRequested(stopRequest.getContainerId()); + } + + private RunContainerRequestProto constructRunContainerRequest(ContainerLaunchRequest launchRequest) throws + IOException { RunContainerRequestProto.Builder builder = RunContainerRequestProto.newBuilder(); - InetSocketAddress address = tal.getTaskCommunicator(event.getTaskCommId()).getAddress(); + Preconditions.checkArgument(launchRequest.getTaskCommunicatorName().equals(TezConstants.TEZ_AM_SERVICE_PLUGINS_NAME_DEFAULT)); + InetSocketAddress address = (InetSocketAddress) getContext().getTaskCommunicatorMetaInfo(launchRequest.getTaskCommunicatorName()); builder.setAmHost(address.getHostName()).setAmPort(address.getPort()); builder.setAppAttemptNumber(appAttemptId.getAttemptId()); builder.setApplicationIdString(appAttemptId.getApplicationId().toString()); builder.setTokenIdentifier(tokenIdentifier); - builder.setContainerIdString(event.getContainer().getId().toString()); + builder.setContainerIdString(launchRequest.getContainerId().toString()); builder.setCredentialsBinary( - ByteString.copyFrom(event.getContainerLaunchContext().getTokens())); + ByteString.copyFrom(launchRequest.getContainerLaunchContext().getTokens())); // TODO Avoid reading this from the environment builder.setUser(System.getenv(ApplicationConstants.Environment.USER.name())); return builder.build(); @@ -144,6 +137,8 @@ public class TezTestServiceContainerLauncher extends AbstractService implements @SuppressWarnings("unchecked") void sendContainerLaunchFailedMsg(ContainerId containerId, Throwable t) { - context.getEventHandler().handle(new AMContainerEventLaunchFailed(containerId, t == null ? "" : t.getMessage())); + getContext().containerLaunchFailed(containerId, t == null ? "" : t.getMessage()); } + + } http://git-wip-us.apache.org/repos/asf/tez/blob/d799d3b9/tez-ext-service-tests/src/test/java/org/apache/tez/dag/app/launcher/TezTestServiceNoOpContainerLauncher.java ---------------------------------------------------------------------- diff --git a/tez-ext-service-tests/src/test/java/org/apache/tez/dag/app/launcher/TezTestServiceNoOpContainerLauncher.java b/tez-ext-service-tests/src/test/java/org/apache/tez/dag/app/launcher/TezTestServiceNoOpContainerLauncher.java index 977d0d3..d3743e1 100644 --- a/tez-ext-service-tests/src/test/java/org/apache/tez/dag/app/launcher/TezTestServiceNoOpContainerLauncher.java +++ b/tez-ext-service-tests/src/test/java/org/apache/tez/dag/app/launcher/TezTestServiceNoOpContainerLauncher.java @@ -14,53 +14,32 @@ package org.apache.tez.dag.app.launcher; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.service.AbstractService; -import org.apache.hadoop.yarn.util.Clock; -import org.apache.tez.dag.app.AppContext; -import org.apache.tez.dag.app.TaskAttemptListener; -import org.apache.tez.dag.app.rm.NMCommunicatorEvent; -import org.apache.tez.dag.app.rm.NMCommunicatorLaunchRequestEvent; -import org.apache.tez.dag.app.rm.container.AMContainerEvent; -import org.apache.tez.dag.app.rm.container.AMContainerEventLaunched; -import org.apache.tez.dag.app.rm.container.AMContainerEventType; -import org.apache.tez.dag.history.DAGHistoryEvent; -import org.apache.tez.dag.history.events.ContainerLaunchedEvent; +import org.apache.tez.serviceplugins.api.ContainerLaunchRequest; +import org.apache.tez.serviceplugins.api.ContainerLauncher; +import org.apache.tez.serviceplugins.api.ContainerLauncherContext; +import org.apache.tez.serviceplugins.api.ContainerStopRequest; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -public class TezTestServiceNoOpContainerLauncher extends AbstractService implements ContainerLauncher { +public class TezTestServiceNoOpContainerLauncher extends ContainerLauncher { static final Logger LOG = LoggerFactory.getLogger(TezTestServiceNoOpContainerLauncher.class); - private final AppContext context; - private final Clock clock; - public TezTestServiceNoOpContainerLauncher(AppContext appContext, Configuration conf, - TaskAttemptListener tal) { - super(TezTestServiceNoOpContainerLauncher.class.getName()); - this.context = appContext; - this.clock = appContext.getClock(); + public TezTestServiceNoOpContainerLauncher(ContainerLauncherContext containerLauncherContext) { + super(TezTestServiceNoOpContainerLauncher.class.getName(), containerLauncherContext); } @Override - public void handle(NMCommunicatorEvent event) { - switch(event.getType()) { - case CONTAINER_LAUNCH_REQUEST: - final NMCommunicatorLaunchRequestEvent launchEvent = (NMCommunicatorLaunchRequestEvent) event; - LOG.info("No-op launch for container: " + launchEvent.getContainerId() + " succeeded on host: " + launchEvent.getNodeId()); - context.getEventHandler().handle(new AMContainerEventLaunched(launchEvent.getContainerId())); - ContainerLaunchedEvent lEvt = new ContainerLaunchedEvent( - launchEvent.getContainerId(), clock.getTime(), context.getApplicationAttemptId()); - context.getHistoryHandler().handle(new DAGHistoryEvent( - null, lEvt)); - break; - case CONTAINER_STOP_REQUEST: - LOG.info("DEBUG: Ignoring STOP_REQUEST for event: " + event); - context.getEventHandler().handle(new AMContainerEvent(event.getContainerId(), - AMContainerEventType.C_NM_STOP_SENT)); - break; - } + public void launchContainer(ContainerLaunchRequest launchRequest) { + LOG.info("No-op launch for container {} succeeded on host: {}", launchRequest.getContainerId(), + launchRequest.getNodeId()); + getContext().containerLaunched(launchRequest.getContainerId()); + } + @Override + public void stopContainer(ContainerStopRequest stopRequest) { + LOG.info("DEBUG: Ignoring STOP_REQUEST {}", stopRequest); + getContext().containerStopRequested(stopRequest.getContainerId()); } } http://git-wip-us.apache.org/repos/asf/tez/blob/d799d3b9/tez-ext-service-tests/src/test/java/org/apache/tez/dag/app/rm/TezTestServiceTaskSchedulerService.java ---------------------------------------------------------------------- diff --git a/tez-ext-service-tests/src/test/java/org/apache/tez/dag/app/rm/TezTestServiceTaskSchedulerService.java b/tez-ext-service-tests/src/test/java/org/apache/tez/dag/app/rm/TezTestServiceTaskSchedulerService.java index 073cb50..506e991 100644 --- a/tez-ext-service-tests/src/test/java/org/apache/tez/dag/app/rm/TezTestServiceTaskSchedulerService.java +++ b/tez-ext-service-tests/src/test/java/org/apache/tez/dag/app/rm/TezTestServiceTaskSchedulerService.java @@ -36,7 +36,7 @@ import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.api.records.Priority; import org.apache.hadoop.yarn.api.records.Resource; -import org.apache.tez.dag.api.TaskAttemptEndReason; +import org.apache.tez.serviceplugins.api.TaskAttemptEndReason; import org.apache.tez.dag.app.AppContext; import org.apache.tez.service.TezTestServiceConfConstants; import org.slf4j.Logger; http://git-wip-us.apache.org/repos/asf/tez/blob/d799d3b9/tez-ext-service-tests/src/test/java/org/apache/tez/dag/app/taskcomm/TezTestServiceTaskCommunicatorImpl.java ---------------------------------------------------------------------- diff --git a/tez-ext-service-tests/src/test/java/org/apache/tez/dag/app/taskcomm/TezTestServiceTaskCommunicatorImpl.java b/tez-ext-service-tests/src/test/java/org/apache/tez/dag/app/taskcomm/TezTestServiceTaskCommunicatorImpl.java index 98673a6..444498e 100644 --- a/tez-ext-service-tests/src/test/java/org/apache/tez/dag/app/taskcomm/TezTestServiceTaskCommunicatorImpl.java +++ b/tez-ext-service-tests/src/test/java/org/apache/tez/dag/app/taskcomm/TezTestServiceTaskCommunicatorImpl.java @@ -30,8 +30,8 @@ import org.apache.hadoop.security.Credentials; import org.apache.hadoop.yarn.api.ApplicationConstants; import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.LocalResource; -import org.apache.tez.dag.api.ContainerEndReason; -import org.apache.tez.dag.api.TaskAttemptEndReason; +import org.apache.tez.serviceplugins.api.ContainerEndReason; +import org.apache.tez.serviceplugins.api.TaskAttemptEndReason; import org.apache.tez.dag.api.TaskCommunicatorContext; import org.apache.tez.dag.app.TezTaskCommunicatorImpl; import org.apache.tez.dag.app.TezTestServiceCommunicator;
