Repository: incubator-myriad
Updated Branches:
  refs/heads/master b56565046 -> 79ba4a5f0


[MYRIAD-153] tasks not finishing when FGS is enabled.

The root cause is that when a container is preempted by the Resource Manager, 
it can go from AQUIRED to RELEASED in which case the Mesos task would be 
started, however since the container never got to state RUNNING the 
ContainerManagerImpl never called MyriadAuxService.initializeContainer or 
MyriadAuxService.stopContainer the solution is to intercept the method 
releaseContainers in AbstractScheduler to kill any yarn_task which is preempted 
this way and immediately adjust the resources of the node so another container 
doesn't start expecting these resources.

JIRA:
  [MYRIAD-153] https://issues.apache.org/jira/browse/MYRIAD-153

Pull Request:
  Closes #59


Project: http://git-wip-us.apache.org/repos/asf/incubator-myriad/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-myriad/commit/79ba4a5f
Tree: http://git-wip-us.apache.org/repos/asf/incubator-myriad/tree/79ba4a5f
Diff: http://git-wip-us.apache.org/repos/asf/incubator-myriad/diff/79ba4a5f

Branch: refs/heads/master
Commit: 79ba4a5f09c0bb59492a50bb8b310543fb372ead
Parents: b565650
Author: DarinJ <dar...@apache.org>
Authored: Mon Feb 22 23:05:29 2016 -0500
Committer: darinj <darinj.w...@gmail.com>
Committed: Thu Mar 10 00:45:24 2016 -0500

----------------------------------------------------------------------
 .../apache/myriad/executor/MyriadExecutor.java  | 19 ++++--
 .../executor/MyriadExecutorAuxService.java      |  1 +
 .../apache/myriad/scheduler/MyriadDriver.java   |  1 -
 .../apache/myriad/scheduler/ResourceUtils.java  | 33 ++++++++++
 .../scheduler/fgs/YarnNodeCapacityManager.java  | 64 +++++++++++++++++---
 .../scheduler/yarn/MyriadFairScheduler.java     | 20 ++++++
 .../yarn/interceptor/BaseInterceptor.java       | 15 +++++
 .../yarn/interceptor/CompositeInterceptor.java  | 36 +++++++++++
 .../interceptor/YarnSchedulerInterceptor.java   | 34 +++++++++--
 9 files changed, 203 insertions(+), 20 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-myriad/blob/79ba4a5f/myriad-executor/src/main/java/org/apache/myriad/executor/MyriadExecutor.java
