http://git-wip-us.apache.org/repos/asf/tez/blob/8b278ea8/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/ContainerLauncherImpl.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/ContainerLauncherImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/ContainerLauncherImpl.java deleted file mode 100644 index 07d269d..0000000 --- a/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/ContainerLauncherImpl.java +++ /dev/null @@ -1,410 +0,0 @@ -/** -* Licensed to the Apache Software Foundation (ASF) under one -* or more contributor license agreements. See the NOTICE file -* distributed with this work for additional information -* regarding copyright ownership. The ASF licenses this file -* to you under the Apache License, Version 2.0 (the -* "License"); you may not use this file except in compliance -* with the License. You may obtain a copy of the License at -* -* http://www.apache.org/licenses/LICENSE-2.0 -* -* Unless required by applicable law or agreed to in writing, software -* distributed under the License is distributed on an "AS IS" BASIS, -* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -* See the License for the specific language governing permissions and -* limitations under the License. -*/ - -package org.apache.tez.dag.app.launcher; - -import java.io.IOException; -import java.util.Collections; -import java.util.concurrent.BlockingQueue; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.LinkedBlockingQueue; -import java.util.concurrent.RejectedExecutionHandler; -import java.util.concurrent.ThreadFactory; -import java.util.concurrent.ThreadPoolExecutor; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicBoolean; - -import org.apache.commons.lang.exception.ExceptionUtils; -import org.apache.tez.common.TezUtils; -import org.apache.tez.dag.api.TezConstants; -import org.apache.tez.serviceplugins.api.ContainerLaunchRequest; -import org.apache.tez.serviceplugins.api.ContainerLauncher; -import org.apache.tez.serviceplugins.api.ContainerLauncherContext; -import org.apache.tez.serviceplugins.api.ContainerStopRequest; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.CommonConfigurationKeysPublic; -import org.apache.hadoop.yarn.api.protocolrecords.StartContainerRequest; -import org.apache.hadoop.yarn.api.protocolrecords.StartContainersRequest; -import org.apache.hadoop.yarn.api.protocolrecords.StartContainersResponse; -import org.apache.hadoop.yarn.api.protocolrecords.StopContainersRequest; -import org.apache.hadoop.yarn.api.records.ContainerId; -import org.apache.hadoop.yarn.api.records.ContainerLaunchContext; -import org.apache.hadoop.yarn.api.records.Token; -import org.apache.hadoop.yarn.client.api.impl.ContainerManagementProtocolProxy; -import org.apache.hadoop.yarn.client.api.impl.ContainerManagementProtocolProxy.ContainerManagementProtocolProxyData; -import org.apache.hadoop.yarn.util.Records; -import org.apache.tez.dag.api.TezConfiguration; -import org.apache.tez.dag.api.TezUncheckedException; - -import com.google.common.util.concurrent.ThreadFactoryBuilder; - - -// TODO See what part of this lifecycle and state management can be simplified. -// Ideally, no state - only sendStart / sendStop. - -// TODO Review this entire code and clean it up. - -/** - * This class is responsible for launching of containers. - */ -public class ContainerLauncherImpl extends ContainerLauncher { - - // TODO Ensure the same thread is used to launch / stop the same container. Or - ensure event ordering. - static final Logger LOG = LoggerFactory.getLogger(ContainerLauncherImpl.class); - - private final ConcurrentHashMap<ContainerId, Container> containers = - new ConcurrentHashMap<>(); - protected ThreadPoolExecutor launcherPool; - protected static final int INITIAL_POOL_SIZE = 10; - private final int limitOnPoolSize; - private final Configuration conf; - private Thread eventHandlingThread; - protected BlockingQueue<ContainerOp> eventQueue = new LinkedBlockingQueue<>(); - private ContainerManagementProtocolProxy cmProxy; - private AtomicBoolean serviceStopped = new AtomicBoolean(false); - - private Container getContainer(ContainerOp event) { - ContainerId id = event.getBaseOperation().getContainerId(); - Container c = containers.get(id); - if(c == null) { - c = new Container(id, - event.getBaseOperation().getNodeId().toString(), event.getBaseOperation().getContainerToken()); - Container old = containers.putIfAbsent(id, c); - if(old != null) { - c = old; - } - } - return c; - } - - private void removeContainerIfDone(ContainerId id) { - Container c = containers.get(id); - if(c != null && c.isCompletelyDone()) { - containers.remove(id); - } - } - - - private static enum ContainerState { - PREP, FAILED, RUNNING, DONE, KILLED_BEFORE_LAUNCH - } - - private class Container { - private ContainerState state; - // store enough information to be able to cleanup the container - private ContainerId containerID; - final private String containerMgrAddress; - private Token containerToken; - - public Container(ContainerId containerID, - String containerMgrAddress, Token containerToken) { - this.state = ContainerState.PREP; - this.containerMgrAddress = containerMgrAddress; - this.containerID = containerID; - this.containerToken = containerToken; - } - - public synchronized boolean isCompletelyDone() { - return state == ContainerState.DONE || state == ContainerState.FAILED; - } - - @SuppressWarnings("unchecked") - public synchronized void launch(ContainerLaunchRequest event) { - LOG.info("Launching Container with Id: " + event.getContainerId()); - if(this.state == ContainerState.KILLED_BEFORE_LAUNCH) { - state = ContainerState.DONE; - sendContainerLaunchFailedMsg(event.getContainerId(), - "Container was killed before it was launched"); - return; - } - - ContainerManagementProtocolProxyData proxy = null; - try { - - proxy = getCMProxy(containerID, containerMgrAddress, - containerToken); - - // Construct the actual Container - ContainerLaunchContext containerLaunchContext = - event.getContainerLaunchContext(); - - // Now launch the actual container - StartContainerRequest startRequest = Records - .newRecord(StartContainerRequest.class); - startRequest.setContainerToken(event.getContainerToken()); - startRequest.setContainerLaunchContext(containerLaunchContext); - - StartContainersResponse response = - proxy.getContainerManagementProtocol().startContainers( - StartContainersRequest.newInstance( - Collections.singletonList(startRequest))); - if (response.getFailedRequests() != null - && !response.getFailedRequests().isEmpty()) { - throw response.getFailedRequests().get(containerID).deSerialize(); - } - - // after launching, send launched event to task attempt to move - // it from ASSIGNED to RUNNING state - getContext().containerLaunched(containerID); - this.state = ContainerState.RUNNING; - } catch (Throwable t) { - String message = "Container launch failed for " + containerID + " : " - + ExceptionUtils.getStackTrace(t); - this.state = ContainerState.FAILED; - sendContainerLaunchFailedMsg(containerID, message); - } finally { - if (proxy != null) { - cmProxy.mayBeCloseProxy(proxy); - } - } - } - - @SuppressWarnings("unchecked") - public synchronized void kill() { - - if(isCompletelyDone()) { - return; - } - if(this.state == ContainerState.PREP) { - this.state = ContainerState.KILLED_BEFORE_LAUNCH; - } else { - LOG.info("Sending a stop request to the NM for ContainerId: " - + containerID); - - ContainerManagementProtocolProxyData proxy = null; - try { - proxy = getCMProxy(this.containerID, this.containerMgrAddress, - this.containerToken); - - // kill the remote container if already launched - StopContainersRequest stopRequest = Records - .newRecord(StopContainersRequest.class); - stopRequest.setContainerIds(Collections.singletonList(containerID)); - - proxy.getContainerManagementProtocol().stopContainers(stopRequest); - - // If stopContainer returns without an error, assuming the stop made - // it over to the NodeManager. - getContext().containerStopRequested(containerID); - } catch (Throwable t) { - - // ignore the cleanup failure - String message = "cleanup failed for container " - + this.containerID + " : " - + ExceptionUtils.getStackTrace(t); - getContext().containerStopFailed(containerID, message); - LOG.warn(message); - this.state = ContainerState.DONE; - return; - } finally { - if (proxy != null) { - cmProxy.mayBeCloseProxy(proxy); - } - } - this.state = ContainerState.DONE; - } - } - } - - public ContainerLauncherImpl(ContainerLauncherContext containerLauncherContext) { - super(containerLauncherContext); - try { - this.conf = TezUtils.createConfFromUserPayload(containerLauncherContext.getInitialUserPayload()); - } catch (IOException e) { - throw new TezUncheckedException( - "Failed to parse user payload for " + ContainerLauncherImpl.class.getSimpleName(), e); - } - conf.setInt( - CommonConfigurationKeysPublic.IPC_CLIENT_CONNECTION_MAXIDLETIME_KEY, - 0); - this.limitOnPoolSize = conf.getInt( - TezConfiguration.TEZ_AM_CONTAINERLAUNCHER_THREAD_COUNT_LIMIT, - TezConfiguration.TEZ_AM_CONTAINERLAUNCHER_THREAD_COUNT_LIMIT_DEFAULT); - LOG.info("Upper limit on the thread pool size is " + this.limitOnPoolSize); - } - - @Override - public void start() { - // pass a copy of config to ContainerManagementProtocolProxy until YARN-3497 is fixed - cmProxy = - new ContainerManagementProtocolProxy(conf); - - ThreadFactory tf = new ThreadFactoryBuilder().setNameFormat( - "ContainerLauncher #%d").setDaemon(true).build(); - - // Start with a default core-pool size of 10 and change it dynamically. - launcherPool = new ThreadPoolExecutor(INITIAL_POOL_SIZE, - Integer.MAX_VALUE, 1, TimeUnit.HOURS, - new LinkedBlockingQueue<Runnable>(), - tf, new CustomizedRejectedExecutionHandler()); - eventHandlingThread = new Thread() { - @Override - public void run() { - ContainerOp event = null; - while (!Thread.currentThread().isInterrupted()) { - try { - event = eventQueue.take(); - } catch (InterruptedException e) { - if(!serviceStopped.get()) { - LOG.error("Returning, interrupted : " + e); - } - return; - } - int poolSize = launcherPool.getCorePoolSize(); - - // See if we need up the pool size only if haven't reached the - // maximum limit yet. - if (poolSize != limitOnPoolSize) { - - // nodes where containers will run at *this* point of time. This is - // *not* the cluster size and doesn't need to be. - int numNodes = - getContext().getNumNodes(TezConstants.getTezYarnServicePluginName()); - int idealPoolSize = Math.min(limitOnPoolSize, numNodes); - - if (poolSize < idealPoolSize) { - // Bump up the pool size to idealPoolSize+INITIAL_POOL_SIZE, the - // later is just a buffer so we are not always increasing the - // pool-size - int newPoolSize = Math.min(limitOnPoolSize, idealPoolSize - + INITIAL_POOL_SIZE); - LOG.info("Setting ContainerLauncher pool size to " + newPoolSize - + " as number-of-nodes to talk to is " + numNodes); - launcherPool.setCorePoolSize(newPoolSize); - } - } - - // the events from the queue are handled in parallel - // using a thread pool - launcherPool.execute(createEventProcessor(event)); - - // TODO: Group launching of multiple containers to a single - // NodeManager into a single connection - } - } - }; - eventHandlingThread.setName("ContainerLauncher Event Handler"); - eventHandlingThread.start(); - } - - private void shutdownAllContainers() { - for (Container ct : this.containers.values()) { - if (ct != null) { - ct.kill(); - } - } - } - - @Override - public void shutdown() { - if(!serviceStopped.compareAndSet(false, true)) { - LOG.info("Ignoring multiple stops"); - return; - } - // shutdown any containers that might be left running - shutdownAllContainers(); - if (eventHandlingThread != null) { - eventHandlingThread.interrupt(); - } - if (launcherPool != null) { - launcherPool.shutdownNow(); - } - } - - protected EventProcessor createEventProcessor(ContainerOp event) { - return new EventProcessor(event); - } - - protected ContainerManagementProtocolProxy.ContainerManagementProtocolProxyData getCMProxy( - ContainerId containerID, final String containerManagerBindAddr, - Token containerToken) throws IOException { - return cmProxy.getProxy(containerManagerBindAddr, containerID); - } - - /** - * Setup and start the container on remote nodemanager. - */ - class EventProcessor implements Runnable { - private ContainerOp event; - - EventProcessor(ContainerOp event) { - this.event = event; - } - - @Override - public void run() { - LOG.info("Processing operation {}", event.toString()); - - // Load ContainerManager tokens before creating a connection. - // TODO: Do it only once per NodeManager. - ContainerId containerID = event.getBaseOperation().getContainerId(); - - Container c = getContainer(event); - switch(event.getOpType()) { - case LAUNCH_REQUEST: - ContainerLaunchRequest launchRequest = event.getLaunchRequest(); - c.launch(launchRequest); - break; - case STOP_REQUEST: - c.kill(); - break; - } - removeContainerIfDone(containerID); - } - } - - /** - * ThreadPoolExecutor.submit may fail if you are submitting task - * when ThreadPoolExecutor is shutting down (DAGAppMaster is shutting down). - * Use this CustomizedRejectedExecutionHandler to just logging rather than abort the application. - */ - private static class CustomizedRejectedExecutionHandler implements RejectedExecutionHandler { - @Override - public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) { - LOG.warn("Can't submit task to ThreadPoolExecutor:" + executor); - } - } - - @SuppressWarnings("unchecked") - void sendContainerLaunchFailedMsg(ContainerId containerId, - String message) { - LOG.error(message); - getContext().containerLaunchFailed(containerId, message); - } - - - @Override - public void launchContainer(ContainerLaunchRequest launchRequest) { - try { - eventQueue.put(new ContainerOp(ContainerOp.OPType.LAUNCH_REQUEST, launchRequest)); - } catch (InterruptedException e) { - throw new TezUncheckedException(e); - } - } - - @Override - public void stopContainer(ContainerStopRequest stopRequest) { - try { - eventQueue.put(new ContainerOp(ContainerOp.OPType.STOP_REQUEST, stopRequest)); - } catch (InterruptedException e) { - throw new TezUncheckedException(e); - } - } -}
http://git-wip-us.apache.org/repos/asf/tez/blob/8b278ea8/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/ContainerLauncherManager.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/ContainerLauncherManager.java b/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/ContainerLauncherManager.java new file mode 100644 index 0000000..15a10bd --- /dev/null +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/ContainerLauncherManager.java @@ -0,0 +1,217 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.tez.dag.app.launcher; + +import java.lang.reflect.Constructor; +import java.lang.reflect.InvocationTargetException; +import java.net.UnknownHostException; +import java.util.List; + +import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Preconditions; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.service.AbstractService; +import org.apache.hadoop.yarn.event.EventHandler; +import org.apache.tez.common.ReflectionUtils; +import org.apache.tez.dag.api.NamedEntityDescriptor; +import org.apache.tez.dag.api.TezConstants; +import org.apache.tez.dag.api.UserPayload; +import org.apache.tez.dag.app.ServicePluginLifecycleAbstractService; +import org.apache.tez.serviceplugins.api.ContainerLaunchRequest; +import org.apache.tez.serviceplugins.api.ContainerLauncher; +import org.apache.tez.serviceplugins.api.ContainerLauncherContext; +import org.apache.tez.serviceplugins.api.ContainerStopRequest; +import org.apache.tez.dag.api.TezUncheckedException; +import org.apache.tez.dag.app.AppContext; +import org.apache.tez.dag.app.ContainerLauncherContextImpl; +import org.apache.tez.dag.app.TaskCommunicatorManagerInterface; +import org.apache.tez.dag.app.dag.DAG; +import org.apache.tez.dag.app.rm.ContainerLauncherEvent; +import org.apache.tez.dag.app.rm.ContainerLauncherLaunchRequestEvent; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class ContainerLauncherManager extends AbstractService + implements EventHandler<ContainerLauncherEvent> { + + static final Logger LOG = LoggerFactory.getLogger(TezContainerLauncherImpl.class); + + @VisibleForTesting + final ContainerLauncher containerLaunchers[]; + @VisibleForTesting + final ContainerLauncherContext containerLauncherContexts[]; + protected final ServicePluginLifecycleAbstractService[] containerLauncherServiceWrappers; + private final AppContext appContext; + + @VisibleForTesting + public ContainerLauncherManager(ContainerLauncher containerLauncher, AppContext context) { + super(ContainerLauncherManager.class.getName()); + this.appContext = context; + containerLaunchers = new ContainerLauncher[] {containerLauncher}; + containerLauncherContexts = new ContainerLauncherContext[] {containerLauncher.getContext()}; + containerLauncherServiceWrappers = new ServicePluginLifecycleAbstractService[]{ + new ServicePluginLifecycleAbstractService<>(containerLauncher)}; + } + + // Accepting conf to setup final parameters, if required. + public ContainerLauncherManager(AppContext context, + TaskCommunicatorManagerInterface taskCommunicatorManagerInterface, + String workingDirectory, + List<NamedEntityDescriptor> containerLauncherDescriptors, + boolean isPureLocalMode) { + super(ContainerLauncherManager.class.getName()); + + this.appContext = context; + Preconditions.checkArgument( + containerLauncherDescriptors != null && !containerLauncherDescriptors.isEmpty(), + "ContainerLauncherDescriptors must be specified"); + containerLauncherContexts = new ContainerLauncherContext[containerLauncherDescriptors.size()]; + containerLaunchers = new ContainerLauncher[containerLauncherDescriptors.size()]; + containerLauncherServiceWrappers = new ServicePluginLifecycleAbstractService[containerLauncherDescriptors.size()]; + + + for (int i = 0; i < containerLauncherDescriptors.size(); i++) { + UserPayload userPayload = containerLauncherDescriptors.get(i).getUserPayload(); + ContainerLauncherContext containerLauncherContext = + new ContainerLauncherContextImpl(context, taskCommunicatorManagerInterface, userPayload); + containerLauncherContexts[i] = containerLauncherContext; + containerLaunchers[i] = createContainerLauncher(containerLauncherDescriptors.get(i), context, + containerLauncherContext, taskCommunicatorManagerInterface, workingDirectory, i, isPureLocalMode); + containerLauncherServiceWrappers[i] = new ServicePluginLifecycleAbstractService<>(containerLaunchers[i]); + } + } + + @VisibleForTesting + ContainerLauncher createContainerLauncher( + NamedEntityDescriptor containerLauncherDescriptor, + AppContext context, + ContainerLauncherContext containerLauncherContext, + TaskCommunicatorManagerInterface taskCommunicatorManagerInterface, + String workingDirectory, + int containerLauncherIndex, + boolean isPureLocalMode) { + if (containerLauncherDescriptor.getEntityName().equals( + TezConstants.getTezYarnServicePluginName())) { + return createYarnContainerLauncher(containerLauncherContext); + } else if (containerLauncherDescriptor.getEntityName() + .equals(TezConstants.getTezUberServicePluginName())) { + return createUberContainerLauncher(containerLauncherContext, context, + taskCommunicatorManagerInterface, + workingDirectory, isPureLocalMode); + } else { + return createCustomContainerLauncher(containerLauncherContext, containerLauncherDescriptor); + } + } + + @VisibleForTesting + ContainerLauncher createYarnContainerLauncher(ContainerLauncherContext containerLauncherContext) { + LOG.info("Creating DefaultContainerLauncher"); + return new TezContainerLauncherImpl(containerLauncherContext); + } + + @VisibleForTesting + ContainerLauncher createUberContainerLauncher(ContainerLauncherContext containerLauncherContext, + AppContext context, + TaskCommunicatorManagerInterface taskCommunicatorManagerInterface, + String workingDirectory, + boolean isPureLocalMode) { + LOG.info("Creating LocalContainerLauncher"); + // TODO Post TEZ-2003. LocalContainerLauncher is special cased, since it makes use of + // extensive internals which are only available at runtime. Will likely require + // some kind of runtime binding of parameters in the payload to work correctly. + try { + return + new LocalContainerLauncher(containerLauncherContext, context, + taskCommunicatorManagerInterface, + workingDirectory, isPureLocalMode); + } catch (UnknownHostException e) { + throw new TezUncheckedException(e); + } + } + + @VisibleForTesting + @SuppressWarnings("unchecked") + ContainerLauncher createCustomContainerLauncher(ContainerLauncherContext containerLauncherContext, + NamedEntityDescriptor containerLauncherDescriptor) { + LOG.info("Creating container launcher {}:{} ", containerLauncherDescriptor.getEntityName(), + containerLauncherDescriptor.getClassName()); + Class<? extends ContainerLauncher> containerLauncherClazz = + (Class<? extends ContainerLauncher>) ReflectionUtils.getClazz( + containerLauncherDescriptor.getClassName()); + try { + Constructor<? extends ContainerLauncher> ctor = containerLauncherClazz + .getConstructor(ContainerLauncherContext.class); + return ctor.newInstance(containerLauncherContext); + } catch (NoSuchMethodException | InvocationTargetException | InstantiationException | IllegalAccessException e) { + throw new TezUncheckedException(e); + } + + } + + @Override + public void serviceInit(Configuration conf) { + for (int i = 0 ; i < containerLaunchers.length ; i++) { + containerLauncherServiceWrappers[i].init(conf); + } + } + + @Override + public void serviceStart() { + for (int i = 0 ; i < containerLaunchers.length ; i++) { + containerLauncherServiceWrappers[i].start(); + } + } + + @Override + public void serviceStop() { + for (int i = 0 ; i < containerLaunchers.length ; i++) { + containerLauncherServiceWrappers[i].stop(); + } + } + + public void dagComplete(DAG dag) { + // Nothing required at the moment. Containers are shared across DAGs + } + + public void dagSubmitted() { + // Nothing to do right now. Indicates that a new DAG has been submitted and + // the context has updated information. + } + + + @Override + public void handle(ContainerLauncherEvent event) { + int launcherId = event.getLauncherId(); + String schedulerName = appContext.getTaskSchedulerName(event.getSchedulerId()); + String taskCommName = appContext.getTaskCommunicatorName(event.getTaskCommId()); + switch (event.getType()) { + case CONTAINER_LAUNCH_REQUEST: + ContainerLauncherLaunchRequestEvent launchEvent = (ContainerLauncherLaunchRequestEvent) event; + ContainerLaunchRequest launchRequest = + new ContainerLaunchRequest(launchEvent.getNodeId(), launchEvent.getContainerId(), + launchEvent.getContainerToken(), launchEvent.getContainerLaunchContext(), + launchEvent.getContainer(), schedulerName, + taskCommName); + containerLaunchers[launcherId].launchContainer(launchRequest); + break; + case CONTAINER_STOP_REQUEST: + ContainerStopRequest stopRequest = + new ContainerStopRequest(event.getNodeId(), event.getContainerId(), + event.getContainerToken(), schedulerName, taskCommName); + containerLaunchers[launcherId].stopContainer(stopRequest); + break; + } + } +} http://git-wip-us.apache.org/repos/asf/tez/blob/8b278ea8/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/ContainerLauncherRouter.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/ContainerLauncherRouter.java b/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/ContainerLauncherRouter.java deleted file mode 100644 index b56bd5b..0000000 --- a/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/ContainerLauncherRouter.java +++ /dev/null @@ -1,215 +0,0 @@ -/* - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.tez.dag.app.launcher; - -import java.lang.reflect.Constructor; -import java.lang.reflect.InvocationTargetException; -import java.net.UnknownHostException; -import java.util.List; - -import com.google.common.annotations.VisibleForTesting; -import com.google.common.base.Preconditions; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.service.AbstractService; -import org.apache.hadoop.yarn.event.EventHandler; -import org.apache.tez.common.ReflectionUtils; -import org.apache.tez.dag.api.NamedEntityDescriptor; -import org.apache.tez.dag.api.TezConstants; -import org.apache.tez.dag.api.UserPayload; -import org.apache.tez.dag.app.ServicePluginLifecycleAbstractService; -import org.apache.tez.serviceplugins.api.ContainerLaunchRequest; -import org.apache.tez.serviceplugins.api.ContainerLauncher; -import org.apache.tez.serviceplugins.api.ContainerLauncherContext; -import org.apache.tez.serviceplugins.api.ContainerStopRequest; -import org.apache.tez.dag.api.TezUncheckedException; -import org.apache.tez.dag.app.AppContext; -import org.apache.tez.dag.app.ContainerLauncherContextImpl; -import org.apache.tez.dag.app.TaskAttemptListener; -import org.apache.tez.dag.app.dag.DAG; -import org.apache.tez.dag.app.rm.NMCommunicatorEvent; -import org.apache.tez.dag.app.rm.NMCommunicatorLaunchRequestEvent; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -public class ContainerLauncherRouter extends AbstractService - implements EventHandler<NMCommunicatorEvent> { - - static final Logger LOG = LoggerFactory.getLogger(ContainerLauncherImpl.class); - - @VisibleForTesting - final ContainerLauncher containerLaunchers[]; - @VisibleForTesting - final ContainerLauncherContext containerLauncherContexts[]; - protected final ServicePluginLifecycleAbstractService[] containerLauncherServiceWrappers; - private final AppContext appContext; - - @VisibleForTesting - public ContainerLauncherRouter(ContainerLauncher containerLauncher, AppContext context) { - super(ContainerLauncherRouter.class.getName()); - this.appContext = context; - containerLaunchers = new ContainerLauncher[] {containerLauncher}; - containerLauncherContexts = new ContainerLauncherContext[] {containerLauncher.getContext()}; - containerLauncherServiceWrappers = new ServicePluginLifecycleAbstractService[]{ - new ServicePluginLifecycleAbstractService<>(containerLauncher)}; - } - - // Accepting conf to setup final parameters, if required. - public ContainerLauncherRouter(AppContext context, - TaskAttemptListener taskAttemptListener, - String workingDirectory, - List<NamedEntityDescriptor> containerLauncherDescriptors, - boolean isPureLocalMode) { - super(ContainerLauncherRouter.class.getName()); - - this.appContext = context; - Preconditions.checkArgument( - containerLauncherDescriptors != null && !containerLauncherDescriptors.isEmpty(), - "ContainerLauncherDescriptors must be specified"); - containerLauncherContexts = new ContainerLauncherContext[containerLauncherDescriptors.size()]; - containerLaunchers = new ContainerLauncher[containerLauncherDescriptors.size()]; - containerLauncherServiceWrappers = new ServicePluginLifecycleAbstractService[containerLauncherDescriptors.size()]; - - - for (int i = 0; i < containerLauncherDescriptors.size(); i++) { - UserPayload userPayload = containerLauncherDescriptors.get(i).getUserPayload(); - ContainerLauncherContext containerLauncherContext = - new ContainerLauncherContextImpl(context, taskAttemptListener, userPayload); - containerLauncherContexts[i] = containerLauncherContext; - containerLaunchers[i] = createContainerLauncher(containerLauncherDescriptors.get(i), context, - containerLauncherContext, taskAttemptListener, workingDirectory, i, isPureLocalMode); - containerLauncherServiceWrappers[i] = new ServicePluginLifecycleAbstractService<>(containerLaunchers[i]); - } - } - - @VisibleForTesting - ContainerLauncher createContainerLauncher( - NamedEntityDescriptor containerLauncherDescriptor, - AppContext context, - ContainerLauncherContext containerLauncherContext, - TaskAttemptListener taskAttemptListener, - String workingDirectory, - int containerLauncherIndex, - boolean isPureLocalMode) { - if (containerLauncherDescriptor.getEntityName().equals( - TezConstants.getTezYarnServicePluginName())) { - return createYarnContainerLauncher(containerLauncherContext); - } else if (containerLauncherDescriptor.getEntityName() - .equals(TezConstants.getTezUberServicePluginName())) { - return createUberContainerLauncher(containerLauncherContext, context, taskAttemptListener, - workingDirectory, isPureLocalMode); - } else { - return createCustomContainerLauncher(containerLauncherContext, containerLauncherDescriptor); - } - } - - @VisibleForTesting - ContainerLauncher createYarnContainerLauncher(ContainerLauncherContext containerLauncherContext) { - LOG.info("Creating DefaultContainerLauncher"); - return new ContainerLauncherImpl(containerLauncherContext); - } - - @VisibleForTesting - ContainerLauncher createUberContainerLauncher(ContainerLauncherContext containerLauncherContext, - AppContext context, - TaskAttemptListener taskAttemptListener, - String workingDirectory, - boolean isPureLocalMode) { - LOG.info("Creating LocalContainerLauncher"); - // TODO Post TEZ-2003. LocalContainerLauncher is special cased, since it makes use of - // extensive internals which are only available at runtime. Will likely require - // some kind of runtime binding of parameters in the payload to work correctly. - try { - return - new LocalContainerLauncher(containerLauncherContext, context, taskAttemptListener, - workingDirectory, isPureLocalMode); - } catch (UnknownHostException e) { - throw new TezUncheckedException(e); - } - } - - @VisibleForTesting - @SuppressWarnings("unchecked") - ContainerLauncher createCustomContainerLauncher(ContainerLauncherContext containerLauncherContext, - NamedEntityDescriptor containerLauncherDescriptor) { - LOG.info("Creating container launcher {}:{} ", containerLauncherDescriptor.getEntityName(), - containerLauncherDescriptor.getClassName()); - Class<? extends ContainerLauncher> containerLauncherClazz = - (Class<? extends ContainerLauncher>) ReflectionUtils.getClazz( - containerLauncherDescriptor.getClassName()); - try { - Constructor<? extends ContainerLauncher> ctor = containerLauncherClazz - .getConstructor(ContainerLauncherContext.class); - return ctor.newInstance(containerLauncherContext); - } catch (NoSuchMethodException | InvocationTargetException | InstantiationException | IllegalAccessException e) { - throw new TezUncheckedException(e); - } - - } - - @Override - public void serviceInit(Configuration conf) { - for (int i = 0 ; i < containerLaunchers.length ; i++) { - containerLauncherServiceWrappers[i].init(conf); - } - } - - @Override - public void serviceStart() { - for (int i = 0 ; i < containerLaunchers.length ; i++) { - containerLauncherServiceWrappers[i].start(); - } - } - - @Override - public void serviceStop() { - for (int i = 0 ; i < containerLaunchers.length ; i++) { - containerLauncherServiceWrappers[i].stop(); - } - } - - public void dagComplete(DAG dag) { - // Nothing required at the moment. Containers are shared across DAGs - } - - public void dagSubmitted() { - // Nothing to do right now. Indicates that a new DAG has been submitted and - // the context has updated information. - } - - - @Override - public void handle(NMCommunicatorEvent event) { - int launcherId = event.getLauncherId(); - String schedulerName = appContext.getTaskSchedulerName(event.getSchedulerId()); - String taskCommName = appContext.getTaskCommunicatorName(event.getTaskCommId()); - switch (event.getType()) { - case CONTAINER_LAUNCH_REQUEST: - NMCommunicatorLaunchRequestEvent launchEvent = (NMCommunicatorLaunchRequestEvent) event; - ContainerLaunchRequest launchRequest = - new ContainerLaunchRequest(launchEvent.getNodeId(), launchEvent.getContainerId(), - launchEvent.getContainerToken(), launchEvent.getContainerLaunchContext(), - launchEvent.getContainer(), schedulerName, - taskCommName); - containerLaunchers[launcherId].launchContainer(launchRequest); - break; - case CONTAINER_STOP_REQUEST: - ContainerStopRequest stopRequest = - new ContainerStopRequest(event.getNodeId(), event.getContainerId(), - event.getContainerToken(), schedulerName, taskCommName); - containerLaunchers[launcherId].stopContainer(stopRequest); - break; - } - } -} http://git-wip-us.apache.org/repos/asf/tez/blob/8b278ea8/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/LocalContainerLauncher.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/LocalContainerLauncher.java b/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/LocalContainerLauncher.java index 1d3e6df..6cd6fce 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/LocalContainerLauncher.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/LocalContainerLauncher.java @@ -63,7 +63,7 @@ import org.apache.tez.dag.api.TezConfiguration; import org.apache.tez.dag.api.TezException; import org.apache.tez.dag.api.TezUncheckedException; import org.apache.tez.dag.app.AppContext; -import org.apache.tez.dag.app.TaskAttemptListener; +import org.apache.tez.dag.app.TaskCommunicatorManagerInterface; import org.apache.tez.dag.app.TezTaskCommunicatorImpl; import org.apache.tez.runtime.api.ExecutionContext; import org.apache.tez.runtime.api.impl.ExecutionContextImpl; @@ -83,7 +83,7 @@ public class LocalContainerLauncher extends ContainerLauncher { private final AppContext context; private final AtomicBoolean serviceStopped = new AtomicBoolean(false); private final String workingDirectory; - private final TaskAttemptListener tal; + private final TaskCommunicatorManagerInterface tal; private final Map<String, String> localEnv; private final ExecutionContext executionContext; private final int numExecutors; @@ -105,7 +105,7 @@ public class LocalContainerLauncher extends ContainerLauncher { public LocalContainerLauncher(ContainerLauncherContext containerLauncherContext, AppContext context, - TaskAttemptListener taskAttemptListener, + TaskCommunicatorManagerInterface taskCommunicatorManagerInterface, String workingDirectory, boolean isPureLocalMode) throws UnknownHostException { // TODO Post TEZ-2003. Most of this information is dynamic and only available after the AM @@ -114,7 +114,7 @@ public class LocalContainerLauncher extends ContainerLauncher { // after the AM starts up. super(containerLauncherContext); this.context = context; - this.tal = taskAttemptListener; + this.tal = taskCommunicatorManagerInterface; this.workingDirectory = workingDirectory; this.isPureLocalMode = isPureLocalMode; if (isPureLocalMode) { http://git-wip-us.apache.org/repos/asf/tez/blob/8b278ea8/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/TezContainerLauncherImpl.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/TezContainerLauncherImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/TezContainerLauncherImpl.java new file mode 100644 index 0000000..3556c51 --- /dev/null +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/TezContainerLauncherImpl.java @@ -0,0 +1,410 @@ +/** +* Licensed to the Apache Software Foundation (ASF) under one +* or more contributor license agreements. See the NOTICE file +* distributed with this work for additional information +* regarding copyright ownership. The ASF licenses this file +* to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance +* with the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License. +*/ + +package org.apache.tez.dag.app.launcher; + +import java.io.IOException; +import java.util.Collections; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.RejectedExecutionHandler; +import java.util.concurrent.ThreadFactory; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; + +import org.apache.commons.lang.exception.ExceptionUtils; +import org.apache.tez.common.TezUtils; +import org.apache.tez.dag.api.TezConstants; +import org.apache.tez.serviceplugins.api.ContainerLaunchRequest; +import org.apache.tez.serviceplugins.api.ContainerLauncher; +import org.apache.tez.serviceplugins.api.ContainerLauncherContext; +import org.apache.tez.serviceplugins.api.ContainerStopRequest; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.CommonConfigurationKeysPublic; +import org.apache.hadoop.yarn.api.protocolrecords.StartContainerRequest; +import org.apache.hadoop.yarn.api.protocolrecords.StartContainersRequest; +import org.apache.hadoop.yarn.api.protocolrecords.StartContainersResponse; +import org.apache.hadoop.yarn.api.protocolrecords.StopContainersRequest; +import org.apache.hadoop.yarn.api.records.ContainerId; +import org.apache.hadoop.yarn.api.records.ContainerLaunchContext; +import org.apache.hadoop.yarn.api.records.Token; +import org.apache.hadoop.yarn.client.api.impl.ContainerManagementProtocolProxy; +import org.apache.hadoop.yarn.client.api.impl.ContainerManagementProtocolProxy.ContainerManagementProtocolProxyData; +import org.apache.hadoop.yarn.util.Records; +import org.apache.tez.dag.api.TezConfiguration; +import org.apache.tez.dag.api.TezUncheckedException; + +import com.google.common.util.concurrent.ThreadFactoryBuilder; + + +// TODO See what part of this lifecycle and state management can be simplified. +// Ideally, no state - only sendStart / sendStop. + +// TODO Review this entire code and clean it up. + +/** + * This class is responsible for launching of containers. + */ +public class TezContainerLauncherImpl extends ContainerLauncher { + + // TODO Ensure the same thread is used to launch / stop the same container. Or - ensure event ordering. + static final Logger LOG = LoggerFactory.getLogger(TezContainerLauncherImpl.class); + + private final ConcurrentHashMap<ContainerId, Container> containers = + new ConcurrentHashMap<>(); + protected ThreadPoolExecutor launcherPool; + protected static final int INITIAL_POOL_SIZE = 10; + private final int limitOnPoolSize; + private final Configuration conf; + private Thread eventHandlingThread; + protected BlockingQueue<ContainerOp> eventQueue = new LinkedBlockingQueue<>(); + private ContainerManagementProtocolProxy cmProxy; + private AtomicBoolean serviceStopped = new AtomicBoolean(false); + + private Container getContainer(ContainerOp event) { + ContainerId id = event.getBaseOperation().getContainerId(); + Container c = containers.get(id); + if(c == null) { + c = new Container(id, + event.getBaseOperation().getNodeId().toString(), event.getBaseOperation().getContainerToken()); + Container old = containers.putIfAbsent(id, c); + if(old != null) { + c = old; + } + } + return c; + } + + private void removeContainerIfDone(ContainerId id) { + Container c = containers.get(id); + if(c != null && c.isCompletelyDone()) { + containers.remove(id); + } + } + + + private static enum ContainerState { + PREP, FAILED, RUNNING, DONE, KILLED_BEFORE_LAUNCH + } + + private class Container { + private ContainerState state; + // store enough information to be able to cleanup the container + private ContainerId containerID; + final private String containerMgrAddress; + private Token containerToken; + + public Container(ContainerId containerID, + String containerMgrAddress, Token containerToken) { + this.state = ContainerState.PREP; + this.containerMgrAddress = containerMgrAddress; + this.containerID = containerID; + this.containerToken = containerToken; + } + + public synchronized boolean isCompletelyDone() { + return state == ContainerState.DONE || state == ContainerState.FAILED; + } + + @SuppressWarnings("unchecked") + public synchronized void launch(ContainerLaunchRequest event) { + LOG.info("Launching Container with Id: " + event.getContainerId()); + if(this.state == ContainerState.KILLED_BEFORE_LAUNCH) { + state = ContainerState.DONE; + sendContainerLaunchFailedMsg(event.getContainerId(), + "Container was killed before it was launched"); + return; + } + + ContainerManagementProtocolProxyData proxy = null; + try { + + proxy = getCMProxy(containerID, containerMgrAddress, + containerToken); + + // Construct the actual Container + ContainerLaunchContext containerLaunchContext = + event.getContainerLaunchContext(); + + // Now launch the actual container + StartContainerRequest startRequest = Records + .newRecord(StartContainerRequest.class); + startRequest.setContainerToken(event.getContainerToken()); + startRequest.setContainerLaunchContext(containerLaunchContext); + + StartContainersResponse response = + proxy.getContainerManagementProtocol().startContainers( + StartContainersRequest.newInstance( + Collections.singletonList(startRequest))); + if (response.getFailedRequests() != null + && !response.getFailedRequests().isEmpty()) { + throw response.getFailedRequests().get(containerID).deSerialize(); + } + + // after launching, send launched event to task attempt to move + // it from ASSIGNED to RUNNING state + getContext().containerLaunched(containerID); + this.state = ContainerState.RUNNING; + } catch (Throwable t) { + String message = "Container launch failed for " + containerID + " : " + + ExceptionUtils.getStackTrace(t); + this.state = ContainerState.FAILED; + sendContainerLaunchFailedMsg(containerID, message); + } finally { + if (proxy != null) { + cmProxy.mayBeCloseProxy(proxy); + } + } + } + + @SuppressWarnings("unchecked") + public synchronized void kill() { + + if(isCompletelyDone()) { + return; + } + if(this.state == ContainerState.PREP) { + this.state = ContainerState.KILLED_BEFORE_LAUNCH; + } else { + LOG.info("Sending a stop request to the NM for ContainerId: " + + containerID); + + ContainerManagementProtocolProxyData proxy = null; + try { + proxy = getCMProxy(this.containerID, this.containerMgrAddress, + this.containerToken); + + // kill the remote container if already launched + StopContainersRequest stopRequest = Records + .newRecord(StopContainersRequest.class); + stopRequest.setContainerIds(Collections.singletonList(containerID)); + + proxy.getContainerManagementProtocol().stopContainers(stopRequest); + + // If stopContainer returns without an error, assuming the stop made + // it over to the NodeManager. + getContext().containerStopRequested(containerID); + } catch (Throwable t) { + + // ignore the cleanup failure + String message = "cleanup failed for container " + + this.containerID + " : " + + ExceptionUtils.getStackTrace(t); + getContext().containerStopFailed(containerID, message); + LOG.warn(message); + this.state = ContainerState.DONE; + return; + } finally { + if (proxy != null) { + cmProxy.mayBeCloseProxy(proxy); + } + } + this.state = ContainerState.DONE; + } + } + } + + public TezContainerLauncherImpl(ContainerLauncherContext containerLauncherContext) { + super(containerLauncherContext); + try { + this.conf = TezUtils.createConfFromUserPayload(containerLauncherContext.getInitialUserPayload()); + } catch (IOException e) { + throw new TezUncheckedException( + "Failed to parse user payload for " + TezContainerLauncherImpl.class.getSimpleName(), e); + } + conf.setInt( + CommonConfigurationKeysPublic.IPC_CLIENT_CONNECTION_MAXIDLETIME_KEY, + 0); + this.limitOnPoolSize = conf.getInt( + TezConfiguration.TEZ_AM_CONTAINERLAUNCHER_THREAD_COUNT_LIMIT, + TezConfiguration.TEZ_AM_CONTAINERLAUNCHER_THREAD_COUNT_LIMIT_DEFAULT); + LOG.info("Upper limit on the thread pool size is " + this.limitOnPoolSize); + } + + @Override + public void start() { + // pass a copy of config to ContainerManagementProtocolProxy until YARN-3497 is fixed + cmProxy = + new ContainerManagementProtocolProxy(conf); + + ThreadFactory tf = new ThreadFactoryBuilder().setNameFormat( + "ContainerLauncher #%d").setDaemon(true).build(); + + // Start with a default core-pool size of 10 and change it dynamically. + launcherPool = new ThreadPoolExecutor(INITIAL_POOL_SIZE, + Integer.MAX_VALUE, 1, TimeUnit.HOURS, + new LinkedBlockingQueue<Runnable>(), + tf, new CustomizedRejectedExecutionHandler()); + eventHandlingThread = new Thread() { + @Override + public void run() { + ContainerOp event = null; + while (!Thread.currentThread().isInterrupted()) { + try { + event = eventQueue.take(); + } catch (InterruptedException e) { + if(!serviceStopped.get()) { + LOG.error("Returning, interrupted : " + e); + } + return; + } + int poolSize = launcherPool.getCorePoolSize(); + + // See if we need up the pool size only if haven't reached the + // maximum limit yet. + if (poolSize != limitOnPoolSize) { + + // nodes where containers will run at *this* point of time. This is + // *not* the cluster size and doesn't need to be. + int numNodes = + getContext().getNumNodes(TezConstants.getTezYarnServicePluginName()); + int idealPoolSize = Math.min(limitOnPoolSize, numNodes); + + if (poolSize < idealPoolSize) { + // Bump up the pool size to idealPoolSize+INITIAL_POOL_SIZE, the + // later is just a buffer so we are not always increasing the + // pool-size + int newPoolSize = Math.min(limitOnPoolSize, idealPoolSize + + INITIAL_POOL_SIZE); + LOG.info("Setting ContainerLauncher pool size to " + newPoolSize + + " as number-of-nodes to talk to is " + numNodes); + launcherPool.setCorePoolSize(newPoolSize); + } + } + + // the events from the queue are handled in parallel + // using a thread pool + launcherPool.execute(createEventProcessor(event)); + + // TODO: Group launching of multiple containers to a single + // NodeManager into a single connection + } + } + }; + eventHandlingThread.setName("ContainerLauncher Event Handler"); + eventHandlingThread.start(); + } + + private void shutdownAllContainers() { + for (Container ct : this.containers.values()) { + if (ct != null) { + ct.kill(); + } + } + } + + @Override + public void shutdown() { + if(!serviceStopped.compareAndSet(false, true)) { + LOG.info("Ignoring multiple stops"); + return; + } + // shutdown any containers that might be left running + shutdownAllContainers(); + if (eventHandlingThread != null) { + eventHandlingThread.interrupt(); + } + if (launcherPool != null) { + launcherPool.shutdownNow(); + } + } + + protected EventProcessor createEventProcessor(ContainerOp event) { + return new EventProcessor(event); + } + + protected ContainerManagementProtocolProxy.ContainerManagementProtocolProxyData getCMProxy( + ContainerId containerID, final String containerManagerBindAddr, + Token containerToken) throws IOException { + return cmProxy.getProxy(containerManagerBindAddr, containerID); + } + + /** + * Setup and start the container on remote nodemanager. + */ + class EventProcessor implements Runnable { + private ContainerOp event; + + EventProcessor(ContainerOp event) { + this.event = event; + } + + @Override + public void run() { + LOG.info("Processing operation {}", event.toString()); + + // Load ContainerManager tokens before creating a connection. + // TODO: Do it only once per NodeManager. + ContainerId containerID = event.getBaseOperation().getContainerId(); + + Container c = getContainer(event); + switch(event.getOpType()) { + case LAUNCH_REQUEST: + ContainerLaunchRequest launchRequest = event.getLaunchRequest(); + c.launch(launchRequest); + break; + case STOP_REQUEST: + c.kill(); + break; + } + removeContainerIfDone(containerID); + } + } + + /** + * ThreadPoolExecutor.submit may fail if you are submitting task + * when ThreadPoolExecutor is shutting down (DAGAppMaster is shutting down). + * Use this CustomizedRejectedExecutionHandler to just logging rather than abort the application. + */ + private static class CustomizedRejectedExecutionHandler implements RejectedExecutionHandler { + @Override + public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) { + LOG.warn("Can't submit task to ThreadPoolExecutor:" + executor); + } + } + + @SuppressWarnings("unchecked") + void sendContainerLaunchFailedMsg(ContainerId containerId, + String message) { + LOG.error(message); + getContext().containerLaunchFailed(containerId, message); + } + + + @Override + public void launchContainer(ContainerLaunchRequest launchRequest) { + try { + eventQueue.put(new ContainerOp(ContainerOp.OPType.LAUNCH_REQUEST, launchRequest)); + } catch (InterruptedException e) { + throw new TezUncheckedException(e); + } + } + + @Override + public void stopContainer(ContainerStopRequest stopRequest) { + try { + eventQueue.put(new ContainerOp(ContainerOp.OPType.STOP_REQUEST, stopRequest)); + } catch (InterruptedException e) { + throw new TezUncheckedException(e); + } + } +} http://git-wip-us.apache.org/repos/asf/tez/blob/8b278ea8/tez-dag/src/main/java/org/apache/tez/dag/app/rm/ContainerLauncherEvent.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/ContainerLauncherEvent.java b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/ContainerLauncherEvent.java new file mode 100644 index 0000000..add3254 --- /dev/null +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/ContainerLauncherEvent.java @@ -0,0 +1,116 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.tez.dag.app.rm; + +import org.apache.hadoop.yarn.api.records.ContainerId; +import org.apache.hadoop.yarn.api.records.NodeId; +import org.apache.hadoop.yarn.api.records.Token; +import org.apache.hadoop.yarn.event.AbstractEvent; + +public class ContainerLauncherEvent extends AbstractEvent<ContainerLauncherEventType> { + + private final ContainerId containerId; + private final NodeId nodeId; + private final Token containerToken; + private final int launcherId; + private final int schedulerId; + private final int taskCommId; + + public ContainerLauncherEvent(ContainerId containerId, NodeId nodeId, + Token containerToken, ContainerLauncherEventType type, + int launcherId, + int schedulerId, int taskCommId) { + super(type); + this.containerId = containerId; + this.nodeId = nodeId; + this.containerToken = containerToken; + this.launcherId = launcherId; + this.schedulerId = schedulerId; + this.taskCommId = taskCommId; + } + + public ContainerId getContainerId() { + return this.containerId; + } + + public NodeId getNodeId() { + return this.nodeId; + } + + public Token getContainerToken() { + return this.containerToken; + } + + public int getLauncherId() { + return launcherId; + } + + public int getSchedulerId() { + return schedulerId; + } + + public int getTaskCommId() { + return taskCommId; + } + + public String toSrting() { + return super.toString() + " for container " + containerId + ", nodeId: " + + nodeId + ", launcherId: " + launcherId + ", schedulerId=" + schedulerId + + ", taskCommId=" + taskCommId; + } + + @Override + public int hashCode() { + final int prime = 31; + int result = 1; + result = prime * result + + ((containerId == null) ? 0 : containerId.hashCode()); + result = prime * result + + ((containerToken == null) ? 0 : containerToken.hashCode()); + result = prime * result + ((nodeId == null) ? 0 : nodeId.hashCode()); + return result; + } + + @Override + public boolean equals(Object obj) { + if (this == obj) + return true; + if (obj == null) + return false; + if (getClass() != obj.getClass()) + return false; + ContainerLauncherEvent other = (ContainerLauncherEvent) obj; + if (containerId == null) { + if (other.containerId != null) + return false; + } else if (!containerId.equals(other.containerId)) + return false; + if (containerToken == null) { + if (other.containerToken != null) + return false; + } else if (!containerToken.equals(other.containerToken)) + return false; + if (nodeId == null) { + if (other.nodeId != null) + return false; + } else if (!nodeId.equals(other.nodeId)) + return false; + return true; + } +} http://git-wip-us.apache.org/repos/asf/tez/blob/8b278ea8/tez-dag/src/main/java/org/apache/tez/dag/app/rm/ContainerLauncherEventType.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/ContainerLauncherEventType.java b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/ContainerLauncherEventType.java new file mode 100644 index 0000000..079509c --- /dev/null +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/ContainerLauncherEventType.java @@ -0,0 +1,25 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.tez.dag.app.rm; + +// TODO - Re-use the events in ContainerLauncher.. +public enum ContainerLauncherEventType { + CONTAINER_LAUNCH_REQUEST, + CONTAINER_STOP_REQUEST +} http://git-wip-us.apache.org/repos/asf/tez/blob/8b278ea8/tez-dag/src/main/java/org/apache/tez/dag/app/rm/ContainerLauncherLaunchRequestEvent.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/ContainerLauncherLaunchRequestEvent.java b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/ContainerLauncherLaunchRequestEvent.java new file mode 100644 index 0000000..411451e --- /dev/null +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/ContainerLauncherLaunchRequestEvent.java @@ -0,0 +1,79 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.tez.dag.app.rm; + +import org.apache.hadoop.yarn.api.records.Container; +import org.apache.hadoop.yarn.api.records.ContainerLaunchContext; + +public class ContainerLauncherLaunchRequestEvent extends ContainerLauncherEvent { + + private final ContainerLaunchContext clc; + private final Container container; + // The task communicator index for the specific container being launched. + + public ContainerLauncherLaunchRequestEvent(ContainerLaunchContext clc, + Container container, int launcherId, int schedulerId, + int taskCommId) { + super(container.getId(), container.getNodeId(), container + .getContainerToken(), ContainerLauncherEventType.CONTAINER_LAUNCH_REQUEST, + launcherId, schedulerId, taskCommId); + this.clc = clc; + this.container = container; + } + + public ContainerLaunchContext getContainerLaunchContext() { + return this.clc; + } + + public Container getContainer() { + return container; + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + if (!super.equals(o)) { + return false; + } + + ContainerLauncherLaunchRequestEvent that = (ContainerLauncherLaunchRequestEvent) o; + + if (clc != null ? !clc.equals(that.clc) : that.clc != null) { + return false; + } + if (container != null ? !container.equals(that.container) : that.container != null) { + return false; + } + + return true; + } + + @Override + public int hashCode() { + int result = super.hashCode(); + result = 7001 * result + (clc != null ? clc.hashCode() : 0); + result = 7001 * result + (container != null ? container.hashCode() : 0); + return result; + } +} http://git-wip-us.apache.org/repos/asf/tez/blob/8b278ea8/tez-dag/src/main/java/org/apache/tez/dag/app/rm/ContainerLauncherStopRequestEvent.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/ContainerLauncherStopRequestEvent.java b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/ContainerLauncherStopRequestEvent.java new file mode 100644 index 0000000..69e7d30 --- /dev/null +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/ContainerLauncherStopRequestEvent.java @@ -0,0 +1,34 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.tez.dag.app.rm; + +import org.apache.hadoop.yarn.api.records.ContainerId; +import org.apache.hadoop.yarn.api.records.NodeId; +import org.apache.hadoop.yarn.api.records.Token; + +public class ContainerLauncherStopRequestEvent extends ContainerLauncherEvent { + + public ContainerLauncherStopRequestEvent(ContainerId containerId, NodeId nodeId, + Token containerToken, int launcherId, int schedulerId, + int taskCommId) { + super(containerId, nodeId, containerToken, + ContainerLauncherEventType.CONTAINER_STOP_REQUEST, launcherId, schedulerId, taskCommId); + } + +} http://git-wip-us.apache.org/repos/asf/tez/blob/8b278ea8/tez-dag/src/main/java/org/apache/tez/dag/app/rm/NMCommunicatorEvent.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/NMCommunicatorEvent.java b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/NMCommunicatorEvent.java deleted file mode 100644 index dc50c37..0000000 --- a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/NMCommunicatorEvent.java +++ /dev/null @@ -1,115 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.tez.dag.app.rm; - -import org.apache.hadoop.yarn.api.records.ContainerId; -import org.apache.hadoop.yarn.api.records.NodeId; -import org.apache.hadoop.yarn.api.records.Token; -import org.apache.hadoop.yarn.event.AbstractEvent; - -public class NMCommunicatorEvent extends AbstractEvent<NMCommunicatorEventType> { - - private final ContainerId containerId; - private final NodeId nodeId; - private final Token containerToken; - private final int launcherId; - private final int schedulerId; - private final int taskCommId; - - public NMCommunicatorEvent(ContainerId containerId, NodeId nodeId, - Token containerToken, NMCommunicatorEventType type, int launcherId, - int schedulerId, int taskCommId) { - super(type); - this.containerId = containerId; - this.nodeId = nodeId; - this.containerToken = containerToken; - this.launcherId = launcherId; - this.schedulerId = schedulerId; - this.taskCommId = taskCommId; - } - - public ContainerId getContainerId() { - return this.containerId; - } - - public NodeId getNodeId() { - return this.nodeId; - } - - public Token getContainerToken() { - return this.containerToken; - } - - public int getLauncherId() { - return launcherId; - } - - public int getSchedulerId() { - return schedulerId; - } - - public int getTaskCommId() { - return taskCommId; - } - - public String toSrting() { - return super.toString() + " for container " + containerId + ", nodeId: " - + nodeId + ", launcherId: " + launcherId + ", schedulerId=" + schedulerId + - ", taskCommId=" + taskCommId; - } - - @Override - public int hashCode() { - final int prime = 31; - int result = 1; - result = prime * result - + ((containerId == null) ? 0 : containerId.hashCode()); - result = prime * result - + ((containerToken == null) ? 0 : containerToken.hashCode()); - result = prime * result + ((nodeId == null) ? 0 : nodeId.hashCode()); - return result; - } - - @Override - public boolean equals(Object obj) { - if (this == obj) - return true; - if (obj == null) - return false; - if (getClass() != obj.getClass()) - return false; - NMCommunicatorEvent other = (NMCommunicatorEvent) obj; - if (containerId == null) { - if (other.containerId != null) - return false; - } else if (!containerId.equals(other.containerId)) - return false; - if (containerToken == null) { - if (other.containerToken != null) - return false; - } else if (!containerToken.equals(other.containerToken)) - return false; - if (nodeId == null) { - if (other.nodeId != null) - return false; - } else if (!nodeId.equals(other.nodeId)) - return false; - return true; - } -} http://git-wip-us.apache.org/repos/asf/tez/blob/8b278ea8/tez-dag/src/main/java/org/apache/tez/dag/app/rm/NMCommunicatorEventType.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/NMCommunicatorEventType.java b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/NMCommunicatorEventType.java deleted file mode 100644 index 9f3d989..0000000 --- a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/NMCommunicatorEventType.java +++ /dev/null @@ -1,25 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.tez.dag.app.rm; - -// TODO - Re-use the events in ContainerLauncher.. -public enum NMCommunicatorEventType { - CONTAINER_LAUNCH_REQUEST, - CONTAINER_STOP_REQUEST -} http://git-wip-us.apache.org/repos/asf/tez/blob/8b278ea8/tez-dag/src/main/java/org/apache/tez/dag/app/rm/NMCommunicatorLaunchRequestEvent.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/NMCommunicatorLaunchRequestEvent.java b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/NMCommunicatorLaunchRequestEvent.java deleted file mode 100644 index c57b6be..0000000 --- a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/NMCommunicatorLaunchRequestEvent.java +++ /dev/null @@ -1,78 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.tez.dag.app.rm; - -import org.apache.hadoop.yarn.api.records.Container; -import org.apache.hadoop.yarn.api.records.ContainerLaunchContext; - -public class NMCommunicatorLaunchRequestEvent extends NMCommunicatorEvent { - - private final ContainerLaunchContext clc; - private final Container container; - // The task communicator index for the specific container being launched. - - public NMCommunicatorLaunchRequestEvent(ContainerLaunchContext clc, - Container container, int launcherId, int schedulerId, int taskCommId) { - super(container.getId(), container.getNodeId(), container - .getContainerToken(), NMCommunicatorEventType.CONTAINER_LAUNCH_REQUEST, - launcherId, schedulerId, taskCommId); - this.clc = clc; - this.container = container; - } - - public ContainerLaunchContext getContainerLaunchContext() { - return this.clc; - } - - public Container getContainer() { - return container; - } - - @Override - public boolean equals(Object o) { - if (this == o) { - return true; - } - if (o == null || getClass() != o.getClass()) { - return false; - } - if (!super.equals(o)) { - return false; - } - - NMCommunicatorLaunchRequestEvent that = (NMCommunicatorLaunchRequestEvent) o; - - if (clc != null ? !clc.equals(that.clc) : that.clc != null) { - return false; - } - if (container != null ? !container.equals(that.container) : that.container != null) { - return false; - } - - return true; - } - - @Override - public int hashCode() { - int result = super.hashCode(); - result = 7001 * result + (clc != null ? clc.hashCode() : 0); - result = 7001 * result + (container != null ? container.hashCode() : 0); - return result; - } -} http://git-wip-us.apache.org/repos/asf/tez/blob/8b278ea8/tez-dag/src/main/java/org/apache/tez/dag/app/rm/NMCommunicatorStopRequestEvent.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/NMCommunicatorStopRequestEvent.java b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/NMCommunicatorStopRequestEvent.java deleted file mode 100644 index 352f450..0000000 --- a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/NMCommunicatorStopRequestEvent.java +++ /dev/null @@ -1,33 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.tez.dag.app.rm; - -import org.apache.hadoop.yarn.api.records.ContainerId; -import org.apache.hadoop.yarn.api.records.NodeId; -import org.apache.hadoop.yarn.api.records.Token; - -public class NMCommunicatorStopRequestEvent extends NMCommunicatorEvent { - - public NMCommunicatorStopRequestEvent(ContainerId containerId, NodeId nodeId, - Token containerToken, int launcherId, int schedulerId, int taskCommId) { - super(containerId, nodeId, containerToken, - NMCommunicatorEventType.CONTAINER_STOP_REQUEST, launcherId, schedulerId, taskCommId); - } - -} http://git-wip-us.apache.org/repos/asf/tez/blob/8b278ea8/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerContextImpl.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerContextImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerContextImpl.java index 2a9797f..37aa96b 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerContextImpl.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerContextImpl.java @@ -33,7 +33,7 @@ import org.apache.tez.serviceplugins.api.TaskSchedulerContext; public class TaskSchedulerContextImpl implements TaskSchedulerContext { - private final TaskSchedulerEventHandler tseh; + private final TaskSchedulerManager taskSchedulerManager; private final AppContext appContext; private final int schedulerId; private final String trackingUrl; @@ -42,11 +42,11 @@ public class TaskSchedulerContextImpl implements TaskSchedulerContext { private final int clientPort; private final UserPayload initialUserPayload; - public TaskSchedulerContextImpl(TaskSchedulerEventHandler tseh, AppContext appContext, + public TaskSchedulerContextImpl(TaskSchedulerManager taskSchedulerManager, AppContext appContext, int schedulerId, String trackingUrl, long customClusterIdentifier, String appHostname, int clientPort, UserPayload initialUserPayload) { - this.tseh = tseh; + this.taskSchedulerManager = taskSchedulerManager; this.appContext = appContext; this.schedulerId = schedulerId; this.trackingUrl = trackingUrl; @@ -62,54 +62,55 @@ public class TaskSchedulerContextImpl implements TaskSchedulerContext { // taskAllocated() upcall and deallocateTask() downcall @Override public void taskAllocated(Object task, Object appCookie, Container container) { - tseh.taskAllocated(schedulerId, task, appCookie, container); + taskSchedulerManager.taskAllocated(schedulerId, task, appCookie, container); } @Override public void containerCompleted(Object taskLastAllocated, ContainerStatus containerStatus) { - tseh.containerCompleted(schedulerId, taskLastAllocated, containerStatus); + taskSchedulerManager.containerCompleted(schedulerId, taskLastAllocated, containerStatus); } @Override public void containerBeingReleased(ContainerId containerId) { - tseh.containerBeingReleased(schedulerId, containerId); + taskSchedulerManager.containerBeingReleased(schedulerId, containerId); } @Override public void nodesUpdated(List<NodeReport> updatedNodes) { - tseh.nodesUpdated(schedulerId, updatedNodes); + taskSchedulerManager.nodesUpdated(schedulerId, updatedNodes); } @Override public void appShutdownRequested() { - tseh.appShutdownRequested(schedulerId); + taskSchedulerManager.appShutdownRequested(schedulerId); } @Override public void setApplicationRegistrationData(Resource maxContainerCapability, Map<ApplicationAccessType, String> appAcls, ByteBuffer clientAMSecretKey) { - tseh.setApplicationRegistrationData(schedulerId, maxContainerCapability, appAcls, clientAMSecretKey); + taskSchedulerManager.setApplicationRegistrationData(schedulerId, maxContainerCapability, appAcls, + clientAMSecretKey); } @Override public void onError(Throwable t) { - tseh.onError(schedulerId, t); + taskSchedulerManager.onError(schedulerId, t); } @Override public float getProgress() { - return tseh.getProgress(schedulerId); + return taskSchedulerManager.getProgress(schedulerId); } @Override public void preemptContainer(ContainerId containerId) { - tseh.preemptContainer(schedulerId, containerId); + taskSchedulerManager.preemptContainer(schedulerId, containerId); } @Override public AppFinalStatus getFinalAppStatus() { - return tseh.getFinalAppStatus(); + return taskSchedulerManager.getFinalAppStatus(); } @Override @@ -130,7 +131,7 @@ public class TaskSchedulerContextImpl implements TaskSchedulerContext { @Override public ContainerSignatureMatcher getContainerSignatureMatcher() { - return tseh.getContainerSignatureMatcher(); + return taskSchedulerManager.getContainerSignatureMatcher(); } @Override
