http://git-wip-us.apache.org/repos/asf/incubator-myriad/blob/101bcad3/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/event/handlers/ResourceOffersEventHandler.java
----------------------------------------------------------------------
diff --git 
a/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/event/handlers/ResourceOffersEventHandler.java
 
b/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/event/handlers/ResourceOffersEventHandler.java
new file mode 100644
index 0000000..b61496f
--- /dev/null
+++ 
b/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/event/handlers/ResourceOffersEventHandler.java
@@ -0,0 +1,291 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.myriad.scheduler.event.handlers;
+
+import com.google.common.collect.Sets;
+import com.lmax.disruptor.EventHandler;
+
+import java.util.Iterator;
+
+import org.apache.commons.collections.CollectionUtils;
+import org.apache.mesos.Protos;
+import org.apache.mesos.Protos.Offer;
+import org.apache.mesos.Protos.OfferID;
+import org.apache.mesos.Protos.Resource;
+import org.apache.mesos.Protos.TaskInfo;
+import org.apache.mesos.Protos.Value;
+import org.apache.mesos.SchedulerDriver;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.inject.Inject;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
+
+/**
+ * handles and logs resource offers events
+ */
+public class ResourceOffersEventHandler implements 
EventHandler<org.apache.myriad.scheduler.event.ResourceOffersEvent> {
+  private static final Logger LOGGER = 
LoggerFactory.getLogger(ResourceOffersEventHandler.class);
+
+  private static final Lock driverOperationLock = new ReentrantLock();
+
+  private static final String RESOURCES_CPU_KEY = "cpus";
+  private static final String RESOURCES_MEM_KEY = "mem";
+  private static final String RESOURCES_PORTS_KEY = "ports";
+  private static final String RESOURCES_DISK_KEY = "disk";
+
+
+  @Inject
+  private org.apache.myriad.state.SchedulerState schedulerState;
+
+  @Inject
+  private org.apache.myriad.scheduler.TaskUtils taskUtils;
+
+  @Inject
+  private Map<String, org.apache.myriad.scheduler.TaskFactory> taskFactoryMap;
+
+  @Inject
+  private org.apache.myriad.scheduler.fgs.OfferLifecycleManager 
offerLifecycleMgr;
+
+  @Inject
+  private org.apache.myriad.scheduler.TaskConstraintsManager 
taskConstraintsManager;
+
+  @Override
+  public void onEvent(org.apache.myriad.scheduler.event.ResourceOffersEvent 
event, long sequence, boolean endOfBatch) throws Exception {
+    SchedulerDriver driver = event.getDriver();
+    List<Offer> offers = event.getOffers();
+
+    // Sometimes, we see that mesos sends resource offers before Myriad 
receives
+    // a notification for "framework registration". This is a simple defensive 
code
+    // to not process any offers unless Myriad receives a "framework 
registered" notification.
+    if (schedulerState.getFrameworkID() == null) {
+      LOGGER.warn("Received {} offers, but declining them since Framework ID 
is not yet set", offers.size());
+      for (Offer offer : offers) {
+        driver.declineOffer(offer.getId());
+      }
+      return;
+    }
+    LOGGER.info("Received offers {}", offers.size());
+    LOGGER.debug("Pending tasks: {}", this.schedulerState.getPendingTaskIds());
+    driverOperationLock.lock();
+    try {
+      for (Iterator<Offer> iterator = offers.iterator(); iterator.hasNext(); ) 
{
+        Offer offer = iterator.next();
+        Set<org.apache.myriad.state.NodeTask> nodeTasks = 
schedulerState.getNodeTasks(offer.getSlaveId());
+        for (org.apache.myriad.state.NodeTask nodeTask : nodeTasks) {
+          nodeTask.setSlaveAttributes(offer.getAttributesList());
+        }
+        // keep this in case SchedulerState gets out of sync. This should not 
happen with 
+        // synchronizing addNodes method in SchedulerState
+        // but to keep it safe
+        final Set<Protos.TaskID> missingTasks = Sets.newHashSet();
+        Set<Protos.TaskID> pendingTasks = schedulerState.getPendingTaskIds();
+        if (CollectionUtils.isNotEmpty(pendingTasks)) {
+          for (Protos.TaskID pendingTaskId : pendingTasks) {
+            org.apache.myriad.state.NodeTask taskToLaunch = 
schedulerState.getTask(pendingTaskId);
+            if (taskToLaunch == null) {
+              missingTasks.add(pendingTaskId);
+              LOGGER.warn("Node task for TaskID: {} does not exist", 
pendingTaskId);
+              continue;
+            }
+            String taskPrefix = taskToLaunch.getTaskPrefix();
+            org.apache.myriad.scheduler.ServiceResourceProfile profile = 
taskToLaunch.getProfile();
+            org.apache.myriad.scheduler.constraints.Constraint constraint = 
taskToLaunch.getConstraint();
+
+            Set<org.apache.myriad.state.NodeTask> launchedTasks = new 
HashSet<>();
+            
launchedTasks.addAll(schedulerState.getActiveTasksByType(taskPrefix));
+            
launchedTasks.addAll(schedulerState.getStagingTasksByType(taskPrefix));
+
+            if (matches(offer, taskToLaunch, constraint) && 
org.apache.myriad.scheduler.SchedulerUtils.isUniqueHostname(offer, 
taskToLaunch, launchedTasks)) {
+              try {
+                final TaskInfo task = 
taskFactoryMap.get(taskPrefix).createTask(offer, 
schedulerState.getFrameworkID(), pendingTaskId, taskToLaunch);
+                List<OfferID> offerIds = new ArrayList<>();
+                offerIds.add(offer.getId());
+                List<TaskInfo> tasks = new ArrayList<>();
+                tasks.add(task);
+                LOGGER.info("Launching task: {} using offer: {}", 
task.getTaskId().getValue(), offer.getId());
+                LOGGER.debug("Launching task: {} with profile: {} using offer: 
{}", task, profile, offer);
+                driver.launchTasks(offerIds, tasks);
+                schedulerState.makeTaskStaging(pendingTaskId);
+
+                // For every NM Task that we launch, we currently
+                // need to backup the ExecutorInfo for that NM Task in the 
State Store.
+                // Without this, we will not be able to launch tasks 
corresponding to yarn
+                // containers. This is specially important in case the RM 
restarts.
+                taskToLaunch.setExecutorInfo(task.getExecutor());
+                taskToLaunch.setHostname(offer.getHostname());
+                taskToLaunch.setSlaveId(offer.getSlaveId());
+                schedulerState.addTask(pendingTaskId, taskToLaunch);
+                iterator.remove(); // remove the used offer from offers list
+                break;
+              } catch (Throwable t) {
+                LOGGER.error("Exception thrown while trying to create a task 
for {}", taskPrefix, t);
+              }
+            }
+          }
+          for (Protos.TaskID taskId : missingTasks) {
+            schedulerState.removeTask(taskId);
+          }
+        }
+      }
+
+      for (Offer offer : offers) {
+        if 
(org.apache.myriad.scheduler.SchedulerUtils.isEligibleForFineGrainedScaling(offer.getHostname(),
 schedulerState)) {
+          if (LOGGER.isDebugEnabled()) {
+            LOGGER.debug("Picking an offer from slave with hostname {} for 
fine grained scaling.", offer.getHostname());
+          }
+          offerLifecycleMgr.addOffers(offer);
+        } else {
+          if (LOGGER.isDebugEnabled()) {
+            LOGGER.debug("Declining offer {} from slave {}.", offer, 
offer.getHostname());
+          }
+          driver.declineOffer(offer.getId());
+        }
+      }
+    } finally {
+      driverOperationLock.unlock();
+    }
+  }
+
+  private boolean matches(Offer offer, org.apache.myriad.state.NodeTask 
taskToLaunch, org.apache.myriad.scheduler.constraints.Constraint constraint) {
+    if (!meetsConstraint(offer, constraint)) {
+      return false;
+    }
+    Map<String, Object> results = new HashMap<String, Object>(5);
+    //Assign default values to avoid NPE
+    results.put(RESOURCES_CPU_KEY, Double.valueOf(0.0));
+    results.put(RESOURCES_MEM_KEY, Double.valueOf(0.0));
+    results.put(RESOURCES_DISK_KEY, Double.valueOf(0.0));
+    results.put(RESOURCES_PORTS_KEY, Integer.valueOf(0));
+
+    for (Resource resource : offer.getResourcesList()) {
+      if (resourceEvaluators.containsKey(resource.getName())) {
+        resourceEvaluators.get(resource.getName()).eval(resource, results);
+      } else {
+        LOGGER.warn("Ignoring unknown resource type: {}", resource.getName());
+      }
+    }
+    double cpus = (Double) results.get(RESOURCES_CPU_KEY);
+    double mem = (Double) results.get(RESOURCES_MEM_KEY);
+    int ports = (Integer) results.get(RESOURCES_PORTS_KEY);
+
+    checkResource(cpus <= 0, RESOURCES_CPU_KEY);
+    checkResource(mem <= 0, RESOURCES_MEM_KEY);
+    checkResource(ports <= 0, RESOURCES_PORTS_KEY);
+
+    return checkAggregates(offer, taskToLaunch, ports, cpus, mem);
+  }
+
+  private boolean checkAggregates(Offer offer, 
org.apache.myriad.state.NodeTask taskToLaunch, int ports, double cpus, double 
mem) {
+    final org.apache.myriad.scheduler.ServiceResourceProfile profile = 
taskToLaunch.getProfile();
+    final String taskPrefix = taskToLaunch.getTaskPrefix();
+    final double aggrCpu = profile.getAggregateCpu() + 
profile.getExecutorCpu();
+    final double aggrMem = profile.getAggregateMemory() + 
profile.getExecutorMemory();
+    final org.apache.myriad.scheduler.TaskConstraints taskConstraints = 
taskConstraintsManager.getConstraints(taskPrefix);
+    if (aggrCpu <= cpus && aggrMem <= mem && taskConstraints.portsCount() <= 
ports) {
+      return true;
+    } else {
+      LOGGER.info("Offer not sufficient for task with, cpu: {}, memory: {}, 
ports: {}", aggrCpu, aggrMem, ports);
+      return false;
+    }
+  }
+
+  private boolean meetsConstraint(Offer offer, 
org.apache.myriad.scheduler.constraints.Constraint constraint) {
+    if (constraint != null) {
+      switch (constraint.getType()) {
+        case LIKE: {
+          org.apache.myriad.scheduler.constraints.LikeConstraint 
likeConstraint = (org.apache.myriad.scheduler.constraints.LikeConstraint) 
constraint;
+          if (likeConstraint.isConstraintOnHostName()) {
+            return likeConstraint.matchesHostName(offer.getHostname());
+          } else {
+            return 
likeConstraint.matchesSlaveAttributes(offer.getAttributesList());
+          }
+        }
+        default:
+          return false;
+      }
+    }
+    return true;
+  }
+
+  private void checkResource(boolean fail, String resource) {
+    if (fail) {
+      LOGGER.info("No " + resource + " resources present");
+    }
+  }
+
+  private static Double scalarToDouble(Resource resource, String id) {
+    Double value = new Double(0.0);
+    if (resource.getType().equals(Value.Type.SCALAR)) {
+      value = new Double(resource.getScalar().getValue());
+    } else {
+      LOGGER.error(id + " resource was not a scalar: {}", 
resource.getType().toString());
+    }
+    return value;
+  }
+
+  private interface EvalResources {
+    public void eval(Resource resource, Map<String, Object> results);
+  }
+
+  private static Map<String, EvalResources> resourceEvaluators;
+
+  static {
+    resourceEvaluators = new HashMap<String, EvalResources>(4);
+    resourceEvaluators.put(RESOURCES_CPU_KEY, new EvalResources() {
+      public void eval(Resource resource, Map<String, Object> results) {
+        results.put(RESOURCES_CPU_KEY, (Double) results.get(RESOURCES_CPU_KEY) 
+ scalarToDouble(resource, RESOURCES_CPU_KEY));
+      }
+    });
+    resourceEvaluators.put(RESOURCES_MEM_KEY, new EvalResources() {
+      public void eval(Resource resource, Map<String, Object> results) {
+        results.put(RESOURCES_MEM_KEY, (Double) results.get(RESOURCES_MEM_KEY) 
+ scalarToDouble(resource, RESOURCES_MEM_KEY));
+      }
+    });
+    resourceEvaluators.put(RESOURCES_DISK_KEY, new EvalResources() {
+      public void eval(Resource resource, Map<String, Object> results) {
+      }
+    });
+    resourceEvaluators.put(RESOURCES_PORTS_KEY, new EvalResources() {
+      public void eval(Resource resource, Map<String, Object> results) {
+        int ports = 0;
+        if (resource.getType().equals(Value.Type.RANGES)) {
+          Value.Ranges ranges = resource.getRanges();
+          for (Value.Range range : ranges.getRangeList()) {
+            if (range.getBegin() < range.getEnd()) {
+              ports += range.getEnd() - range.getBegin() + 1;
+            }
+          }
+        } else {
+          LOGGER.error("ports resource was not Ranges: {}", 
resource.getType().toString());
+
+        }
+        results.put(RESOURCES_PORTS_KEY, (Integer) 
results.get(RESOURCES_PORTS_KEY) + Integer.valueOf(ports));
+      }
+    });
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-myriad/blob/101bcad3/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/event/handlers/SlaveLostEventHandler.java
----------------------------------------------------------------------
diff --git 
a/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/event/handlers/SlaveLostEventHandler.java
 
b/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/event/handlers/SlaveLostEventHandler.java
new file mode 100644
index 0000000..6feebe3
--- /dev/null
+++ 
b/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/event/handlers/SlaveLostEventHandler.java
@@ -0,0 +1,39 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.myriad.scheduler.event.handlers;
+
+import org.apache.myriad.scheduler.event.SlaveLostEvent;
+import com.lmax.disruptor.EventHandler;
+import org.apache.mesos.Protos.SlaveID;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * handles and logs mesos slave lost events
+ */
+public class SlaveLostEventHandler implements EventHandler<SlaveLostEvent> {
+  private static final Logger LOGGER = 
LoggerFactory.getLogger(SlaveLostEventHandler.class);
+
+  @Override
+  public void onEvent(SlaveLostEvent event, long sequence, boolean endOfBatch) 
throws Exception {
+    SlaveID slaveId = event.getSlaveId();
+    LOGGER.info("Slave {} lost!", slaveId);
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-myriad/blob/101bcad3/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/event/handlers/StatusUpdateEventHandler.java
----------------------------------------------------------------------
diff --git 
a/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/event/handlers/StatusUpdateEventHandler.java
 
b/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/event/handlers/StatusUpdateEventHandler.java
new file mode 100644
index 0000000..29c89c3
--- /dev/null
+++ 
b/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/event/handlers/StatusUpdateEventHandler.java
@@ -0,0 +1,97 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.myriad.scheduler.event.handlers;
+
+import org.apache.myriad.scheduler.event.StatusUpdateEvent;
+import org.apache.myriad.scheduler.fgs.OfferLifecycleManager;
+import org.apache.myriad.state.NodeTask;
+import org.apache.myriad.state.SchedulerState;
+import com.lmax.disruptor.EventHandler;
+
+import javax.inject.Inject;
+
+import org.apache.mesos.Protos.TaskID;
+import org.apache.mesos.Protos.TaskState;
+import org.apache.mesos.Protos.TaskStatus;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * handles and logs mesos status update events
+ */
+public class StatusUpdateEventHandler implements 
EventHandler<StatusUpdateEvent> {
+
+  private static final Logger LOGGER = 
LoggerFactory.getLogger(StatusUpdateEventHandler.class);
+
+  private final SchedulerState schedulerState;
+  private final OfferLifecycleManager offerLifecycleManager;
+
+  @Inject
+  public StatusUpdateEventHandler(SchedulerState schedulerState, 
OfferLifecycleManager offerLifecycleManager) {
+    this.schedulerState = schedulerState;
+    this.offerLifecycleManager = offerLifecycleManager;
+  }
+
+  @Override
+  public void onEvent(StatusUpdateEvent event, long sequence, boolean 
endOfBatch) throws Exception {
+    TaskStatus status = event.getStatus();
+    this.schedulerState.updateTask(status);
+    TaskID taskId = status.getTaskId();
+    NodeTask task = schedulerState.getTask(taskId);
+    if (task == null) {
+      LOGGER.warn("Task: {} not found, status: {}", taskId.getValue(), 
status.getState());
+      schedulerState.removeTask(taskId);
+      return;
+    }
+    LOGGER.info("Status Update for task: {} | state: {}", taskId.getValue(), 
status.getState());
+    TaskState state = status.getState();
+
+    switch (state) {
+      case TASK_STAGING:
+        schedulerState.makeTaskStaging(taskId);
+        break;
+      case TASK_STARTING:
+        schedulerState.makeTaskStaging(taskId);
+        break;
+      case TASK_RUNNING:
+        schedulerState.makeTaskActive(taskId);
+        break;
+      case TASK_FINISHED:
+        offerLifecycleManager.declineOutstandingOffers(task.getHostname());
+        schedulerState.removeTask(taskId);
+        break;
+      case TASK_FAILED:
+        // Add to pending tasks
+        offerLifecycleManager.declineOutstandingOffers(task.getHostname());
+        schedulerState.makeTaskPending(taskId);
+        break;
+      case TASK_KILLED:
+        offerLifecycleManager.declineOutstandingOffers(task.getHostname());
+        schedulerState.removeTask(taskId);
+        break;
+      case TASK_LOST:
+        offerLifecycleManager.declineOutstandingOffers(task.getHostname());
+        schedulerState.makeTaskPending(taskId);
+        break;
+      default:
+        LOGGER.error("Invalid state: {}", state);
+        break;
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-myriad/blob/101bcad3/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/fgs/ConsumedOffer.java
----------------------------------------------------------------------
diff --git 
a/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/fgs/ConsumedOffer.java
 
b/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/fgs/ConsumedOffer.java
new file mode 100644
index 0000000..ff49496
--- /dev/null
+++ 
b/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/fgs/ConsumedOffer.java
@@ -0,0 +1,55 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.myriad.scheduler.fgs;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.LinkedList;
+import java.util.List;
+
+import org.apache.mesos.Protos;
+
+/**
+ * Represents offers from a slave that have been consumed by Myriad.
+ */
+public class ConsumedOffer {
+  private List<Protos.Offer> offers;
+
+  public ConsumedOffer() {
+    this.offers = new LinkedList<>();
+  }
+
+  public void add(Protos.Offer offer) {
+    offers.add(offer);
+  }
+
+  public List<Protos.Offer> getOffers() {
+    return offers;
+  }
+
+  public Collection<Protos.OfferID> getOfferIds() {
+    Collection<Protos.OfferID> ids = new ArrayList<>(offers.size());
+
+    for (Protos.Offer offer : offers) {
+      ids.add(offer.getId());
+    }
+
+    return ids;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-myriad/blob/101bcad3/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/fgs/NMHeartBeatHandler.java
----------------------------------------------------------------------
diff --git 
a/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/fgs/NMHeartBeatHandler.java
 
b/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/fgs/NMHeartBeatHandler.java
new file mode 100644
index 0000000..b8d8326
--- /dev/null
+++ 
b/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/fgs/NMHeartBeatHandler.java
@@ -0,0 +1,163 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.myriad.scheduler.fgs;
+
+import org.apache.myriad.scheduler.yarn.interceptor.BaseInterceptor;
+import org.apache.myriad.state.SchedulerState;
+import com.google.common.annotations.VisibleForTesting;
+import java.util.ArrayList;
+import java.util.List;
+import javax.inject.Inject;
+import org.apache.hadoop.yarn.api.records.ContainerState;
+import org.apache.hadoop.yarn.api.records.ContainerStatus;
+import org.apache.hadoop.yarn.api.records.NodeId;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
+import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
+import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
+import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeEvent;
+import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeStatusEvent;
+import 
org.apache.hadoop.yarn.server.resourcemanager.scheduler.AbstractYarnScheduler;
+import org.apache.hadoop.yarn.util.resource.Resources;
+import org.apache.mesos.Protos;
+import org.apache.mesos.Protos.Offer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Handles node manager heartbeat.
+ */
+public class NMHeartBeatHandler extends BaseInterceptor {
+  @VisibleForTesting
+  Logger logger = LoggerFactory.getLogger(NMHeartBeatHandler.class);
+
+  private final AbstractYarnScheduler yarnScheduler;
+  private final org.apache.myriad.scheduler.MyriadDriver myriadDriver;
+  private final YarnNodeCapacityManager yarnNodeCapacityMgr;
+  private final OfferLifecycleManager offerLifecycleMgr;
+  private final NodeStore nodeStore;
+  private final SchedulerState state;
+
+  @Inject
+  public 
NMHeartBeatHandler(org.apache.myriad.scheduler.yarn.interceptor.InterceptorRegistry
 registry, AbstractYarnScheduler yarnScheduler, 
org.apache.myriad.scheduler.MyriadDriver myriadDriver, YarnNodeCapacityManager 
yarnNodeCapacityMgr,
+      OfferLifecycleManager offerLifecycleMgr, NodeStore nodeStore, 
SchedulerState state) {
+
+    if (registry != null) {
+      registry.register(this);
+    }
+
+    this.yarnScheduler = yarnScheduler;
+    this.myriadDriver = myriadDriver;
+    this.yarnNodeCapacityMgr = yarnNodeCapacityMgr;
+    this.offerLifecycleMgr = offerLifecycleMgr;
+    this.nodeStore = nodeStore;
+    this.state = state;
+  }
+
+  @Override
+  public CallBackFilter getCallBackFilter() {
+    return new CallBackFilter() {
+      @Override
+      public boolean allowCallBacksForNode(NodeId nodeManager) {
+        return 
org.apache.myriad.scheduler.SchedulerUtils.isEligibleForFineGrainedScaling(nodeManager.getHost(),
 state);
+      }
+    };
+  }
+
+  @Override
+  public void beforeRMNodeEventHandled(RMNodeEvent event, RMContext context) {
+    switch (event.getType()) {
+      case STARTED:
+        RMNode rmNode = context.getRMNodes().get(event.getNodeId());
+        Resource totalCapability = rmNode.getTotalCapability();
+        if (totalCapability.getMemory() != 0 || 
totalCapability.getVirtualCores() != 0) {
+          logger.warn("FineGrainedScaling feature got invoked for a " + "NM 
with non-zero capacity. Host: {}, Mem: {}, CPU: {}. Setting the NM's capacity 
to (0G,0CPU)", rmNode.getHostName(), totalCapability.getMemory(), 
totalCapability
+              .getVirtualCores());
+          totalCapability.setMemory(0);
+          totalCapability.setVirtualCores(0);
+        }
+        break;
+
+      case STATUS_UPDATE:
+        handleStatusUpdate(event, context);
+        break;
+
+      default:
+        break;
+    }
+  }
+
+  @VisibleForTesting
+  protected void handleStatusUpdate(RMNodeEvent event, RMContext context) {
+    if (!(event instanceof RMNodeStatusEvent)) {
+      logger.error("{} not an instance of {}", event.getClass().getName(), 
RMNodeStatusEvent.class.getName());
+      return;
+    }
+
+    RMNodeStatusEvent statusEvent = (RMNodeStatusEvent) event;
+    RMNode rmNode = context.getRMNodes().get(event.getNodeId());
+    String hostName = rmNode.getNodeID().getHost();
+
+    Node host = nodeStore.getNode(hostName);
+    if (host != null) {
+      host.snapshotRunningContainers();
+    }
+
+    // New capacity of the node =
+    // resources under use on the node (due to previous offers) +
+    // new resources offered by mesos for the node
+    yarnNodeCapacityMgr.setNodeCapacity(rmNode, 
Resources.add(getResourcesUnderUse(statusEvent), 
getNewResourcesOfferedByMesos(hostName)));
+  }
+
+  private Resource getNewResourcesOfferedByMesos(String hostname) {
+    OfferFeed feed = offerLifecycleMgr.getOfferFeed(hostname);
+    if (feed == null) {
+      logger.debug("No offer feed for: {}", hostname);
+      return Resource.newInstance(0, 0);
+    }
+    List<Offer> offers = new ArrayList<>();
+    Protos.Offer offer;
+    while ((offer = feed.poll()) != null) {
+      offers.add(offer);
+      offerLifecycleMgr.markAsConsumed(offer);
+    }
+    Resource fromMesosOffers = 
OfferUtils.getYarnResourcesFromMesosOffers(offers);
+
+    if (logger.isDebugEnabled()) {
+      logger.debug("NM on host {} got {} CPUs and {} memory from mesos", 
hostname, fromMesosOffers.getVirtualCores(), fromMesosOffers.getMemory());
+    }
+
+    return fromMesosOffers;
+  }
+
+  private Resource getResourcesUnderUse(RMNodeStatusEvent statusEvent) {
+    Resource usedResources = Resource.newInstance(0, 0);
+    for (ContainerStatus status : statusEvent.getContainers()) {
+      if (status.getState() == ContainerState.NEW || status.getState() == 
ContainerState.RUNNING) {
+        RMContainer rmContainer = 
yarnScheduler.getRMContainer(status.getContainerId());
+        // (sdaingade) This check is needed as RMContainer information may not 
be populated
+        // immediately after a RM restart.
+        if (rmContainer != null) {
+          Resources.addTo(usedResources, rmContainer.getAllocatedResource());
+        }
+      }
+    }
+    return usedResources;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-myriad/blob/101bcad3/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/fgs/Node.java
----------------------------------------------------------------------
diff --git 
a/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/fgs/Node.java 
b/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/fgs/Node.java
new file mode 100644
index 0000000..035953e
--- /dev/null
+++ b/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/fgs/Node.java
@@ -0,0 +1,89 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.myriad.scheduler.fgs;
+
+import java.util.HashSet;
+import java.util.Set;
+
+import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode;
+import org.apache.mesos.Protos;
+
+/**
+ * Abstraction that encapsulates YARN and Mesos view of a node.
+ */
+public class Node {
+  /**
+   * Mesos slave id associated with this node.
+   */
+  private Protos.SlaveID slaveId;
+
+  /**
+   * Mesos executor on this node.
+   */
+  private Protos.ExecutorInfo execInfo;
+
+  /**
+   * YARN scheduler's representation of this node.
+   */
+  private SchedulerNode node;
+
+  /**
+   * Snapshot of containers allocated by YARN scheduler.
+   * This need not reflect the current state. It is meant to be used by the
+   * Myriad scheduler.
+   */
+  private Set<RMContainer> containerSnapshot;
+
+  public Node(SchedulerNode node) {
+    this.node = node;
+  }
+
+  public SchedulerNode getNode() {
+    return node;
+  }
+
+  public Protos.SlaveID getSlaveId() {
+    return slaveId;
+  }
+
+  public void setSlaveId(Protos.SlaveID slaveId) {
+    this.slaveId = slaveId;
+  }
+
+  public Protos.ExecutorInfo getExecInfo() {
+    return execInfo;
+  }
+
+  public void setExecInfo(Protos.ExecutorInfo execInfo) {
+    this.execInfo = execInfo;
+  }
+
+  public void snapshotRunningContainers() {
+    this.containerSnapshot = new HashSet<>(node.getRunningContainers());
+  }
+
+  public void removeContainerSnapshot() {
+    this.containerSnapshot = null;
+  }
+
+  public Set<RMContainer> getContainerSnapshot() {
+    return this.containerSnapshot;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-myriad/blob/101bcad3/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/fgs/NodeStore.java
----------------------------------------------------------------------
diff --git 
a/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/fgs/NodeStore.java 
b/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/fgs/NodeStore.java
new file mode 100644
index 0000000..a940a0e
--- /dev/null
+++ 
b/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/fgs/NodeStore.java
@@ -0,0 +1,54 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.myriad.scheduler.fgs;
+
+import java.util.concurrent.ConcurrentHashMap;
+
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode;
+
+/**
+ * A store for all Node instances managed by this Myriad instance.
+ */
+public class NodeStore {
+  private ConcurrentHashMap<String, Node> nodeMap;
+
+  public NodeStore() {
+    nodeMap = new ConcurrentHashMap<>(200, 0.75f, 50);
+  }
+
+  private String getKey(SchedulerNode schedNode) {
+    return schedNode.getNodeID().getHost();
+  }
+
+  public void add(SchedulerNode schedNode) {
+    nodeMap.put(getKey(schedNode), new Node(schedNode));
+  }
+
+  public void remove(String hostname) {
+    nodeMap.remove(hostname);
+  }
+
+  public Node getNode(String hostname) {
+    return nodeMap.get(hostname);
+  }
+
+  public boolean isPresent(String hostname) {
+    return nodeMap.containsKey(hostname);
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-myriad/blob/101bcad3/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/fgs/OfferFeed.java
----------------------------------------------------------------------
diff --git 
a/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/fgs/OfferFeed.java 
b/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/fgs/OfferFeed.java
new file mode 100644
index 0000000..521ea57
--- /dev/null
+++ 
b/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/fgs/OfferFeed.java
@@ -0,0 +1,46 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.myriad.scheduler.fgs;
+
+import java.util.concurrent.ConcurrentLinkedQueue;
+
+import org.apache.mesos.Protos;
+
+/**
+ * Feed of Mesos offers for a node.
+ */
+public class OfferFeed {
+  private ConcurrentLinkedQueue<Protos.Offer> queue;
+
+  public OfferFeed() {
+    this.queue = new ConcurrentLinkedQueue<>();
+  }
+
+  public void add(Protos.Offer offer) {
+    queue.add(offer);
+  }
+
+  /**
+   * Retrieves and removes the head of the feed, or returns NULL if the feed is
+   * empty.
+   */
+  public Protos.Offer poll() {
+    return queue.poll();
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-myriad/blob/101bcad3/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/fgs/OfferLifecycleManager.java
----------------------------------------------------------------------
diff --git 
a/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/fgs/OfferLifecycleManager.java
 
b/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/fgs/OfferLifecycleManager.java
new file mode 100644
index 0000000..0e283cb
--- /dev/null
+++ 
b/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/fgs/OfferLifecycleManager.java
@@ -0,0 +1,114 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.myriad.scheduler.fgs;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+import javax.inject.Inject;
+
+import org.apache.mesos.Protos;
+import org.apache.mesos.Protos.Offer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Manages the Mesos offers tracked by Myriad.
+ */
+public class OfferLifecycleManager {
+  private static final Logger LOGGER = 
LoggerFactory.getLogger(OfferLifecycleManager.class);
+
+  private Map<String, OfferFeed> offerFeedMap;
+
+  /**
+   * !!! Not thread safe !!!
+   */
+  private final Map<String, ConsumedOffer> consumedOfferMap;
+
+  private final NodeStore nodeStore;
+  private final org.apache.myriad.scheduler.MyriadDriver myriadDriver;
+
+  @Inject
+  public OfferLifecycleManager(NodeStore nodeStore, 
org.apache.myriad.scheduler.MyriadDriver myriadDriver) {
+
+    this.offerFeedMap = new ConcurrentHashMap<>(200, 0.75f, 50);
+    this.consumedOfferMap = new HashMap<>(200, 0.75f);
+    this.nodeStore = nodeStore;
+    this.myriadDriver = myriadDriver;
+  }
+
+  public OfferFeed getOfferFeed(String hostname) {
+    return offerFeedMap.get(hostname);
+  }
+
+  public void declineOffer(Protos.Offer offer) {
+    myriadDriver.getDriver().declineOffer(offer.getId());
+    LOGGER.debug("Declined offer {}", offer.getId());
+  }
+
+  public void addOffers(Protos.Offer... offers) {
+    for (Protos.Offer offer : offers) {
+      String hostname = offer.getHostname();
+      Node node = nodeStore.getNode(hostname);
+      if (node != null) {
+        OfferFeed feed = offerFeedMap.get(hostname);
+        if (feed == null) {
+          feed = new OfferFeed();
+          offerFeedMap.put(hostname, feed);
+        }
+        feed.add(offer);
+
+        node.setSlaveId(offer.getSlaveId());
+
+        LOGGER.debug("addResourceOffers: caching offer for host {}, offer id 
{}", hostname, offer.getId().getValue());
+      } else {
+        myriadDriver.getDriver().declineOffer(offer.getId());
+        LOGGER.debug("Declined offer for unregistered host {}", hostname);
+      }
+    }
+  }
+
+  public void markAsConsumed(Protos.Offer offer) {
+    ConsumedOffer consumedOffer = consumedOfferMap.get(offer.getHostname());
+    if (consumedOffer == null) {
+      consumedOffer = new ConsumedOffer();
+      consumedOfferMap.put(offer.getHostname(), consumedOffer);
+    }
+
+    consumedOffer.add(offer);
+  }
+
+  public ConsumedOffer drainConsumedOffer(String hostname) {
+    return consumedOfferMap.remove(hostname);
+  }
+
+  public void declineOutstandingOffers(String hostname) {
+    int numOutStandingOffers = 0;
+    OfferFeed offerFeed = getOfferFeed(hostname);
+    Offer offer;
+    while (offerFeed != null && (offer = offerFeed.poll()) != null) {
+      declineOffer(offer);
+      numOutStandingOffers++;
+    }
+    if (numOutStandingOffers > 0) {
+      LOGGER.info("Declined {} outstanding offers for host {}", 
numOutStandingOffers, hostname);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-myriad/blob/101bcad3/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/fgs/OfferUtils.java
----------------------------------------------------------------------
diff --git 
a/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/fgs/OfferUtils.java
 
b/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/fgs/OfferUtils.java
new file mode 100644
index 0000000..038aeb4
--- /dev/null
+++ 
b/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/fgs/OfferUtils.java
@@ -0,0 +1,53 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.myriad.scheduler.fgs;
+
+import java.util.Collection;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.mesos.Protos;
+import org.apache.mesos.Protos.Offer;
+
+/**
+ * Utility class that provides useful methods that deal with Mesos offers.
+ */
+public class OfferUtils {
+
+  /**
+   * Transforms a collection of mesos offers into {@link Resource}.
+   *
+   * @param offers collection of mesos offers
+   * @return a single resource object equivalent to the cumulative sum of 
mesos offers
+   */
+  public static Resource getYarnResourcesFromMesosOffers(Collection<Offer> 
offers) {
+    double cpus = 0.0;
+    double mem = 0.0;
+
+    for (Protos.Offer offer : offers) {
+      for (Protos.Resource resource : offer.getResourcesList()) {
+        if (resource.getName().equalsIgnoreCase("cpus")) {
+          cpus += resource.getScalar().getValue();
+        } else if (resource.getName().equalsIgnoreCase("mem")) {
+          mem += resource.getScalar().getValue();
+        }
+      }
+    }
+    return Resource.newInstance((int) mem, (int) cpus);
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-myriad/blob/101bcad3/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/fgs/YarnNodeCapacityManager.java
----------------------------------------------------------------------
diff --git 
a/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/fgs/YarnNodeCapacityManager.java
 
b/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/fgs/YarnNodeCapacityManager.java
new file mode 100644
index 0000000..ca094cc
--- /dev/null
+++ 
b/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/fgs/YarnNodeCapacityManager.java
@@ -0,0 +1,219 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.myriad.scheduler.fgs;
+
+import org.apache.myriad.executor.ContainerTaskStatusRequest;
+import org.apache.myriad.scheduler.yarn.interceptor.BaseInterceptor;
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Sets;
+
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+
+import javax.inject.Inject;
+
+import org.apache.hadoop.yarn.api.records.Container;
+import org.apache.hadoop.yarn.api.records.NodeId;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.api.records.ResourceOption;
+import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
+import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
+import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
+import 
org.apache.hadoop.yarn.server.resourcemanager.scheduler.AbstractYarnScheduler;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode;
+import 
org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeAddedSchedulerEvent;
+import 
org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeResourceUpdateSchedulerEvent;
+import 
org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeUpdateSchedulerEvent;
+import 
org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEvent;
+import org.apache.hadoop.yarn.util.resource.Resources;
+import org.apache.mesos.Protos;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Manages the capacity exposed by NodeManager. It uses the offers available
+ * from Mesos to inflate the node capacity and lets ResourceManager make the
+ * scheduling decision. After the scheduling decision is done, there are 2 
cases:
+ * <p/>
+ * 1. If ResourceManager did not use the expanded capacity, then the node's
+ * capacity is reverted back to original value and the offer is declined.
+ * 2. If ResourceManager ended up using the expanded capacity, then the node's
+ * capacity is updated accordingly and any unused capacity is returned back to
+ * Mesos.
+ */
+public class YarnNodeCapacityManager extends BaseInterceptor {
+  private static final Logger LOGGER = 
LoggerFactory.getLogger(YarnNodeCapacityManager.class);
+
+  private final AbstractYarnScheduler yarnScheduler;
+  private final RMContext rmContext;
+  private final org.apache.myriad.scheduler.MyriadDriver myriadDriver;
+  private final OfferLifecycleManager offerLifecycleMgr;
+  private final NodeStore nodeStore;
+  private final org.apache.myriad.state.SchedulerState state;
+
+  @Inject
+  public 
YarnNodeCapacityManager(org.apache.myriad.scheduler.yarn.interceptor.InterceptorRegistry
 registry, AbstractYarnScheduler yarnScheduler, RMContext rmContext, 
org.apache.myriad.scheduler.MyriadDriver myriadDriver, OfferLifecycleManager
+      offerLifecycleMgr, NodeStore nodeStore, 
org.apache.myriad.state.SchedulerState state) {
+    if (registry != null) {
+      registry.register(this);
+    }
+    this.yarnScheduler = yarnScheduler;
+    this.rmContext = rmContext;
+    this.myriadDriver = myriadDriver;
+    this.offerLifecycleMgr = offerLifecycleMgr;
+    this.nodeStore = nodeStore;
+    this.state = state;
+  }
+
+  @Override
+  public CallBackFilter getCallBackFilter() {
+    return new CallBackFilter() {
+      @Override
+      public boolean allowCallBacksForNode(NodeId nodeManager) {
+        return 
org.apache.myriad.scheduler.SchedulerUtils.isEligibleForFineGrainedScaling(nodeManager.getHost(),
 state);
+      }
+    };
+  }
+
+  @Override
+  public void afterSchedulerEventHandled(SchedulerEvent event) {
+    switch (event.getType()) {
+      case NODE_ADDED: {
+        if (!(event instanceof NodeAddedSchedulerEvent)) {
+          LOGGER.error("{} not an instance of {}", event.getClass().getName(), 
NodeAddedSchedulerEvent.class.getName());
+          return;
+        }
+
+        NodeAddedSchedulerEvent nodeAddedEvent = (NodeAddedSchedulerEvent) 
event;
+        NodeId nodeId = nodeAddedEvent.getAddedRMNode().getNodeID();
+        String host = nodeId.getHost();
+
+        SchedulerNode node = yarnScheduler.getSchedulerNode(nodeId);
+        nodeStore.add(node);
+        LOGGER.info("afterSchedulerEventHandled: NM registration from node 
{}", host);
+      }
+        break;
+
+      case NODE_UPDATE: {
+        if (!(event instanceof NodeUpdateSchedulerEvent)) {
+          LOGGER.error("{} not an instance of {}", event.getClass().getName(), 
NodeUpdateSchedulerEvent.class.getName());
+          return;
+        }
+
+        RMNode rmNode = ((NodeUpdateSchedulerEvent) event).getRMNode();
+        handleContainerAllocation(rmNode);
+      }
+        break;
+
+      default:
+        break;
+    }
+  }
+
+  /**
+   * Checks if any containers were allocated in the current scheduler run and
+   * launches the corresponding Mesos tasks. It also udpates the node
+   * capacity depending on what portion of the consumed offers were actually
+   * used.
+   */
+  @VisibleForTesting
+  protected void handleContainerAllocation(RMNode rmNode) {
+    String host = rmNode.getNodeID().getHost();
+
+    ConsumedOffer consumedOffer = offerLifecycleMgr.drainConsumedOffer(host);
+    if (consumedOffer == null) {
+      LOGGER.debug("No offer consumed for {}", host);
+      return;
+    }
+
+    Node node = nodeStore.getNode(host);
+    Set<RMContainer> containersBeforeSched = node.getContainerSnapshot();
+    Set<RMContainer> containersAfterSched = new 
HashSet<>(node.getNode().getRunningContainers());
+
+    Set<RMContainer> containersAllocatedByMesosOffer = (containersBeforeSched 
== null) ? containersAfterSched : Sets.difference(containersAfterSched, 
containersBeforeSched);
+
+    if (containersAllocatedByMesosOffer.isEmpty()) {
+      LOGGER.debug("No containers allocated using Mesos offers for host: {}", 
host);
+      for (Protos.Offer offer : consumedOffer.getOffers()) {
+        offerLifecycleMgr.declineOffer(offer);
+      }
+      setNodeCapacity(rmNode, Resources.subtract(rmNode.getTotalCapability(), 
OfferUtils.getYarnResourcesFromMesosOffers(consumedOffer.getOffers())));
+    } else {
+      LOGGER.debug("Containers allocated using Mesos offers for host: {} 
count: {}", host, containersAllocatedByMesosOffer.size());
+
+      // Identify the Mesos tasks that need to be launched
+      List<Protos.TaskInfo> tasks = Lists.newArrayList();
+      Resource resUsed = Resource.newInstance(0, 0);
+
+      for (RMContainer newContainer : containersAllocatedByMesosOffer) {
+        tasks.add(getTaskInfoForContainer(newContainer, consumedOffer, node));
+        resUsed = Resources.add(resUsed, newContainer.getAllocatedResource());
+      }
+
+      // Reduce node capacity to account for unused offers
+      Resource resOffered = 
OfferUtils.getYarnResourcesFromMesosOffers(consumedOffer.getOffers());
+      Resource resUnused = Resources.subtract(resOffered, resUsed);
+      setNodeCapacity(rmNode, Resources.subtract(rmNode.getTotalCapability(), 
resUnused));
+
+      myriadDriver.getDriver().launchTasks(consumedOffer.getOfferIds(), tasks);
+    }
+
+    // No need to hold on to the snapshot anymore
+    node.removeContainerSnapshot();
+  }
+
+  /**
+   * 1. Updates {@link RMNode#getTotalCapability()} with newCapacity.
+   * 2. Sends out a {@link NodeResourceUpdateSchedulerEvent} that's handled by 
YARN's scheduler.
+   * The scheduler updates the corresponding {@link SchedulerNode} with the 
newCapacity.
+   *
+   * @param rmNode
+   * @param newCapacity
+   */
+  @SuppressWarnings("unchecked")
+  public void setNodeCapacity(RMNode rmNode, Resource newCapacity) {
+    rmNode.getTotalCapability().setMemory(newCapacity.getMemory());
+    rmNode.getTotalCapability().setVirtualCores(newCapacity.getVirtualCores());
+    LOGGER.debug("Setting capacity for node {} to {}", rmNode.getHostName(), 
newCapacity);
+    // updates the scheduler with the new capacity for the NM.
+    // the event is handled by the scheduler asynchronously
+    rmContext.getDispatcher().getEventHandler().handle(new 
NodeResourceUpdateSchedulerEvent(rmNode, 
ResourceOption.newInstance(rmNode.getTotalCapability(), 
RMNode.OVER_COMMIT_TIMEOUT_MILLIS_DEFAULT)));
+  }
+
+  private Protos.TaskInfo getTaskInfoForContainer(RMContainer rmContainer, 
ConsumedOffer consumedOffer, Node node) {
+
+    Protos.Offer offer = consumedOffer.getOffers().get(0);
+    Container container = rmContainer.getContainer();
+    Protos.TaskID taskId = 
Protos.TaskID.newBuilder().setValue(ContainerTaskStatusRequest.YARN_CONTAINER_TASK_ID_PREFIX
 + container.getId().toString()).build();
+
+    // TODO (sdaingade) Remove ExecutorInfo from the Node object
+    // as this is now cached in the NodeTask object in scheduler state.
+    Protos.ExecutorInfo executorInfo = node.getExecInfo();
+    if (executorInfo == null) {
+      executorInfo = 
Protos.ExecutorInfo.newBuilder(state.getNodeTask(offer.getSlaveId(), 
org.apache.myriad.configuration.NodeManagerConfiguration.NM_TASK_PREFIX).getExecutorInfo()).setFrameworkId(offer.getFrameworkId()).build();
+      node.setExecInfo(executorInfo);
+    }
+
+    return Protos.TaskInfo.newBuilder().setName("task_" + 
taskId.getValue()).setTaskId(taskId).setSlaveId(offer.getSlaveId()).addResources(Protos.Resource.newBuilder().setName("cpus").setType(Protos.Value.Type.SCALAR).setScalar(Protos.Value.Scalar
+        
.newBuilder().setValue(container.getResource().getVirtualCores()))).addResources(Protos.Resource.newBuilder().setName("mem").setType(Protos.Value.Type.SCALAR).setScalar(Protos.Value.Scalar.newBuilder().setValue(container.getResource()
+        .getMemory()))).setExecutor(executorInfo).build();
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-myriad/blob/101bcad3/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/yarn/MyriadCapacityScheduler.java
----------------------------------------------------------------------
diff --git 
a/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/yarn/MyriadCapacityScheduler.java
 
b/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/yarn/MyriadCapacityScheduler.java
new file mode 100644
index 0000000..f6d24e9
--- /dev/null
+++ 
b/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/yarn/MyriadCapacityScheduler.java
@@ -0,0 +1,86 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.myriad.scheduler.yarn;
+
+import org.apache.myriad.scheduler.yarn.interceptor.CompositeInterceptor;
+import org.apache.myriad.scheduler.yarn.interceptor.YarnSchedulerInterceptor;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
+import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeEvent;
+import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeEventType;
+import 
org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler;
+import 
org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEvent;
+
+/**
+ * {@link MyriadCapacityScheduler} just extends YARN's {@link 
CapacityScheduler} and
+ * allows some of the {@link CapacityScheduler} methods to be intercepted
+ * via the {@link YarnSchedulerInterceptor} interface.
+ */
+public class MyriadCapacityScheduler extends CapacityScheduler {
+  private Configuration conf;
+
+  private RMContext rmContext;
+  private YarnSchedulerInterceptor yarnSchedulerInterceptor;
+  private RMNodeEventHandler rmNodeEventHandler;
+
+  public MyriadCapacityScheduler() {
+    super();
+  }
+
+  /**
+   * Register an event handler that receives {@link RMNodeEvent} events.
+   * This event handler is registered ahead of RM's own event handler for 
RMNodeEvents.
+   * For e.g. myriad can inspect a node's HB (RMNodeStatusEvent) before the HB 
is handled by
+   * RM and the scheduler.
+   *
+   * @param rmContext
+   */
+  @Override
+  public synchronized void setRMContext(RMContext rmContext) {
+    this.rmContext = rmContext;
+    this.yarnSchedulerInterceptor = new CompositeInterceptor();
+    rmNodeEventHandler = new RMNodeEventHandler(yarnSchedulerInterceptor, 
rmContext);
+    rmContext.getDispatcher().register(RMNodeEventType.class, 
rmNodeEventHandler);
+    super.setRMContext(rmContext);
+  }
+
+  /**
+   * ******** Methods overridden from YARN {@link CapacityScheduler}  
*********************
+   */
+
+  @Override
+  public synchronized void serviceInit(Configuration conf) throws Exception {
+    this.conf = conf;
+    super.serviceInit(conf);
+  }
+
+  @Override
+  public synchronized void serviceStart() throws Exception {
+    this.yarnSchedulerInterceptor.init(conf, this, rmContext);
+    super.serviceStart();
+  }
+
+  @Override
+  public synchronized void handle(SchedulerEvent event) {
+    this.yarnSchedulerInterceptor.beforeSchedulerEventHandled(event);
+    super.handle(event);
+    this.yarnSchedulerInterceptor.afterSchedulerEventHandled(event);
+  }
+}
+

http://git-wip-us.apache.org/repos/asf/incubator-myriad/blob/101bcad3/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/yarn/MyriadFairScheduler.java
----------------------------------------------------------------------
diff --git 
a/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/yarn/MyriadFairScheduler.java
 
b/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/yarn/MyriadFairScheduler.java
new file mode 100644
index 0000000..35d6aaf
--- /dev/null
+++ 
b/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/yarn/MyriadFairScheduler.java
@@ -0,0 +1,86 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.myriad.scheduler.yarn;
+
+import org.apache.myriad.scheduler.yarn.interceptor.CompositeInterceptor;
+import org.apache.myriad.scheduler.yarn.interceptor.YarnSchedulerInterceptor;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
+import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeEvent;
+import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeEventType;
+import 
org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEvent;
+import 
org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FairScheduler;
+
+/**
+ * {@link MyriadFairScheduler} just extends YARN's {@link FairScheduler} and
+ * allows some of the {@link FairScheduler} methods to be intercepted
+ * via the {@link YarnSchedulerInterceptor} interface.
+ */
+public class MyriadFairScheduler extends FairScheduler {
+
+  private RMContext rmContext;
+  private YarnSchedulerInterceptor yarnSchedulerInterceptor;
+  private RMNodeEventHandler rmNodeEventHandler;
+  private Configuration conf;
+
+  public MyriadFairScheduler() {
+    super();
+  }
+
+  /**
+   * Register an event handler that receives {@link RMNodeEvent} events.
+   * This event handler is registered ahead of RM's own event handler for 
RMNodeEvents.
+   * For e.g. myriad can inspect a node's HB (RMNodeStatusEvent) before the HB 
is handled by
+   * RM and the scheduler.
+   *
+   * @param rmContext
+   */
+  @Override
+  public synchronized void setRMContext(RMContext rmContext) {
+    this.rmContext = rmContext;
+    this.yarnSchedulerInterceptor = new CompositeInterceptor();
+    rmNodeEventHandler = new RMNodeEventHandler(yarnSchedulerInterceptor, 
rmContext);
+    rmContext.getDispatcher().register(RMNodeEventType.class, 
rmNodeEventHandler);
+    super.setRMContext(rmContext);
+  }
+
+  /**
+   * ******** Methods overridden from YARN {@link FairScheduler}  
*********************
+   */
+
+  @Override
+  public synchronized void serviceInit(Configuration conf) throws Exception {
+    this.conf = conf;
+    super.serviceInit(conf);
+  }
+
+  @Override
+  public synchronized void serviceStart() throws Exception {
+    this.yarnSchedulerInterceptor.init(conf, this, rmContext);
+    super.serviceStart();
+  }
+
+  @Override
+  public synchronized void handle(SchedulerEvent event) {
+    this.yarnSchedulerInterceptor.beforeSchedulerEventHandled(event);
+    super.handle(event);
+    this.yarnSchedulerInterceptor.afterSchedulerEventHandled(event);
+  }
+}
+

http://git-wip-us.apache.org/repos/asf/incubator-myriad/blob/101bcad3/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/yarn/MyriadFifoScheduler.java
----------------------------------------------------------------------
diff --git 
a/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/yarn/MyriadFifoScheduler.java
 
b/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/yarn/MyriadFifoScheduler.java
new file mode 100644
index 0000000..1fd3b87
--- /dev/null
+++ 
b/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/yarn/MyriadFifoScheduler.java
@@ -0,0 +1,85 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.myriad.scheduler.yarn;
+
+import org.apache.myriad.scheduler.yarn.interceptor.YarnSchedulerInterceptor;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
+import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeEvent;
+import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeEventType;
+import 
org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEvent;
+import 
org.apache.hadoop.yarn.server.resourcemanager.scheduler.fifo.FifoScheduler;
+
+/**
+ * {@link MyriadFifoScheduler} just extends YARN's {@link FifoScheduler} and
+ * allows some of the {@link FifoScheduler} methods to be intercepted
+ * via the {@link YarnSchedulerInterceptor} interface.
+ */
+public class MyriadFifoScheduler extends FifoScheduler {
+  private Configuration conf;
+
+  private RMContext rmContext;
+  private YarnSchedulerInterceptor yarnSchedulerInterceptor;
+  private RMNodeEventHandler rmNodeEventHandler;
+
+  public MyriadFifoScheduler() {
+    super();
+  }
+
+  /**
+   * Register an event handler that receives {@link RMNodeEvent} events.
+   * This event handler is registered ahead of RM's own event handler for 
RMNodeEvents.
+   * For e.g. myriad can inspect a node's HB (RMNodeStatusEvent) before the HB 
is handled by
+   * RM and the scheduler.
+   *
+   * @param rmContext
+   */
+  @Override
+  public synchronized void setRMContext(RMContext rmContext) {
+    this.rmContext = rmContext;
+    this.yarnSchedulerInterceptor = new 
org.apache.myriad.scheduler.yarn.interceptor.CompositeInterceptor();
+    rmNodeEventHandler = new RMNodeEventHandler(yarnSchedulerInterceptor, 
rmContext);
+    rmContext.getDispatcher().register(RMNodeEventType.class, 
rmNodeEventHandler);
+    super.setRMContext(rmContext);
+  }
+
+  /**
+   * ******** Methods overridden from YARN {@link FifoScheduler}  
*********************
+   */
+
+  @Override
+  public synchronized void serviceInit(Configuration conf) throws Exception {
+    this.conf = conf;
+    super.serviceInit(conf);
+  }
+
+  @Override
+  public synchronized void serviceStart() throws Exception {
+    this.yarnSchedulerInterceptor.init(conf, this, rmContext);
+    super.serviceStart();
+  }
+
+  @Override
+  public synchronized void handle(SchedulerEvent event) {
+    this.yarnSchedulerInterceptor.beforeSchedulerEventHandled(event);
+    super.handle(event);
+    this.yarnSchedulerInterceptor.afterSchedulerEventHandled(event);
+  }
+}
+

http://git-wip-us.apache.org/repos/asf/incubator-myriad/blob/101bcad3/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/yarn/RMNodeEventHandler.java
----------------------------------------------------------------------
diff --git 
a/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/yarn/RMNodeEventHandler.java
 
b/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/yarn/RMNodeEventHandler.java
new file mode 100644
index 0000000..1a63553
--- /dev/null
+++ 
b/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/yarn/RMNodeEventHandler.java
@@ -0,0 +1,43 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.myriad.scheduler.yarn;
+
+import org.apache.myriad.scheduler.yarn.interceptor.YarnSchedulerInterceptor;
+import org.apache.hadoop.yarn.event.EventHandler;
+import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
+import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeEvent;
+
+/**
+ * Passes the {@link RMNodeEvent} events into the {@link 
YarnSchedulerInterceptor}.
+ */
+public class RMNodeEventHandler implements EventHandler<RMNodeEvent> {
+  private final YarnSchedulerInterceptor interceptor;
+  private final RMContext rmContext;
+
+  public RMNodeEventHandler(YarnSchedulerInterceptor interceptor, RMContext 
rmContext) {
+    this.interceptor = interceptor;
+    this.rmContext = rmContext;
+  }
+
+  @Override
+  public void handle(RMNodeEvent event) {
+    interceptor.beforeRMNodeEventHandled(event, rmContext);
+
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-myriad/blob/101bcad3/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/yarn/interceptor/BaseInterceptor.java
----------------------------------------------------------------------
diff --git 
a/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/yarn/interceptor/BaseInterceptor.java
 
b/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/yarn/interceptor/BaseInterceptor.java
new file mode 100644
index 0000000..ba4dec3
--- /dev/null
+++ 
b/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/yarn/interceptor/BaseInterceptor.java
@@ -0,0 +1,68 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.myriad.scheduler.yarn.interceptor;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.yarn.api.records.NodeId;
+import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
+import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeEvent;
+import 
org.apache.hadoop.yarn.server.resourcemanager.scheduler.AbstractYarnScheduler;
+import 
org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEvent;
+
+import java.io.IOException;
+
+/**
+ * A no-op interceptor whose sole purpose is to serve as a base class
+ * for other interceptors. Child interceptors can selectively override the
+ * required methods.
+ */
+public class BaseInterceptor implements YarnSchedulerInterceptor {
+  // restrict the constructor
+  protected BaseInterceptor() {
+  }
+
+  @Override
+  public CallBackFilter getCallBackFilter() {
+    return new CallBackFilter() {
+      @Override
+      public boolean allowCallBacksForNode(NodeId nodeManager) {
+        return true;
+      }
+    };
+  }
+
+  @Override
+  public void init(Configuration conf, AbstractYarnScheduler yarnScheduler, 
RMContext rmContext) throws IOException {
+  }
+
+  @Override
+  public void beforeRMNodeEventHandled(RMNodeEvent event, RMContext context) {
+
+  }
+
+  @Override
+  public void beforeSchedulerEventHandled(SchedulerEvent event) {
+
+  }
+
+  @Override
+  public void afterSchedulerEventHandled(SchedulerEvent event) {
+
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-myriad/blob/101bcad3/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/yarn/interceptor/CompositeInterceptor.java
----------------------------------------------------------------------
diff --git 
a/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/yarn/interceptor/CompositeInterceptor.java
 
b/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/yarn/interceptor/CompositeInterceptor.java
new file mode 100644
index 0000000..0bf1616
--- /dev/null
+++ 
b/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/yarn/interceptor/CompositeInterceptor.java
@@ -0,0 +1,139 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.myriad.scheduler.yarn.interceptor;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.collect.Maps;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.yarn.api.records.NodeId;
+import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
+import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeEvent;
+import 
org.apache.hadoop.yarn.server.resourcemanager.scheduler.AbstractYarnScheduler;
+import 
org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeAddedSchedulerEvent;
+import 
org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeRemovedSchedulerEvent;
+import 
org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeResourceUpdateSchedulerEvent;
+import 
org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeUpdateSchedulerEvent;
+import 
org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEvent;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.Map;
+
+/**
+ * An interceptor that wraps other interceptors. The 
Myriad{Fair,Capacity,Fifo}Scheduler classes
+ * instantiate this class and allow interception of the Yarn scheduler 
events/method calls.
+ * <p/>
+ * The {@link CompositeInterceptor} allows other interceptors to be registered 
via {@link InterceptorRegistry}
+ * and passes control to the registered interceptors whenever a event/method 
call is being intercepted.
+ */
+public class CompositeInterceptor implements YarnSchedulerInterceptor, 
InterceptorRegistry {
+  private static final Logger LOGGER = 
LoggerFactory.getLogger(CompositeInterceptor.class);
+
+  private Map<Class<?>, YarnSchedulerInterceptor> interceptors = 
Maps.newLinkedHashMap();
+  private YarnSchedulerInterceptor myriadInitInterceptor;
+
+  /**
+   * Called by Myriad{Fair,Capacity,Fifo}Scheduler classes. Creates an 
instance of
+   * {@link MyriadInitializationInterceptor}.
+   */
+  public CompositeInterceptor() {
+    this.myriadInitInterceptor = new MyriadInitializationInterceptor(this);
+  }
+
+  @VisibleForTesting
+  public void setMyriadInitInterceptor(YarnSchedulerInterceptor 
myriadInitInterceptor) {
+    this.myriadInitInterceptor = myriadInitInterceptor;
+  }
+
+  @Override
+  public void register(YarnSchedulerInterceptor interceptor) {
+    interceptors.put(interceptor.getClass(), interceptor);
+    LOGGER.info("Registered {} into the registry.", 
interceptor.getClass().getName());
+  }
+
+  @Override
+  public CallBackFilter getCallBackFilter() {
+    return new CallBackFilter() {
+      @Override
+      public boolean allowCallBacksForNode(NodeId nodeManager) {
+        return true;
+      }
+    };
+  }
+
+  /**
+   * Allows myriad to be initialized via {@link #myriadInitInterceptor}. After 
myriad is initialized,
+   * other interceptors will later register with this class via
+   * {@link InterceptorRegistry#register(YarnSchedulerInterceptor)}.
+   *
+   * @param conf
+   * @param yarnScheduler
+   * @param rmContext
+   * @throws IOException
+   */
+  @Override
+  public void init(Configuration conf, AbstractYarnScheduler yarnScheduler, 
RMContext rmContext) throws IOException {
+    myriadInitInterceptor.init(conf, yarnScheduler, rmContext);
+  }
+
+  @Override
+  public void beforeRMNodeEventHandled(RMNodeEvent event, RMContext context) {
+    for (YarnSchedulerInterceptor interceptor : interceptors.values()) {
+      if 
(interceptor.getCallBackFilter().allowCallBacksForNode(event.getNodeId())) {
+        interceptor.beforeRMNodeEventHandled(event, context);
+      }
+    }
+  }
+
+  @Override
+  public void beforeSchedulerEventHandled(SchedulerEvent event) {
+    for (YarnSchedulerInterceptor interceptor : interceptors.values()) {
+      final NodeId nodeId = getNodeIdForSchedulerEvent(event);
+      if (nodeId != null && 
interceptor.getCallBackFilter().allowCallBacksForNode(nodeId)) {
+        interceptor.beforeSchedulerEventHandled(event);
+      }
+    }
+  }
+
+  @Override
+  public void afterSchedulerEventHandled(SchedulerEvent event) {
+    for (YarnSchedulerInterceptor interceptor : interceptors.values()) {
+      NodeId nodeId = getNodeIdForSchedulerEvent(event);
+      if (nodeId != null && 
interceptor.getCallBackFilter().allowCallBacksForNode(nodeId)) {
+        interceptor.afterSchedulerEventHandled(event);
+      }
+    }
+  }
+
+  private NodeId getNodeIdForSchedulerEvent(SchedulerEvent event) {
+    switch (event.getType()) {
+      case NODE_ADDED:
+        return ((NodeAddedSchedulerEvent) event).getAddedRMNode().getNodeID();
+      case NODE_REMOVED:
+        return ((NodeRemovedSchedulerEvent) 
event).getRemovedRMNode().getNodeID();
+      case NODE_UPDATE:
+        return ((NodeUpdateSchedulerEvent) event).getRMNode().getNodeID();
+      case NODE_RESOURCE_UPDATE:
+        return ((NodeResourceUpdateSchedulerEvent) 
event).getRMNode().getNodeID();
+    }
+    return null;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-myriad/blob/101bcad3/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/yarn/interceptor/InterceptorRegistry.java
----------------------------------------------------------------------
diff --git 
a/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/yarn/interceptor/InterceptorRegistry.java
 
b/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/yarn/interceptor/InterceptorRegistry.java
new file mode 100644
index 0000000..e4073d3
--- /dev/null
+++ 
b/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/yarn/interceptor/InterceptorRegistry.java
@@ -0,0 +1,28 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.myriad.scheduler.yarn.interceptor;
+
+/**
+ * Allows registration of {@link YarnSchedulerInterceptor}.
+ */
+public interface InterceptorRegistry {
+
+  public void register(YarnSchedulerInterceptor interceptor);
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-myriad/blob/101bcad3/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/yarn/interceptor/MyriadInitializationInterceptor.java
----------------------------------------------------------------------
diff --git 
a/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/yarn/interceptor/MyriadInitializationInterceptor.java
 
b/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/yarn/interceptor/MyriadInitializationInterceptor.java
new file mode 100644
index 0000000..fa3e690
--- /dev/null
+++ 
b/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/yarn/interceptor/MyriadInitializationInterceptor.java
@@ -0,0 +1,58 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.myriad.scheduler.yarn.interceptor;
+
+import org.apache.myriad.Main;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
+import 
org.apache.hadoop.yarn.server.resourcemanager.scheduler.AbstractYarnScheduler;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+
+/**
+ * Responsible for intializing myriad.
+ */
+public class MyriadInitializationInterceptor extends BaseInterceptor {
+  private static final Logger LOGGER = 
LoggerFactory.getLogger(MyriadInitializationInterceptor.class);
+
+  private final InterceptorRegistry registry;
+
+  public MyriadInitializationInterceptor(InterceptorRegistry registry) {
+    this.registry = registry;
+  }
+
+  /**
+   * Initialize Myriad plugin before RM's scheduler is initialized.
+   * This includes registration with Mesos master, initialization of
+   * the myriad web application, initializing guice modules etc.
+   */
+  @Override
+  public void init(Configuration conf, AbstractYarnScheduler yarnScheduler, 
RMContext rmContext) throws IOException {
+    try {
+      Main.initialize(conf, yarnScheduler, rmContext, registry);
+    } catch (Exception e) {
+      // Abort bringing up RM
+      throw new RuntimeException("Failed to initialize myriad", e);
+    }
+    LOGGER.info("Initialized myriad.");
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-myriad/blob/101bcad3/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/yarn/interceptor/YarnSchedulerInterceptor.java
----------------------------------------------------------------------
diff --git 
a/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/yarn/interceptor/YarnSchedulerInterceptor.java
 
b/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/yarn/interceptor/YarnSchedulerInterceptor.java
new file mode 100644
index 0000000..92a4b0f
--- /dev/null
+++ 
b/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/yarn/interceptor/YarnSchedulerInterceptor.java
@@ -0,0 +1,95 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.myriad.scheduler.yarn.interceptor;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.yarn.api.records.NodeId;
+import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
+import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeEvent;
+import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeImpl;
+import 
org.apache.hadoop.yarn.server.resourcemanager.scheduler.AbstractYarnScheduler;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.YarnScheduler;
+import 
org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEvent;
+
+import java.io.IOException;
+
+/**
+ * Allows interception of YARN's scheduler events (or methods).
+ */
+public interface YarnSchedulerInterceptor {
+
+  /**
+   * Filters the method callbacks.
+   */
+  interface CallBackFilter {
+    /**
+     * Method to determine if any other methods in {@link 
YarnSchedulerInterceptor}
+     * pertaining to a given node manager should be invoked or not.
+     *
+     * @param nodeManager NodeId of the Node Manager registered with RM.
+     * @return true to allow invoking further interceptor methods. false 
otherwise.
+     */
+    public boolean allowCallBacksForNode(NodeId nodeManager);
+  }
+
+  /**
+   * Return an instance of {@link CallBackFilter}. {@link 
CallBackFilter#allowCallBacksForNode(NodeId)}
+   * method is invoked to *determine* if any of the other methods pertaining 
to a specific node
+   * needs to be invoked or not.
+   *
+   * @return
+   */
+  public CallBackFilter getCallBackFilter();
+
+  /**
+   * Invoked *before* {@link AbstractYarnScheduler#reinitialize(Configuration, 
RMContext)}
+   *
+   * @param conf
+   * @param yarnScheduler
+   * @param rmContext
+   * @throws IOException
+   */
+  public void init(Configuration conf, AbstractYarnScheduler yarnScheduler, 
RMContext rmContext) throws IOException;
+
+  /**
+   * Invoked *before* {@link RMNodeImpl#handle(RMNodeEvent)} only if
+   * {@link CallBackFilter#allowCallBacksForNode(NodeId)} returns true.
+   *
+   * @param event
+   * @param context
+   */
+  public void beforeRMNodeEventHandled(RMNodeEvent event, RMContext context);
+
+  /**
+   * Invoked *before* {@link 
YarnScheduler#handle(org.apache.hadoop.yarn.event.Event)} only if
+   * {@link CallBackFilter#allowCallBacksForNode(NodeId)} returns true.
+   *
+   * @param event
+   */
+  public void beforeSchedulerEventHandled(SchedulerEvent event);
+
+  /**
+   * Invoked *after* {@link 
YarnScheduler#handle(org.apache.hadoop.yarn.event.Event)} only if
+   * {@link CallBackFilter#allowCallBacksForNode(NodeId)} returns true.
+   *
+   * @param event
+   */
+  public void afterSchedulerEventHandled(SchedulerEvent event);
+
+}

Reply via email to