----------------------------------------------------------------------
diff --git 
a/myriad-executor/src/main/java/org/apache/myriad/executor/MyriadExecutor.java 
b/myriad-executor/src/main/java/org/apache/myriad/executor/MyriadExecutor.java
index 8aa580c..4f46c10 100644
--- 
a/myriad-executor/src/main/java/org/apache/myriad/executor/MyriadExecutor.java
+++ 
b/myriad-executor/src/main/java/org/apache/myriad/executor/MyriadExecutor.java
@@ -70,10 +70,11 @@ public class MyriadExecutor implements Executor {
 
   @Override
   public void killTask(ExecutorDriver driver, TaskID taskId) {
-    LOGGER.debug("killTask received for taskId: " + taskId.getValue());
+    String taskIdString = taskId.toString();
+    LOGGER.debug("killTask received for taskId: " + taskIdString);
     TaskStatus status;
 
-    if 
(!taskId.toString().contains(MyriadExecutorAuxService.YARN_CONTAINER_TASK_ID_PREFIX))
 {
+    if 
(!taskIdString.contains(MyriadExecutorAuxService.YARN_CONTAINER_TASK_ID_PREFIX))
 {
       // Inform mesos of killing all tasks corresponding to yarn containers 
that are
       // currently running 
       synchronized (containerIds) {
@@ -88,12 +89,20 @@ public class MyriadExecutor implements Executor {
       // Now kill the node manager task
       status = 
TaskStatus.newBuilder().setTaskId(taskId).setState(TaskState.TASK_KILLED).build();
       driver.sendStatusUpdate(status);
-      LOGGER.info("NodeManager shutdown after receiving" +
-          " KillTask for taskId " + taskId.getValue());
+      LOGGER.info("NodeManager shutdown after receiving KILL_TASK for taskId 
{}", taskIdString);
       Runtime.getRuntime().exit(0);
 
     } else {
-      LOGGER.debug("Cannot delete tasks corresponding to yarn container " + 
taskId);
+      status = 
TaskStatus.newBuilder().setTaskId(taskId).setState(TaskState.TASK_KILLED).build();
+      driver.sendStatusUpdate(status);
+      synchronized (containerIds) {
+        //Likely the container isn't in here, but just in case remove it.
+        if 
(containerIds.remove(taskIdString.substring(MyriadExecutorAuxService.YARN_CONTAINER_FULL_PREFIX.length(),
+            taskIdString.length()))) {
+          LOGGER.debug("Removed taskId {} from containerIds", taskIdString);
+        }
+      }
+      LOGGER.debug("Killing " + taskId);
     }
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-myriad/blob/79ba4a5f/myriad-executor/src/main/java/org/apache/myriad/executor/MyriadExecutorAuxService.java
----------------------------------------------------------------------
diff --git 
a/myriad-executor/src/main/java/org/apache/myriad/executor/MyriadExecutorAuxService.java
 
b/myriad-executor/src/main/java/org/apache/myriad/executor/MyriadExecutorAuxService.java
index cca81b9..2d8ab78 100644
--- 
a/myriad-executor/src/main/java/org/apache/myriad/executor/MyriadExecutorAuxService.java
+++ 
b/myriad-executor/src/main/java/org/apache/myriad/executor/MyriadExecutorAuxService.java
@@ -44,6 +44,7 @@ public class MyriadExecutorAuxService extends 
AuxiliaryService {
   private static final Logger LOGGER = 
LoggerFactory.getLogger(MyriadExecutor.class);
   private static final String SERVICE_NAME = "myriad_service";
   public static final String YARN_CONTAINER_TASK_ID_PREFIX = "yarn_";
+  public static final String YARN_CONTAINER_FULL_PREFIX = "yarn_task_";
 
   private MesosExecutorDriver driver;
   private Thread myriadExecutorThread;

http://git-wip-us.apache.org/repos/asf/incubator-myriad/blob/79ba4a5f/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/MyriadDriver.java
----------------------------------------------------------------------
diff --git 
a/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/MyriadDriver.java 
b/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/MyriadDriver.java
index 8ff10e3..014516d 100644
--- 
a/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/MyriadDriver.java
+++ 
b/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/MyriadDriver.java
@@ -53,7 +53,6 @@ public class MyriadDriver {
   }
 
   public Status kill(final TaskID taskId) {
-    LOGGER.info("Killing task {}", taskId);
     Status status = driver.killTask(taskId);
     LOGGER.info("Task {} killed with status: {}", taskId, status);
     return status;

http://git-wip-us.apache.org/repos/asf/incubator-myriad/blob/79ba4a5f/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/ResourceUtils.java
----------------------------------------------------------------------
diff --git 
a/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/ResourceUtils.java 
b/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/ResourceUtils.java
new file mode 100644
index 0000000..13f93fe
--- /dev/null
+++ 
b/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/ResourceUtils.java
@@ -0,0 +1,33 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.myriad.scheduler;
+
+import org.apache.hadoop.yarn.api.records.Resource;
+/**
+ * Small class of Yarn resource utils.  Some methods may be redundant with 
methods in
+ * org.apache.hadoop.yarn.util.resource.Resources as of 2.7.0 but are here for 
backwards compatibilty
+ * with 2.6.0
+ */
+public class ResourceUtils {
+  public static Resource componentwiseMax(Resource lhs, Resource rhs) {
+    int cores = Math.max(lhs.getVirtualCores(), rhs.getVirtualCores());
+    int mem = Math.max(lhs.getMemory(), rhs.getMemory());
+    return Resource.newInstance(cores, mem);
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-myriad/blob/79ba4a5f/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/fgs/YarnNodeCapacityManager.java
----------------------------------------------------------------------
diff --git 
a/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/fgs/YarnNodeCapacityManager.java
 
b/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/fgs/YarnNodeCapacityManager.java
index 15f4b47..1a5d185 100644
--- 
a/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/fgs/YarnNodeCapacityManager.java
+++ 
b/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/fgs/YarnNodeCapacityManager.java
@@ -25,14 +25,19 @@ import java.util.HashSet;
 import java.util.List;
 import java.util.Set;
 import javax.inject.Inject;
+
 import org.apache.hadoop.yarn.api.records.Container;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.ContainerStatus;
 import org.apache.hadoop.yarn.api.records.NodeId;
 import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.api.records.ResourceOption;
 import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
 import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
+import 
org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerEventType;
 import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
 import 
org.apache.hadoop.yarn.server.resourcemanager.scheduler.AbstractYarnScheduler;
+import 
org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode;
 import 
org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeAddedSchedulerEvent;
 import 
org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeResourceUpdateSchedulerEvent;
@@ -43,6 +48,7 @@ import org.apache.mesos.Protos;
 import org.apache.myriad.configuration.NodeManagerConfiguration;
 import org.apache.myriad.executor.ContainerTaskStatusRequest;
 import org.apache.myriad.scheduler.MyriadDriver;
+import org.apache.myriad.scheduler.ResourceUtils;
 import org.apache.myriad.scheduler.SchedulerUtils;
 import org.apache.myriad.scheduler.TaskUtils;
 import org.apache.myriad.scheduler.yarn.interceptor.BaseInterceptor;
@@ -64,14 +70,15 @@ import org.slf4j.LoggerFactory;
  */
 public class YarnNodeCapacityManager extends BaseInterceptor {
   private static final Logger LOGGER = 
LoggerFactory.getLogger(YarnNodeCapacityManager.class);
-
   private final AbstractYarnScheduler yarnScheduler;
   private final RMContext rmContext;
   private final MyriadDriver myriadDriver;
   private final OfferLifecycleManager offerLifecycleMgr;
   private final NodeStore nodeStore;
   private final SchedulerState state;
+  private static final Resource ZERO_RESOURCE = Resource.newInstance(0, 0);
   private TaskUtils taskUtils;
+
   @Inject
   public YarnNodeCapacityManager(InterceptorRegistry registry, 
AbstractYarnScheduler yarnScheduler, RMContext rmContext,
                                  MyriadDriver myriadDriver, 
OfferLifecycleManager offerLifecycleMgr, NodeStore nodeStore,
@@ -98,6 +105,44 @@ public class YarnNodeCapacityManager extends 
BaseInterceptor {
     };
   }
 
+  private Protos.TaskID containerToTaskId(RMContainer container) {
+    return Protos.TaskID.newBuilder().setValue("yarn_" + 
container.getContainerId()).build();
+  }
+
+  @Override
+  public void beforeReleaseContainers(List<ContainerId> containerIds, 
SchedulerApplicationAttempt attempt) {
+    //NOOP beforeCompletedContainer does this
+  }
+
+  @Override
+  public void beforeCompletedContainer(RMContainer rmContainer, 
ContainerStatus containerStatus, RMContainerEventType type) {
+    if (type.equals(RMContainerEventType.KILL) || 
type.equals(RMContainerEventType.RELEASED)) {
+      LOGGER.info("{} completed with exit status {}, killing cooresponding 
mesos task.", rmContainer.getContainerId().toString(), type);
+      removeYarnTask(rmContainer);
+    }
+  }
+
+  private synchronized void removeYarnTask(RMContainer rmContainer) {
+    if (rmContainer != null && rmContainer.getContainer() != null) {
+      Protos.TaskID taskId = containerToTaskId(rmContainer);
+      //TODO (darinj) Reliable messaging
+      state.makeTaskKillable(taskId);
+      myriadDriver.kill(taskId);
+      String hostname = rmContainer.getContainer().getNodeId().getHost();
+      Node node = nodeStore.getNode(hostname);
+      if (node != null) {
+        RMNode rmNode = node.getNode().getRMNode();
+        Resource resource = rmContainer.getContainer().getResource();
+        Resource diff = ResourceUtils.componentwiseMax(ZERO_RESOURCE, 
Resources.subtract(rmNode.getTotalCapability(), resource));
+        setNodeCapacity(rmNode, diff);
+        LOGGER.info("Removed task yarn_{} with exit status freeing {} cpu and 
{} mem.", rmContainer.getContainer().toString(),
+            rmContainer.getContainerExitStatus(), resource.getVirtualCores(), 
resource.getMemory());
+      } else {
+        LOGGER.warn(hostname + " not found");
+      }
+    }
+  }
+
   @Override
   public void afterSchedulerEventHandled(SchedulerEvent event) {
     switch (event.getType()) {
@@ -196,13 +241,16 @@ public class YarnNodeCapacityManager extends 
BaseInterceptor {
    */
   @SuppressWarnings("unchecked")
   public void setNodeCapacity(RMNode rmNode, Resource newCapacity) {
-    rmNode.getTotalCapability().setMemory(newCapacity.getMemory());
-    rmNode.getTotalCapability().setVirtualCores(newCapacity.getVirtualCores());
-    LOGGER.debug("Setting capacity for node {} to {}", rmNode.getHostName(), 
newCapacity);
-    // updates the scheduler with the new capacity for the NM.
-    // the event is handled by the scheduler asynchronously
-    rmContext.getDispatcher().getEventHandler().handle(new 
NodeResourceUpdateSchedulerEvent(rmNode, ResourceOption.newInstance(
-        rmNode.getTotalCapability(), 
RMNode.OVER_COMMIT_TIMEOUT_MILLIS_DEFAULT)));
+    //NOOP prevent YARN warning changing to same size
+    if (!Resources.equals(rmNode.getTotalCapability(), newCapacity)) {
+      rmNode.getTotalCapability().setMemory(newCapacity.getMemory());
+      
rmNode.getTotalCapability().setVirtualCores(newCapacity.getVirtualCores());
+      LOGGER.debug("Setting capacity for node {} to {}", rmNode.getHostName(), 
newCapacity);
+      // updates the scheduler with the new capacity for the NM.
+      // the event is handled by the scheduler asynchronously
+      rmContext.getDispatcher().getEventHandler().handle(new 
NodeResourceUpdateSchedulerEvent(rmNode, ResourceOption.newInstance(
+          rmNode.getTotalCapability(), 
RMNode.OVER_COMMIT_TIMEOUT_MILLIS_DEFAULT)));
+    }
   }
 
   private Protos.TaskInfo getTaskInfoForContainer(RMContainer rmContainer, 
ConsumedOffer consumedOffer, Node node) {

http://git-wip-us.apache.org/repos/asf/incubator-myriad/blob/79ba4a5f/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/yarn/MyriadFairScheduler.java
----------------------------------------------------------------------
diff --git 
a/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/yarn/MyriadFairScheduler.java
 
b/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/yarn/MyriadFairScheduler.java
index a4b2056..9069c1a 100644
--- 
a/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/yarn/MyriadFairScheduler.java
+++ 
b/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/yarn/MyriadFairScheduler.java
@@ -19,14 +19,21 @@
 package org.apache.myriad.scheduler.yarn;
 
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.ContainerStatus;
 import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
+import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
+import 
org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerEventType;
 import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeEventType;
+import 
org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt;
 import 
org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEvent;
 import 
org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FairScheduler;
 import org.apache.myriad.scheduler.yarn.interceptor.CompositeInterceptor;
 import org.apache.myriad.scheduler.yarn.interceptor.YarnSchedulerInterceptor;
 
+import java.util.List;
+
 /**
  * {@link MyriadFairScheduler} just extends YARN's {@link FairScheduler} and
  * allows some of the {@link FairScheduler} methods to be intercepted
@@ -65,6 +72,19 @@ public class MyriadFairScheduler extends FairScheduler {
    */
 
   @Override
+  protected void releaseContainers(List<ContainerId> containers, 
SchedulerApplicationAttempt attempt) {
+    yarnSchedulerInterceptor.beforeReleaseContainers(containers, attempt);
+    super.releaseContainers(containers, attempt);
+  }
+
+  @Override
+  public void completedContainer(RMContainer rmContainer, ContainerStatus 
containerStatus, RMContainerEventType event) {
+    yarnSchedulerInterceptor.beforeCompletedContainer(rmContainer, 
containerStatus, event);
+    super.completedContainer(rmContainer, containerStatus, event);
+  }
+
+
+  @Override
   public synchronized void serviceInit(Configuration conf) throws Exception {
     this.conf = conf;
     super.serviceInit(conf);

http://git-wip-us.apache.org/repos/asf/incubator-myriad/blob/79ba4a5f/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/yarn/interceptor/BaseInterceptor.java
----------------------------------------------------------------------
diff --git 
a/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/yarn/interceptor/BaseInterceptor.java
 
b/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/yarn/interceptor/BaseInterceptor.java
index 50b5b03..64e9158 100644
--- 
a/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/yarn/interceptor/BaseInterceptor.java
+++ 
b/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/yarn/interceptor/BaseInterceptor.java
@@ -19,11 +19,18 @@
 package org.apache.myriad.scheduler.yarn.interceptor;
 
 import java.io.IOException;
+import java.util.List;
+
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.ContainerStatus;
 import org.apache.hadoop.yarn.api.records.NodeId;
 import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
+import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
+import 
org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerEventType;
 import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeEvent;
 import 
org.apache.hadoop.yarn.server.resourcemanager.scheduler.AbstractYarnScheduler;
+import 
org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt;
 import 
org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEvent;
 
 /**
@@ -47,6 +54,14 @@ public class BaseInterceptor implements 
YarnSchedulerInterceptor {
   }
 
   @Override
+  public void beforeReleaseContainers(List<ContainerId> containers, 
SchedulerApplicationAttempt attempt){
+  }
+
+  @Override
+  public void beforeCompletedContainer(RMContainer rmContainer, 
ContainerStatus containerStatus, RMContainerEventType event) {
+  }
+
+  @Override
   public void init(Configuration conf, AbstractYarnScheduler yarnScheduler, 
RMContext rmContext) throws IOException {
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-myriad/blob/79ba4a5f/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/yarn/interceptor/CompositeInterceptor.java
----------------------------------------------------------------------
diff --git 
a/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/yarn/interceptor/CompositeInterceptor.java
 
b/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/yarn/interceptor/CompositeInterceptor.java
index 6ac7af7..6c05d9b 100644
--- 
a/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/yarn/interceptor/CompositeInterceptor.java
+++ 
b/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/yarn/interceptor/CompositeInterceptor.java
@@ -21,12 +21,19 @@ package org.apache.myriad.scheduler.yarn.interceptor;
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.collect.Maps;
 import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
 import java.util.Map;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.ContainerStatus;
 import org.apache.hadoop.yarn.api.records.NodeId;
 import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
+import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
+import 
org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerEventType;
 import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeEvent;
 import 
org.apache.hadoop.yarn.server.resourcemanager.scheduler.AbstractYarnScheduler;
+import 
org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt;
 import 
org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeAddedSchedulerEvent;
 import 
org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeRemovedSchedulerEvent;
 import 
org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeResourceUpdateSchedulerEvent;
@@ -77,6 +84,35 @@ public class CompositeInterceptor implements 
YarnSchedulerInterceptor, Intercept
     };
   }
 
+  @Override
+  public void beforeReleaseContainers(List<ContainerId> containers, 
SchedulerApplicationAttempt attempt){
+    if (containers != null && attempt != null) {
+      for (YarnSchedulerInterceptor interceptor : interceptors.values()) {
+        List<ContainerId> filteredContainers = new ArrayList<>();
+        for (ContainerId containerId: containers) {
+          NodeId nodeId = 
attempt.getRMContainer(containerId).getContainer().getNodeId();
+          if ((nodeId != null && 
interceptor.getCallBackFilter().allowCallBacksForNode(nodeId))) {
+            filteredContainers.add(containerId);
+          }
+        }
+        if (!filteredContainers.isEmpty()) {
+          interceptor.beforeReleaseContainers(filteredContainers, attempt);
+        }
+      }
+    }
+  }
+  @Override
+  public void beforeCompletedContainer(RMContainer rmContainer, 
ContainerStatus containerStatus, RMContainerEventType event) {
+    if (rmContainer != null && rmContainer.getContainer() != null) {
+      NodeId nodeId = rmContainer.getContainer().getNodeId();
+      for (YarnSchedulerInterceptor interceptor : interceptors.values()) {
+        if (interceptor.getCallBackFilter().allowCallBacksForNode(nodeId)) {
+          interceptor.beforeCompletedContainer(rmContainer, containerStatus, 
event);
+        }
+      }
+    }
+  }
+
   /**
    * Allows myriad to be initialized via {@link #myriadInitInterceptor}. After 
myriad is initialized,
    * other interceptors will later register with this class via

http://git-wip-us.apache.org/repos/asf/incubator-myriad/blob/79ba4a5f/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/yarn/interceptor/YarnSchedulerInterceptor.java
----------------------------------------------------------------------
diff --git 
a/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/yarn/interceptor/YarnSchedulerInterceptor.java
 
b/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/yarn/interceptor/YarnSchedulerInterceptor.java
index 2cda0d3..71f1a3d 100644
--- 
a/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/yarn/interceptor/YarnSchedulerInterceptor.java
+++ 
b/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/yarn/interceptor/YarnSchedulerInterceptor.java
@@ -19,12 +19,19 @@
 package org.apache.myriad.scheduler.yarn.interceptor;
 
 import java.io.IOException;
+import java.util.List;
+
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.ContainerStatus;
 import org.apache.hadoop.yarn.api.records.NodeId;
 import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
+import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
+import 
org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerEventType;
 import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeImpl;
 import 
org.apache.hadoop.yarn.server.resourcemanager.scheduler.AbstractYarnScheduler;
+import 
org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.YarnScheduler;
 import 
org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEvent;
 
@@ -57,13 +64,28 @@ public interface YarnSchedulerInterceptor {
   public CallBackFilter getCallBackFilter();
 
   /**
-   * Invoked *before* {@link AbstractYarnScheduler#reinitialize(Configuration, 
RMContext)}
-   *
-   * @param conf
-   * @param yarnScheduler
-   * @param rmContext
-   * @throws IOException
+   * Invoked *before* {@link 
org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FairScheduler#releaseContainers(List,
+   * SchedulerApplicationAttempt)}
+   * only if {@link CallBackFilter#allowCallBacksForNode(NodeId)} returns true.
+  */
+
+  public void beforeReleaseContainers(List<ContainerId> containers, 
SchedulerApplicationAttempt attempt);
+
+  /**
+   * Invoked *before* {@link 
org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FairScheduler#completedContainer(RMContainer,
+   * ContainerStatus, RMContainerEventType)}
+   * only if {@link CallBackFilter#allowCallBacksForNode(NodeId)} returns true.
    */
+  public void beforeCompletedContainer(RMContainer rmContainer, 
ContainerStatus containerStatus, RMContainerEventType event);
+
+    /**
+     * Invoked *before* {@link 
AbstractYarnScheduler#reinitialize(Configuration, RMContext)}
+     *
+     * @param conf
+     * @param yarnScheduler
+     * @param rmContext
+     * @throws IOException
+     */
   public void init(Configuration conf, AbstractYarnScheduler yarnScheduler, 
RMContext rmContext) throws IOException;
 
   /**

Reply via email to