Repository: incubator-myriad
Updated Branches:
  refs/heads/master 4a6e50c41 -> 7207e2b04


MYRIAD-220 Initial check-in
Encapsulates changes to implement MYRIAD-220 along with enhanced/added comments
JIRA:
  [MYRIAD-220] https://issues.apache.org/jira/browse/MYRIAD-220
Pull Request:
  Closes #84
Author:    hokiegeek2 <hokiege...@gmail.com>
Date:      Thu Jun 30 16:03:44 2016 -0400


Project: http://git-wip-us.apache.org/repos/asf/incubator-myriad/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-myriad/commit/7207e2b0
Tree: http://git-wip-us.apache.org/repos/asf/incubator-myriad/tree/7207e2b0
Diff: http://git-wip-us.apache.org/repos/asf/incubator-myriad/diff/7207e2b0

Branch: refs/heads/master
Commit: 7207e2b04d8c9a0d74376cdeca8216fd237c960c
Parents: 4a6e50c
Author: hokiegeek2 <hokiege...@gmail.com>
Authored: Thu Jun 30 16:03:44 2016 -0400
Committer: darinj <dar...@apache.org>
Committed: Thu Jul 14 15:03:10 2016 -0400

----------------------------------------------------------------------
 .../recovery/MyriadFileSystemRMStateStore.java  |  3 +-
 .../apache/myriad/scheduler/MyriadDriver.java   | 51 ++++++++++-
 .../myriad/scheduler/MyriadScheduler.java       | 40 ++++++++-
 .../apache/myriad/scheduler/TaskTerminator.java | 92 ++++++++++++++------
 .../org/apache/myriad/scheduler/TaskUtils.java  |  3 -
 .../handlers/ResourceOffersEventHandler.java    |  6 +-
 .../handlers/StatusUpdateEventHandler.java      | 58 +++++++++---
 .../scheduler/fgs/YarnNodeCapacityManager.java  | 38 ++++++--
 8 files changed, 231 insertions(+), 60 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-myriad/blob/7207e2b0/myriad-scheduler/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/MyriadFileSystemRMStateStore.java
----------------------------------------------------------------------
diff --git 
a/myriad-scheduler/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/MyriadFileSystemRMStateStore.java
 
b/myriad-scheduler/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/MyriadFileSystemRMStateStore.java
index 6257ffc..99078c0 100644
--- 
a/myriad-scheduler/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/MyriadFileSystemRMStateStore.java
+++ 
b/myriad-scheduler/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/MyriadFileSystemRMStateStore.java
@@ -23,6 +23,7 @@ import java.io.IOException;
 import java.lang.reflect.InvocationTargetException;
 import java.lang.reflect.Method;
 
