Repository: incubator-myriad Updated Branches: refs/heads/master b56565046 -> 79ba4a5f0
[MYRIAD-153] Tasks not finishing when FGS is enabled. The root cause is that when a container is preempted by the Resource Manager, it can go directly from ACQUIRED to RELEASED. In that case the Mesos task has already been started, but because the container never reached the RUNNING state, ContainerManagerImpl never calls MyriadAuxService.initializeContainer or MyriadAuxService.stopContainer, so the Mesos task never finishes. The solution is to intercept the scheduler's releaseContainers and completedContainer methods (in MyriadFairScheduler) to kill any yarn_task that is preempted this way, and to immediately adjust the node's resources so that another container doesn't start expecting these resources. JIRA: [MYRIAD-153] https://issues.apache.org/jira/browse/MYRIAD-153 Pull Request: Closes #59 Project: http://git-wip-us.apache.org/repos/asf/incubator-myriad/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-myriad/commit/79ba4a5f Tree: http://git-wip-us.apache.org/repos/asf/incubator-myriad/tree/79ba4a5f Diff: http://git-wip-us.apache.org/repos/asf/incubator-myriad/diff/79ba4a5f Branch: refs/heads/master Commit: 79ba4a5f09c0bb59492a50bb8b310543fb372ead Parents: b565650 Author: DarinJ <dar...@apache.org> Authored: Mon Feb 22 23:05:29 2016 -0500 Committer: darinj <darinj.w...@gmail.com> Committed: Thu Mar 10 00:45:24 2016 -0500 ---------------------------------------------------------------------- .../apache/myriad/executor/MyriadExecutor.java | 19 ++++-- .../executor/MyriadExecutorAuxService.java | 1 + .../apache/myriad/scheduler/MyriadDriver.java | 1 - .../apache/myriad/scheduler/ResourceUtils.java | 33 ++++++++++ .../scheduler/fgs/YarnNodeCapacityManager.java | 64 +++++++++++++++++--- .../scheduler/yarn/MyriadFairScheduler.java | 20 ++++++ .../yarn/interceptor/BaseInterceptor.java | 15 +++++ .../yarn/interceptor/CompositeInterceptor.java | 36 +++++++++++ .../interceptor/YarnSchedulerInterceptor.java | 34 +++++++++-- 9 files changed, 203 insertions(+), 20 deletions(-) ---------------------------------------------------------------------- 
http://git-wip-us.apache.org/repos/asf/incubator-myriad/blob/79ba4a5f/myriad-executor/src/main/java/org/apache/myriad/executor/MyriadExecutor.java ---------------------------------------------------------------------- diff --git a/myriad-executor/src/main/java/org/apache/myriad/executor/MyriadExecutor.java b/myriad-executor/src/main/java/org/apache/myriad/executor/MyriadExecutor.java index 8aa580c..4f46c10 100644 --- a/myriad-executor/src/main/java/org/apache/myriad/executor/MyriadExecutor.java +++ b/myriad-executor/src/main/java/org/apache/myriad/executor/MyriadExecutor.java @@ -70,10 +70,11 @@ public class MyriadExecutor implements Executor { @Override public void killTask(ExecutorDriver driver, TaskID taskId) { - LOGGER.debug("killTask received for taskId: " + taskId.getValue()); + String taskIdString = taskId.toString(); + LOGGER.debug("killTask received for taskId: " + taskIdString); TaskStatus status; - if (!taskId.toString().contains(MyriadExecutorAuxService.YARN_CONTAINER_TASK_ID_PREFIX)) { + if (!taskIdString.contains(MyriadExecutorAuxService.YARN_CONTAINER_TASK_ID_PREFIX)) { // Inform mesos of killing all tasks corresponding to yarn containers that are // currently running synchronized (containerIds) { @@ -88,12 +89,20 @@ public class MyriadExecutor implements Executor { // Now kill the node manager task status = TaskStatus.newBuilder().setTaskId(taskId).setState(TaskState.TASK_KILLED).build(); driver.sendStatusUpdate(status); - LOGGER.info("NodeManager shutdown after receiving" + - " KillTask for taskId " + taskId.getValue()); + LOGGER.info("NodeManager shutdown after receiving KILL_TASK for taskId {}", taskIdString); Runtime.getRuntime().exit(0); } else { - LOGGER.debug("Cannot delete tasks corresponding to yarn container " + taskId); + status = TaskStatus.newBuilder().setTaskId(taskId).setState(TaskState.TASK_KILLED).build(); + driver.sendStatusUpdate(status); + synchronized (containerIds) { + //Likely the container isn't in here, but just in case 
remove it. + if (containerIds.remove(taskIdString.substring(MyriadExecutorAuxService.YARN_CONTAINER_FULL_PREFIX.length(), + taskIdString.length()))) { + LOGGER.debug("Removed taskId {} from containerIds", taskIdString); + } + } + LOGGER.debug("Killing " + taskId); } } http://git-wip-us.apache.org/repos/asf/incubator-myriad/blob/79ba4a5f/myriad-executor/src/main/java/org/apache/myriad/executor/MyriadExecutorAuxService.java ---------------------------------------------------------------------- diff --git a/myriad-executor/src/main/java/org/apache/myriad/executor/MyriadExecutorAuxService.java b/myriad-executor/src/main/java/org/apache/myriad/executor/MyriadExecutorAuxService.java index cca81b9..2d8ab78 100644 --- a/myriad-executor/src/main/java/org/apache/myriad/executor/MyriadExecutorAuxService.java +++ b/myriad-executor/src/main/java/org/apache/myriad/executor/MyriadExecutorAuxService.java @@ -44,6 +44,7 @@ public class MyriadExecutorAuxService extends AuxiliaryService { private static final Logger LOGGER = LoggerFactory.getLogger(MyriadExecutor.class); private static final String SERVICE_NAME = "myriad_service"; public static final String YARN_CONTAINER_TASK_ID_PREFIX = "yarn_"; + public static final String YARN_CONTAINER_FULL_PREFIX = "yarn_task_"; private MesosExecutorDriver driver; private Thread myriadExecutorThread; http://git-wip-us.apache.org/repos/asf/incubator-myriad/blob/79ba4a5f/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/MyriadDriver.java ---------------------------------------------------------------------- diff --git a/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/MyriadDriver.java b/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/MyriadDriver.java index 8ff10e3..014516d 100644 --- a/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/MyriadDriver.java +++ b/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/MyriadDriver.java @@ -53,7 +53,6 @@ public class MyriadDriver { } public Status kill(final 
TaskID taskId) { - LOGGER.info("Killing task {}", taskId); Status status = driver.killTask(taskId); LOGGER.info("Task {} killed with status: {}", taskId, status); return status; http://git-wip-us.apache.org/repos/asf/incubator-myriad/blob/79ba4a5f/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/ResourceUtils.java ---------------------------------------------------------------------- diff --git a/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/ResourceUtils.java b/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/ResourceUtils.java new file mode 100644 index 0000000..13f93fe --- /dev/null +++ b/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/ResourceUtils.java @@ -0,0 +1,33 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * <p/> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p/> + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.myriad.scheduler; + +import org.apache.hadoop.yarn.api.records.Resource; +/** + * Small class of Yarn resource utils. 
Some methods may be redundant with methods in + * org.apache.hadoop.yarn.util.resource.Resources as of 2.7.0 but are here for backwards compatibilty + * with 2.6.0 + */ +public class ResourceUtils { + public static Resource componentwiseMax(Resource lhs, Resource rhs) { + int cores = Math.max(lhs.getVirtualCores(), rhs.getVirtualCores()); + int mem = Math.max(lhs.getMemory(), rhs.getMemory()); + return Resource.newInstance(cores, mem); + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-myriad/blob/79ba4a5f/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/fgs/YarnNodeCapacityManager.java ---------------------------------------------------------------------- diff --git a/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/fgs/YarnNodeCapacityManager.java b/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/fgs/YarnNodeCapacityManager.java index 15f4b47..1a5d185 100644 --- a/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/fgs/YarnNodeCapacityManager.java +++ b/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/fgs/YarnNodeCapacityManager.java @@ -25,14 +25,19 @@ import java.util.HashSet; import java.util.List; import java.util.Set; import javax.inject.Inject; + import org.apache.hadoop.yarn.api.records.Container; +import org.apache.hadoop.yarn.api.records.ContainerId; +import org.apache.hadoop.yarn.api.records.ContainerStatus; import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.api.records.ResourceOption; import org.apache.hadoop.yarn.server.resourcemanager.RMContext; import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer; +import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerEventType; import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.AbstractYarnScheduler; +import 
org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeAddedSchedulerEvent; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeResourceUpdateSchedulerEvent; @@ -43,6 +48,7 @@ import org.apache.mesos.Protos; import org.apache.myriad.configuration.NodeManagerConfiguration; import org.apache.myriad.executor.ContainerTaskStatusRequest; import org.apache.myriad.scheduler.MyriadDriver; +import org.apache.myriad.scheduler.ResourceUtils; import org.apache.myriad.scheduler.SchedulerUtils; import org.apache.myriad.scheduler.TaskUtils; import org.apache.myriad.scheduler.yarn.interceptor.BaseInterceptor; @@ -64,14 +70,15 @@ import org.slf4j.LoggerFactory; */ public class YarnNodeCapacityManager extends BaseInterceptor { private static final Logger LOGGER = LoggerFactory.getLogger(YarnNodeCapacityManager.class); - private final AbstractYarnScheduler yarnScheduler; private final RMContext rmContext; private final MyriadDriver myriadDriver; private final OfferLifecycleManager offerLifecycleMgr; private final NodeStore nodeStore; private final SchedulerState state; + private static final Resource ZERO_RESOURCE = Resource.newInstance(0, 0); private TaskUtils taskUtils; + @Inject public YarnNodeCapacityManager(InterceptorRegistry registry, AbstractYarnScheduler yarnScheduler, RMContext rmContext, MyriadDriver myriadDriver, OfferLifecycleManager offerLifecycleMgr, NodeStore nodeStore, @@ -98,6 +105,44 @@ public class YarnNodeCapacityManager extends BaseInterceptor { }; } + private Protos.TaskID containerToTaskId(RMContainer container) { + return Protos.TaskID.newBuilder().setValue("yarn_" + container.getContainerId()).build(); + } + + @Override + public void beforeReleaseContainers(List<ContainerId> containerIds, SchedulerApplicationAttempt attempt) { + //NOOP beforeCompletedContainer 
does this + } + + @Override + public void beforeCompletedContainer(RMContainer rmContainer, ContainerStatus containerStatus, RMContainerEventType type) { + if (type.equals(RMContainerEventType.KILL) || type.equals(RMContainerEventType.RELEASED)) { + LOGGER.info("{} completed with exit status {}, killing cooresponding mesos task.", rmContainer.getContainerId().toString(), type); + removeYarnTask(rmContainer); + } + } + + private synchronized void removeYarnTask(RMContainer rmContainer) { + if (rmContainer != null && rmContainer.getContainer() != null) { + Protos.TaskID taskId = containerToTaskId(rmContainer); + //TODO (darinj) Reliable messaging + state.makeTaskKillable(taskId); + myriadDriver.kill(taskId); + String hostname = rmContainer.getContainer().getNodeId().getHost(); + Node node = nodeStore.getNode(hostname); + if (node != null) { + RMNode rmNode = node.getNode().getRMNode(); + Resource resource = rmContainer.getContainer().getResource(); + Resource diff = ResourceUtils.componentwiseMax(ZERO_RESOURCE, Resources.subtract(rmNode.getTotalCapability(), resource)); + setNodeCapacity(rmNode, diff); + LOGGER.info("Removed task yarn_{} with exit status freeing {} cpu and {} mem.", rmContainer.getContainer().toString(), + rmContainer.getContainerExitStatus(), resource.getVirtualCores(), resource.getMemory()); + } else { + LOGGER.warn(hostname + " not found"); + } + } + } + @Override public void afterSchedulerEventHandled(SchedulerEvent event) { switch (event.getType()) { @@ -196,13 +241,16 @@ public class YarnNodeCapacityManager extends BaseInterceptor { */ @SuppressWarnings("unchecked") public void setNodeCapacity(RMNode rmNode, Resource newCapacity) { - rmNode.getTotalCapability().setMemory(newCapacity.getMemory()); - rmNode.getTotalCapability().setVirtualCores(newCapacity.getVirtualCores()); - LOGGER.debug("Setting capacity for node {} to {}", rmNode.getHostName(), newCapacity); - // updates the scheduler with the new capacity for the NM. 
- // the event is handled by the scheduler asynchronously - rmContext.getDispatcher().getEventHandler().handle(new NodeResourceUpdateSchedulerEvent(rmNode, ResourceOption.newInstance( - rmNode.getTotalCapability(), RMNode.OVER_COMMIT_TIMEOUT_MILLIS_DEFAULT))); + //NOOP prevent YARN warning changing to same size + if (!Resources.equals(rmNode.getTotalCapability(), newCapacity)) { + rmNode.getTotalCapability().setMemory(newCapacity.getMemory()); + rmNode.getTotalCapability().setVirtualCores(newCapacity.getVirtualCores()); + LOGGER.debug("Setting capacity for node {} to {}", rmNode.getHostName(), newCapacity); + // updates the scheduler with the new capacity for the NM. + // the event is handled by the scheduler asynchronously + rmContext.getDispatcher().getEventHandler().handle(new NodeResourceUpdateSchedulerEvent(rmNode, ResourceOption.newInstance( + rmNode.getTotalCapability(), RMNode.OVER_COMMIT_TIMEOUT_MILLIS_DEFAULT))); + } } private Protos.TaskInfo getTaskInfoForContainer(RMContainer rmContainer, ConsumedOffer consumedOffer, Node node) { http://git-wip-us.apache.org/repos/asf/incubator-myriad/blob/79ba4a5f/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/yarn/MyriadFairScheduler.java ---------------------------------------------------------------------- diff --git a/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/yarn/MyriadFairScheduler.java b/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/yarn/MyriadFairScheduler.java index a4b2056..9069c1a 100644 --- a/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/yarn/MyriadFairScheduler.java +++ b/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/yarn/MyriadFairScheduler.java @@ -19,14 +19,21 @@ package org.apache.myriad.scheduler.yarn; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.yarn.api.records.ContainerId; +import org.apache.hadoop.yarn.api.records.ContainerStatus; import org.apache.hadoop.yarn.server.resourcemanager.RMContext; +import 
org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer; +import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerEventType; import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeEvent; import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeEventType; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEvent; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FairScheduler; import org.apache.myriad.scheduler.yarn.interceptor.CompositeInterceptor; import org.apache.myriad.scheduler.yarn.interceptor.YarnSchedulerInterceptor; +import java.util.List; + /** * {@link MyriadFairScheduler} just extends YARN's {@link FairScheduler} and * allows some of the {@link FairScheduler} methods to be intercepted @@ -65,6 +72,19 @@ public class MyriadFairScheduler extends FairScheduler { */ @Override + protected void releaseContainers(List<ContainerId> containers, SchedulerApplicationAttempt attempt) { + yarnSchedulerInterceptor.beforeReleaseContainers(containers, attempt); + super.releaseContainers(containers, attempt); + } + + @Override + public void completedContainer(RMContainer rmContainer, ContainerStatus containerStatus, RMContainerEventType event) { + yarnSchedulerInterceptor.beforeCompletedContainer(rmContainer, containerStatus, event); + super.completedContainer(rmContainer, containerStatus, event); + } + + + @Override public synchronized void serviceInit(Configuration conf) throws Exception { this.conf = conf; super.serviceInit(conf); http://git-wip-us.apache.org/repos/asf/incubator-myriad/blob/79ba4a5f/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/yarn/interceptor/BaseInterceptor.java ---------------------------------------------------------------------- diff --git a/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/yarn/interceptor/BaseInterceptor.java 
b/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/yarn/interceptor/BaseInterceptor.java index 50b5b03..64e9158 100644 --- a/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/yarn/interceptor/BaseInterceptor.java +++ b/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/yarn/interceptor/BaseInterceptor.java @@ -19,11 +19,18 @@ package org.apache.myriad.scheduler.yarn.interceptor; import java.io.IOException; +import java.util.List; + import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.yarn.api.records.ContainerId; +import org.apache.hadoop.yarn.api.records.ContainerStatus; import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.server.resourcemanager.RMContext; +import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer; +import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerEventType; import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeEvent; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.AbstractYarnScheduler; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEvent; /** @@ -47,6 +54,14 @@ public class BaseInterceptor implements YarnSchedulerInterceptor { } @Override + public void beforeReleaseContainers(List<ContainerId> containers, SchedulerApplicationAttempt attempt){ + } + + @Override + public void beforeCompletedContainer(RMContainer rmContainer, ContainerStatus containerStatus, RMContainerEventType event) { + } + + @Override public void init(Configuration conf, AbstractYarnScheduler yarnScheduler, RMContext rmContext) throws IOException { } http://git-wip-us.apache.org/repos/asf/incubator-myriad/blob/79ba4a5f/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/yarn/interceptor/CompositeInterceptor.java ---------------------------------------------------------------------- diff --git 
a/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/yarn/interceptor/CompositeInterceptor.java b/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/yarn/interceptor/CompositeInterceptor.java index 6ac7af7..6c05d9b 100644 --- a/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/yarn/interceptor/CompositeInterceptor.java +++ b/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/yarn/interceptor/CompositeInterceptor.java @@ -21,12 +21,19 @@ package org.apache.myriad.scheduler.yarn.interceptor; import com.google.common.annotations.VisibleForTesting; import com.google.common.collect.Maps; import java.io.IOException; +import java.util.ArrayList; +import java.util.List; import java.util.Map; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.yarn.api.records.ContainerId; +import org.apache.hadoop.yarn.api.records.ContainerStatus; import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.server.resourcemanager.RMContext; +import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer; +import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerEventType; import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeEvent; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.AbstractYarnScheduler; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeAddedSchedulerEvent; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeRemovedSchedulerEvent; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeResourceUpdateSchedulerEvent; @@ -77,6 +84,35 @@ public class CompositeInterceptor implements YarnSchedulerInterceptor, Intercept }; } + @Override + public void beforeReleaseContainers(List<ContainerId> containers, SchedulerApplicationAttempt attempt){ + if (containers != null && attempt != null) { + for 
(YarnSchedulerInterceptor interceptor : interceptors.values()) { + List<ContainerId> filteredContainers = new ArrayList<>(); + for (ContainerId containerId: containers) { + NodeId nodeId = attempt.getRMContainer(containerId).getContainer().getNodeId(); + if ((nodeId != null && interceptor.getCallBackFilter().allowCallBacksForNode(nodeId))) { + filteredContainers.add(containerId); + } + } + if (!filteredContainers.isEmpty()) { + interceptor.beforeReleaseContainers(filteredContainers, attempt); + } + } + } + } + @Override + public void beforeCompletedContainer(RMContainer rmContainer, ContainerStatus containerStatus, RMContainerEventType event) { + if (rmContainer != null && rmContainer.getContainer() != null) { + NodeId nodeId = rmContainer.getContainer().getNodeId(); + for (YarnSchedulerInterceptor interceptor : interceptors.values()) { + if (interceptor.getCallBackFilter().allowCallBacksForNode(nodeId)) { + interceptor.beforeCompletedContainer(rmContainer, containerStatus, event); + } + } + } + } + /** * Allows myriad to be initialized via {@link #myriadInitInterceptor}. 
After myriad is initialized, * other interceptors will later register with this class via http://git-wip-us.apache.org/repos/asf/incubator-myriad/blob/79ba4a5f/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/yarn/interceptor/YarnSchedulerInterceptor.java ---------------------------------------------------------------------- diff --git a/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/yarn/interceptor/YarnSchedulerInterceptor.java b/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/yarn/interceptor/YarnSchedulerInterceptor.java index 2cda0d3..71f1a3d 100644 --- a/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/yarn/interceptor/YarnSchedulerInterceptor.java +++ b/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/yarn/interceptor/YarnSchedulerInterceptor.java @@ -19,12 +19,19 @@ package org.apache.myriad.scheduler.yarn.interceptor; import java.io.IOException; +import java.util.List; + import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.yarn.api.records.ContainerId; +import org.apache.hadoop.yarn.api.records.ContainerStatus; import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.server.resourcemanager.RMContext; +import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer; +import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerEventType; import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeEvent; import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeImpl; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.AbstractYarnScheduler; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.YarnScheduler; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEvent; @@ -57,13 +64,28 @@ public interface YarnSchedulerInterceptor { public CallBackFilter getCallBackFilter(); /** - * Invoked 
*before* {@link AbstractYarnScheduler#reinitialize(Configuration, RMContext)} - * - * @param conf - * @param yarnScheduler - * @param rmContext - * @throws IOException + * Invoked *before* {@link org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FairScheduler#releaseContainers(List, + * SchedulerApplicationAttempt)} + * only if {@link CallBackFilter#allowCallBacksForNode(NodeId)} returns true. + */ + + public void beforeReleaseContainers(List<ContainerId> containers, SchedulerApplicationAttempt attempt); + + /** + * Invoked *before* {@link org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FairScheduler#completedContainer(RMContainer, + * ContainerStatus, RMContainerEventType)} + * only if {@link CallBackFilter#allowCallBacksForNode(NodeId)} returns true. */ + public void beforeCompletedContainer(RMContainer rmContainer, ContainerStatus containerStatus, RMContainerEventType event); + + /** + * Invoked *before* {@link AbstractYarnScheduler#reinitialize(Configuration, RMContext)} + * + * @param conf + * @param yarnScheduler + * @param rmContext + * @throws IOException + */ public void init(Configuration conf, AbstractYarnScheduler yarnScheduler, RMContext rmContext) throws IOException; /**