+import org.apache.commons.lang3.ArrayUtils;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FileStatus;
@@ -101,7 +102,7 @@ public class MyriadFileSystemRMStateStore extends 
FileSystemRMStateStore impleme
   @Override
   public synchronized StoreContext loadMyriadState() throws Exception {
     StoreContext sc = null;
-    if (myriadStateBytes != null && myriadStateBytes.length > 0) {
+    if (ArrayUtils.isNotEmpty(myriadStateBytes)) {
       sc = StoreContext.fromSerializedBytes(myriadStateBytes);
       myriadStateBytes = null;
     }

http://git-wip-us.apache.org/repos/asf/incubator-myriad/blob/7207e2b0/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/MyriadDriver.java
----------------------------------------------------------------------
diff --git 
a/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/MyriadDriver.java 
b/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/MyriadDriver.java
index 014516d..31656fb 100644
--- 
a/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/MyriadDriver.java
+++ 
b/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/MyriadDriver.java
@@ -26,7 +26,8 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 /**
- * Driver for Myriad scheduler.
+ * The MyriadDriver class is a wrapper for the Mesos SchedulerDriver class. 
Accordingly, 
+ * all public MyriadDriver methods delegate to the corresponding 
SchedulerDriver methods. 
  */
 public class MyriadDriver {
   private static final Logger LOGGER = 
LoggerFactory.getLogger(MyriadDriver.class);
@@ -38,6 +39,19 @@ public class MyriadDriver {
     this.driver = driver;
   }
 
+  /**
+   * Stops the underlying Mesos SchedulerDriver. If the failover flag is set to
+   * false, Myriad will not reconnect to Mesos. Consequently, Mesos will 
unregister 
+   * the Myriad framework and shut down all the Myriad tasks and executors. If 
failover 
+   * is set to true, all Myriad executors and tasks will remain running for a 
defined
+   * period of time, allowing the MyriadScheduler to reconnect to Mesos.
+   *
+   * @param failover    Whether framework failover is expected.
+   *
+   * @return            The state of the driver after the call.
+   *
+   * @see Status
+   */
   public Status stop(boolean failover) {
     LOGGER.info("Stopping driver");
     Status status = driver.stop(failover);
@@ -45,6 +59,14 @@ public class MyriadDriver {
     return status;
   }
 
+  /**
+   * Starts the underlying Mesos SchedulerDriver. Note: this method must
+   * be called before any other MyriadDriver methods are invoked.
+   *
+   * @return The state of the driver after the call.
+   *
+   * @see Status
+   */
   public Status start() {
     LOGGER.info("Starting driver");
     Status status = driver.start();
@@ -52,16 +74,41 @@ public class MyriadDriver {
     return status;
   }
 
+  /**
+   * Kills the specified task via the underlying Mesos SchedulerDriver. 
+   * Important note from the Mesos documentation: "attempting to kill a 
+   * task is currently not reliable. If, for example, a scheduler fails over
+   * while it was attempting to kill a task it will need to retry in
+   * the future. Likewise, if unregistered / disconnected, the request
+   * will be dropped (these semantics may be changed in the future)."
+   *
+   * @param taskId  The ID of the task to be killed.
+   *
+   * @return        The state of the driver after the call.
+   * 
+   * @see Status
+   */  
   public Status kill(final TaskID taskId) {
     Status status = driver.killTask(taskId);
     LOGGER.info("Task {} killed with status: {}", taskId, status);
     return status;
   }
 
+  /**
+   * Aborts the underlying Mesos SchedulerDriver so that no more callbacks 
+   * can be made to the MyriadScheduler. Note from Mesos documentation: 
+   * The semantics of abort and stop have deliberately been separated so that 
+   * code can detect an aborted driver and instantiate and start another 
driver 
+   * if desired (from within the same process).
+   *
+   * @return The state of the driver after the call.
+   * 
+   * @see Status
+   */  
   public Status abort() {
     LOGGER.info("Aborting driver");
     Status status = driver.abort();
-    LOGGER.info("Driver aborted with status: {}", status);
+    LOGGER.info("Aborted driver with status: {}", status);
     return status;
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-myriad/blob/7207e2b0/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/MyriadScheduler.java
----------------------------------------------------------------------
diff --git 
a/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/MyriadScheduler.java
 
b/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/MyriadScheduler.java
index cb850ab..561d36e 100644
--- 
a/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/MyriadScheduler.java
+++ 
b/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/MyriadScheduler.java
@@ -18,9 +18,10 @@
  */
 package org.apache.myriad.scheduler;
 
-import com.lmax.disruptor.EventTranslator;
 import java.util.List;
+
 import javax.inject.Inject;
+
 import org.apache.mesos.Protos;
 import org.apache.mesos.Scheduler;
 import org.apache.mesos.SchedulerDriver;
@@ -36,8 +37,11 @@ import org.apache.myriad.scheduler.event.ResourceOffersEvent;
 import org.apache.myriad.scheduler.event.SlaveLostEvent;
 import org.apache.myriad.scheduler.event.StatusUpdateEvent;
 
+import com.lmax.disruptor.EventTranslator;
+
 /**
- * Myriad Scheduler
+ * The Myriad implementation of the Mesos Scheduler callback interface, where 
the method implementations
+ * publish Myriad framework events corresponding to the Mesos callbacks.
  */
 public class MyriadScheduler implements Scheduler {
   private org.apache.myriad.DisruptorManager disruptorManager;
@@ -47,6 +51,9 @@ public class MyriadScheduler implements Scheduler {
     this.disruptorManager = disruptorManager;
   }
 
+  /**
+   * Publishes a RegisteredEvent
+   */
   @Override
   public void registered(final SchedulerDriver driver, final 
Protos.FrameworkID frameworkId, final Protos.MasterInfo masterInfo) {
     disruptorManager.getRegisteredEventDisruptor().publishEvent(new 
EventTranslator<RegisteredEvent>() {
@@ -59,6 +66,9 @@ public class MyriadScheduler implements Scheduler {
     });
   }
 
+  /**
+   * Publishes a ReRegisteredEvent
+   */
   @Override
   public void reregistered(final SchedulerDriver driver, final 
Protos.MasterInfo masterInfo) {
     disruptorManager.getReRegisteredEventDisruptor().publishEvent(new 
EventTranslator<ReRegisteredEvent>() {
@@ -70,6 +80,9 @@ public class MyriadScheduler implements Scheduler {
     });
   }
 
+  /**
+   * Publishes a ResourceOffersEvent
+   */
   @Override
   public void resourceOffers(final SchedulerDriver driver, final 
List<Protos.Offer> offers) {
     disruptorManager.getResourceOffersEventDisruptor().publishEvent(new 
EventTranslator<ResourceOffersEvent>() {
@@ -81,6 +94,9 @@ public class MyriadScheduler implements Scheduler {
     });
   }
 
+  /**
+   * Publishes an OfferRescindedEvent
+   */
   @Override
   public void offerRescinded(final SchedulerDriver driver, final 
Protos.OfferID offerId) {
     disruptorManager.getOfferRescindedEventDisruptor().publishEvent(new 
EventTranslator<OfferRescindedEvent>() {
@@ -92,6 +108,9 @@ public class MyriadScheduler implements Scheduler {
     });
   }
 
+  /**
+   * Publishes a StatusUpdateEvent
+   */
   @Override
   public void statusUpdate(final SchedulerDriver driver, final 
Protos.TaskStatus status) {
     disruptorManager.getStatusUpdateEventDisruptor().publishEvent(new 
EventTranslator<StatusUpdateEvent>() {
@@ -103,6 +122,9 @@ public class MyriadScheduler implements Scheduler {
     });
   }
 
+  /**
+   * Publishes FrameworkMessageEvent
+   */
   @Override
   public void frameworkMessage(final SchedulerDriver driver, final 
Protos.ExecutorID executorId, final Protos.SlaveID slaveId,
                                final byte[] bytes) {
@@ -117,6 +139,9 @@ public class MyriadScheduler implements Scheduler {
     });
   }
 
+  /**
+   * Publishes DisconnectedEvent
+   */
   @Override
   public void disconnected(final SchedulerDriver driver) {
     disruptorManager.getDisconnectedEventDisruptor().publishEvent(new 
EventTranslator<DisconnectedEvent>() {
@@ -127,6 +152,9 @@ public class MyriadScheduler implements Scheduler {
     });
   }
 
+  /**
+   * Publishes SlaveLostEvent
+   */
   @Override
   public void slaveLost(final SchedulerDriver driver, final Protos.SlaveID 
slaveId) {
     disruptorManager.getSlaveLostEventDisruptor().publishEvent(new 
EventTranslator<SlaveLostEvent>() {
@@ -138,6 +166,9 @@ public class MyriadScheduler implements Scheduler {
     });
   }
 
+  /**
+   * Publishes ExecutorLostEvent
+   */
   @Override
   public void executorLost(final SchedulerDriver driver, final 
Protos.ExecutorID executorId, final Protos.SlaveID slaveId,
                            final int exitStatus) {
@@ -152,6 +183,9 @@ public class MyriadScheduler implements Scheduler {
     });
   }
 
+  /**
+   * Publishes ErrorEvent
+   */
   @Override
   public void error(final SchedulerDriver driver, final String message) {
     disruptorManager.getErrorEventDisruptor().publishEvent(new 
EventTranslator<ErrorEvent>() {
@@ -162,4 +196,4 @@ public class MyriadScheduler implements Scheduler {
       }
     });
   }
-}
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-myriad/blob/7207e2b0/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/TaskTerminator.java
----------------------------------------------------------------------
diff --git 
a/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/TaskTerminator.java
 
b/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/TaskTerminator.java
index 6be653b..4110b37 100644
--- 
a/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/TaskTerminator.java
+++ 
b/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/TaskTerminator.java
@@ -18,10 +18,10 @@
  */
 package org.apache.myriad.scheduler;
 
-import com.google.common.base.Preconditions;
-import com.google.common.collect.Sets;
 import java.util.Set;
+
 import javax.inject.Inject;
+
 import org.apache.commons.collections.CollectionUtils;
 import org.apache.mesos.Protos.Status;
 import org.apache.mesos.Protos.TaskID;
@@ -31,8 +31,12 @@ import org.apache.myriad.state.SchedulerState;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import com.google.common.collect.Sets;
+
 /**
- * {@link TaskTerminator} is responsible for killing tasks.
+ * {@link TaskTerminator} is basically a reaper process responsible for killing
+ * tasks marked as Killable by {@link MyriadOperations} that are stored
+ * within a {@link SchedulerState} object
  */
 public class TaskTerminator implements Runnable {
   private static final Logger LOGGER = 
LoggerFactory.getLogger(TaskTerminator.class);
@@ -49,36 +53,68 @@ public class TaskTerminator implements Runnable {
     this.offerLifeCycleManager = offerLifecycleManager;
   }
 
+  /**
+   * Encapsulates logic that retrieves the collection of killable tasks from 
the
+   * SchedulerState object. If a task is in pending state, the task is simply 
+   * removed from SchedulerState. Any tasks in a running state were not 
successfully
+   * killed by Mesos or the callback failed, so another kill attempt is 
made.
+   */
   @Override
-  public void run() {
-    // clone a copy of the killable tasks
-    Set<TaskID> killableTasks = 
Sets.newHashSet(schedulerState.getKillableTasks());
-
-    if (CollectionUtils.isEmpty(killableTasks)) {
-      return;
-    }
+  public void run() { 
+    //If there are 1..n killable tasks, proceed; otherwise, simply return
+    if (CollectionUtils.isNotEmpty(schedulerState.getKillableTasks())) {
+      /*
+       * Clone the killable task collection, iterate through all tasks, and 
+       * process any pending and/or non-pending tasks
+       */
+      Set<TaskID> killableTasks = 
Sets.newHashSet(schedulerState.getKillableTasks());
+      Status driverStatus = driverManager.getDriverStatus();
 
-    Status driverStatus = driverManager.getDriverStatus();
-    if (Status.DRIVER_RUNNING != driverStatus) {
-      LOGGER.warn("Cannot kill tasks, as driver is not running. Status: {}", 
driverStatus);
-      return;
-    }
+      //TODO (hokiegeek2) Can the DriverManager be restarted? If not, should 
the ResourceManager stop?
+      if (Status.DRIVER_RUNNING != driverStatus) {
+        LOGGER.warn("Cannot kill tasks because Mesos Driver is not running. 
Status: {}", driverStatus);
+        return;
+      }
 
-    for (TaskID taskIdToKill : killableTasks) {
-      if (this.schedulerState.getPendingTaskIds().contains(taskIdToKill)) {
-        this.schedulerState.removeTask(taskIdToKill);
-      } else {
-        Status status = this.driverManager.kill(taskIdToKill);
-        NodeTask task = schedulerState.getTask(taskIdToKill);
-        if (task != null) {
-          offerLifeCycleManager.declineOutstandingOffers(task.getHostname());
-          this.schedulerState.removeTask(taskIdToKill);
+      for (TaskID taskIdToKill : killableTasks) {
+        LOGGER.info("Received task kill request for task: {}", taskIdToKill);
+        if (isPendingTask(taskIdToKill)) {
+          handlePendingTask(taskIdToKill);
         } else {
-          schedulerState.removeTask(taskIdToKill);
-          LOGGER.warn("NodeTask with taskId: {} does not exist", taskIdToKill);
+          handleNonPendingTask(taskIdToKill);
         }
-        Preconditions.checkState(status == Status.DRIVER_RUNNING);
       }
     }
   }
-}
+  
+  private void handlePendingTask(TaskID taskId) {
+    /*
+     * since task is pending and has not started, simply remove 
+     * it from SchedulerState task collection
+     */
+    schedulerState.removeTask(taskId);
+  }
+  
+  private void handleNonPendingTask(TaskID taskId) { 
+    /*
+     * Kill the task and decline additional offers for it, but hold off 
removing from SchedulerState. 
+     * Removal of the killable task must be done following invocation of 
statusUpdate callback method
+     * which constitutes acknowledgement from Mesos that the kill task request 
succeeded.
+     */
+    Status status = this.driverManager.kill(taskId);
+    NodeTask task = schedulerState.getTask(taskId);
+
+    if (task != null) {
+      offerLifeCycleManager.declineOutstandingOffers(task.getHostname());
+    } 
+    if (status.equals(Status.DRIVER_RUNNING)) {
+      LOGGER.info("Kill request for {} was submitted to a running 
SchedulerDriver", taskId);
+    } else {
+      LOGGER.warn("Kill task request for {} submitted to non-running 
SchedulerDriver, may fail", taskId);
+    }
+  }
+
+  private boolean isPendingTask(TaskID taskId) {
+    return this.schedulerState.getPendingTaskIds().contains(taskId);
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-myriad/blob/7207e2b0/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/TaskUtils.java
----------------------------------------------------------------------
diff --git 
a/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/TaskUtils.java 
b/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/TaskUtils.java
index d73a467..c8e2a21 100644
--- a/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/TaskUtils.java
+++ b/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/TaskUtils.java
@@ -75,9 +75,6 @@ public class TaskUtils {
   private static final String CONTAINER_PATH_KEY = "containerPath";
   private static final String HOST_PATH_KEY = "hostPath";
   private static final String RW_MODE = "mode";
-  private static final String CONTAINER_PORT_KEY = "containerPort";
-  private static final String HOST_PORT_KEY = "hostPort";
-  private static final String PROTOCOL_KEY = "protocol";
   private static final String PARAMETER_KEY_KEY = "key";
   private static final String PARAMETER_VALUE_KEY = "value";
 

http://git-wip-us.apache.org/repos/asf/incubator-myriad/blob/7207e2b0/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/event/handlers/ResourceOffersEventHandler.java
----------------------------------------------------------------------
diff --git 
a/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/event/handlers/ResourceOffersEventHandler.java
 
b/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/event/handlers/ResourceOffersEventHandler.java
index 8d1cd03..f0e80e9 100644
--- 
a/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/event/handlers/ResourceOffersEventHandler.java
+++ 
b/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/event/handlers/ResourceOffersEventHandler.java
@@ -97,7 +97,7 @@ public class ResourceOffersEventHandler implements 
EventHandler<ResourceOffersEv
       }
       return;
     }
-    LOGGER.info("Received offers {}", offers.size());
+    LOGGER.debug("Received offers {}", offers.size());
     LOGGER.debug("Pending tasks: {}", this.schedulerState.getPendingTaskIds());
     driverOperationLock.lock();
     try {
@@ -218,7 +218,7 @@ public class ResourceOffersEventHandler implements 
EventHandler<ResourceOffersEv
     if (aggrCpu <= cpus && aggrMem <= mem && taskConstraints.portsCount() <= 
ports) {
       return true;
     } else {
-      LOGGER.info("Offer not sufficient for task with, cpu: {}, memory: {}, 
ports: {}", aggrCpu, aggrMem, ports);
+      LOGGER.debug("Offer insufficient for task with, cpu: {}, memory: {}, 
ports: {}", aggrCpu, aggrMem, ports);
       return false;
     }
   }
@@ -243,7 +243,7 @@ public class ResourceOffersEventHandler implements 
EventHandler<ResourceOffersEv
 
   private void checkResource(boolean fail, String resource) {
     if (fail) {
-      LOGGER.info("No " + resource + " resources present");
+      LOGGER.debug("No " + resource + " resources present");
     }
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-myriad/blob/7207e2b0/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/event/handlers/StatusUpdateEventHandler.java
----------------------------------------------------------------------
diff --git 
a/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/event/handlers/StatusUpdateEventHandler.java
 
b/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/event/handlers/StatusUpdateEventHandler.java
index 25d0440..079df4b 100644
--- 
a/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/event/handlers/StatusUpdateEventHandler.java
+++ 
b/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/event/handlers/StatusUpdateEventHandler.java
@@ -31,7 +31,8 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 /**
- * handles and logs mesos status update events
+ * Handles and logs mesos StatusUpdateEvents based upon the corresponding
+ * Protos.TaskState enum value
  */
 public class StatusUpdateEventHandler implements 
EventHandler<StatusUpdateEvent> {
 
@@ -45,7 +46,21 @@ public class StatusUpdateEventHandler implements 
EventHandler<StatusUpdateEvent>
     this.schedulerState = schedulerState;
     this.offerLifecycleManager = offerLifecycleManager;
   }
-
+  
+  /**
+   * Encapsulates the logic to log and respond to the incoming 
StatusUpdateEvent per the
+   * Event TaskStatus state:
+   * 
+   * 1. TASK_STAGING: mark task as staging within SchedulerState
+   * 2. TASK_STARTING: mark task as staging within SchedulerState
+   * 3. TASK_RUNNING: mark task as active within SchedulerState
+   * 4. TASK_FINISHED: decline outstanding offers and remove task from 
SchedulerState
+   * 5. TASK_FAILED: decline outstanding offers, remove failed, killable tasks 
from SchedulerState,
+   *    mark as pending non-killable, failed tasks
+   * 6. TASK_KILLED: decline outstanding offers, remove killed tasks from 
SchedulerState
+   * 7. TASK_LOST: decline outstanding offers, remove killable, lost tasks 
from SchedulerState,
+   *    mark as pending non-killable, lost tasks
+   */
   @Override
   public void onEvent(StatusUpdateEvent event, long sequence, boolean 
endOfBatch) throws Exception {
     TaskStatus status = event.getStatus();
@@ -71,25 +86,44 @@ public class StatusUpdateEventHandler implements 
EventHandler<StatusUpdateEvent>
         schedulerState.makeTaskActive(taskId);
         break;
       case TASK_FINISHED:
-        offerLifecycleManager.declineOutstandingOffers(task.getHostname());
-        schedulerState.removeTask(taskId);
+        cleanupTask(taskId, task, "finished");
         break;
       case TASK_FAILED:
-        // Add to pending tasks
-        offerLifecycleManager.declineOutstandingOffers(task.getHostname());
-        schedulerState.makeTaskPending(taskId);
+        cleanupFailedTask(taskId, task, "failed");
         break;
       case TASK_KILLED:
-        offerLifecycleManager.declineOutstandingOffers(task.getHostname());
-        schedulerState.removeTask(taskId);
+        cleanupTask(taskId, task, "killed");
         break;
       case TASK_LOST:
-        offerLifecycleManager.declineOutstandingOffers(task.getHostname());
-        schedulerState.makeTaskPending(taskId);
+        cleanupFailedTask(taskId, task, "lost");
         break;
       default:
         LOGGER.error("Invalid state: {}", state);
         break;
     }
   }
-}
+
+  private void cleanupFailedTask(TaskID taskId, NodeTask task, String 
stopReason) {
+    offerLifecycleManager.declineOutstandingOffers(task.getHostname());
+    /*
+     * Remove the task from SchedulerState if the task is killable.  Otherwise,
+     * mark the task as pending to enable restart.
+     */
+    if (taskIsKillable(taskId)) {
+      schedulerState.removeTask(taskId);
+      LOGGER.info("Removed killable, {} task with id {}", stopReason, taskId);
+    } else {
+      schedulerState.makeTaskPending(taskId);        
+      LOGGER.info("Marked as pending {} task with id {}", stopReason, taskId);
+    }  
+  }
+  
+  private void cleanupTask(TaskID taskId, NodeTask task, String stopReason) {
+    offerLifecycleManager.declineOutstandingOffers(task.getHostname());
+    schedulerState.removeTask(taskId);    
+    LOGGER.info("Removed {} task with id {}", stopReason, taskId);
+  }
+  private boolean taskIsKillable(TaskID taskId) {
+    return schedulerState.getKillableTasks().contains(taskId);
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-myriad/blob/7207e2b0/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/fgs/YarnNodeCapacityManager.java
----------------------------------------------------------------------
diff --git 
a/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/fgs/YarnNodeCapacityManager.java
 
b/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/fgs/YarnNodeCapacityManager.java
index e922fc6..8f7c6f5 100644
--- 
a/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/fgs/YarnNodeCapacityManager.java
+++ 
b/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/fgs/YarnNodeCapacityManager.java
@@ -127,13 +127,15 @@ public class YarnNodeCapacityManager extends 
BaseInterceptor {
   }
 
   private void removeYarnTask(RMContainer rmContainer) {
-    if (rmContainer != null && rmContainer.getContainer() != null) {
+    if (containersNotNull(rmContainer)){
       Protos.TaskID taskId = containerToTaskId(rmContainer);
-      //TODO (darinj) Reliable messaging
+      /*
+       * Mark the task as killable within the SchedulerState object so that
+       * the TaskTerminator daemon will kill it
+       */
       state.makeTaskKillable(taskId);
-      myriadDriver.kill(taskId);
-      String hostname = rmContainer.getContainer().getNodeId().getHost();
-      Node node = nodeStore.getNode(hostname);
+      
+      Node node = retrieveNode(rmContainer);
       if (node != null) {
         RMNode rmNode = node.getNode().getRMNode();
         Resource resource = rmContainer.getContainer().getResource();
@@ -141,11 +143,20 @@ public class YarnNodeCapacityManager extends 
BaseInterceptor {
         LOGGER.info("Removed task yarn_{} with exit status freeing {} cpu and 
{} mem.", rmContainer.getContainer().toString(),
             rmContainer.getContainerExitStatus(), resource.getVirtualCores(), 
resource.getMemory());
       } else {
-        LOGGER.warn(hostname + " not found");
+        LOGGER.warn("The Node for the {} host was not found", 
rmContainer.getContainer().getNodeId().getHost());
       }
     }
   }
 
+  private Node retrieveNode(RMContainer container) {
+    String hostname = container.getContainer().getNodeId().getHost();
+    return nodeStore.getNode(hostname);
+  }
+
+  private boolean containersNotNull(RMContainer rmContainer) {
+    return (rmContainer != null && rmContainer.getContainer() != null);
+  }
+  
   @Override
   public void afterSchedulerEventHandled(SchedulerEvent event) {
     switch (event.getType()) {
@@ -182,7 +193,7 @@ public class YarnNodeCapacityManager extends 
BaseInterceptor {
 
   /**
    * Checks if any containers were allocated in the current scheduler run and
-   * launches the corresponding Mesos tasks. It also udpates the node
+   * launches the corresponding Mesos tasks. It also updates the node
    * capacity depending on what portion of the consumed offers were actually
    * used.
    */
@@ -232,11 +243,22 @@ public class YarnNodeCapacityManager extends 
BaseInterceptor {
     node.removeContainerSnapshot();
   }
 
-
+  /**
+   * Increments the capacity for the specified RMNode
+   * 
+   * @param rmNode
+   * @param addedCapacity
+   */
   public void incrementNodeCapacity(RMNode rmNode, Resource addedCapacity) {
     setNodeCapacity(rmNode, Resources.add(rmNode.getTotalCapability(), 
addedCapacity));
   }
 
+  /**
+   * Decrements the capacity for the specified RMNode
+   * 
+   * @param rmNode
+   * @param removedCapacity
+   */
   public void decrementNodeCapacity(RMNode rmNode, Resource removedCapacity) {
     setNodeCapacity(rmNode, Resources.subtract(rmNode.getTotalCapability(), 
removedCapacity));
   }

Reply via email to