Repository: incubator-myriad Updated Branches: refs/heads/master 4a6e50c41 -> 7207e2b04
MYRIAD-220 Initial check-in Encapsulates changes to implement MYRIAD-220 along with enhanced/added comments JIRA: [MYRIAD-220] https://issues.apache.org/jira/browse/MYRIAD-220 Pull Request: Closes #84 Author: hokiegeek2 <hokiege...@gmail.com> Date: Thu Jun 30 16:03:44 2016 -0400 Project: http://git-wip-us.apache.org/repos/asf/incubator-myriad/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-myriad/commit/7207e2b0 Tree: http://git-wip-us.apache.org/repos/asf/incubator-myriad/tree/7207e2b0 Diff: http://git-wip-us.apache.org/repos/asf/incubator-myriad/diff/7207e2b0 Branch: refs/heads/master Commit: 7207e2b04d8c9a0d74376cdeca8216fd237c960c Parents: 4a6e50c Author: hokiegeek2 <hokiege...@gmail.com> Authored: Thu Jun 30 16:03:44 2016 -0400 Committer: darinj <dar...@apache.org> Committed: Thu Jul 14 15:03:10 2016 -0400 ---------------------------------------------------------------------- .../recovery/MyriadFileSystemRMStateStore.java | 3 +- .../apache/myriad/scheduler/MyriadDriver.java | 51 ++++++++++- .../myriad/scheduler/MyriadScheduler.java | 40 ++++++++- .../apache/myriad/scheduler/TaskTerminator.java | 92 ++++++++++++++------ .../org/apache/myriad/scheduler/TaskUtils.java | 3 - .../handlers/ResourceOffersEventHandler.java | 6 +- .../handlers/StatusUpdateEventHandler.java | 58 +++++++++--- .../scheduler/fgs/YarnNodeCapacityManager.java | 38 ++++++-- 8 files changed, 231 insertions(+), 60 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-myriad/blob/7207e2b0/myriad-scheduler/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/MyriadFileSystemRMStateStore.java ---------------------------------------------------------------------- diff --git a/myriad-scheduler/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/MyriadFileSystemRMStateStore.java 
b/myriad-scheduler/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/MyriadFileSystemRMStateStore.java index 6257ffc..99078c0 100644 --- a/myriad-scheduler/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/MyriadFileSystemRMStateStore.java +++ b/myriad-scheduler/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/MyriadFileSystemRMStateStore.java @@ -23,6 +23,7 @@ import java.io.IOException; import java.lang.reflect.InvocationTargetException; import java.lang.reflect.Method; +import org.apache.commons.lang3.ArrayUtils; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FSDataInputStream; import org.apache.hadoop.fs.FileStatus; @@ -101,7 +102,7 @@ public class MyriadFileSystemRMStateStore extends FileSystemRMStateStore impleme @Override public synchronized StoreContext loadMyriadState() throws Exception { StoreContext sc = null; - if (myriadStateBytes != null && myriadStateBytes.length > 0) { + if (ArrayUtils.isNotEmpty(myriadStateBytes)) { sc = StoreContext.fromSerializedBytes(myriadStateBytes); myriadStateBytes = null; } http://git-wip-us.apache.org/repos/asf/incubator-myriad/blob/7207e2b0/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/MyriadDriver.java ---------------------------------------------------------------------- diff --git a/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/MyriadDriver.java b/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/MyriadDriver.java index 014516d..31656fb 100644 --- a/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/MyriadDriver.java +++ b/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/MyriadDriver.java @@ -26,7 +26,8 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; /** - * Driver for Myriad scheduler. + * The MyriadDriver class is a wrapper for the Mesos SchedulerDriver class. Accordingly, + * all public MyriadDriver methods delegate to the corresponding SchedulerDriver methods. 
*/ public class MyriadDriver { private static final Logger LOGGER = LoggerFactory.getLogger(MyriadDriver.class); @@ -38,6 +39,19 @@ public class MyriadDriver { this.driver = driver; } + /** + * Stops the underlying Mesos SchedulerDriver. If the failover flag is set to + * false, Myriad will not reconnect to Mesos. Consequently, Mesos will unregister + * the Myriad framework and shutdown all the Myriad tasks and executors. If failover + * is set to true, all Myriad executors and tasks will remain running for a defined + * period of time, allowing the MyriadScheduler to reconnect to Mesos. + * + * @param failover Whether framework failover is expected. + * + * @return The state of the driver after the call. + * + * @see Status + */ public Status stop(boolean failover) { LOGGER.info("Stopping driver"); Status status = driver.stop(failover); @@ -45,6 +59,14 @@ public class MyriadDriver { return status; } + /** + * Starts the underlying Mesos SchedulerDriver. Note: this method must + * be called before any other MyriadDriver methods are invoked. + * + * @return The state of the driver after the call. + * + * @see Status + */ public Status start() { LOGGER.info("Starting driver"); Status status = driver.start(); @@ -52,16 +74,41 @@ public class MyriadDriver { return status; } + /** + * Kills the specified task via the underlying Mesos SchedulerDriver. + * Important note from the Mesos documentation: "attempting to kill a + * task is currently not reliable. If, for example, a scheduler fails over + * while it was attempting to kill a task it will need to retry in + * the future Likewise, if unregistered / disconnected, the request + * will be dropped (these semantics may be changed in the future)." + * + * @param taskId The ID of the task to be killed. + * + * @return The state of the driver after the call. 
+ * + * @see Status + */ public Status kill(final TaskID taskId) { Status status = driver.killTask(taskId); LOGGER.info("Task {} killed with status: {}", taskId, status); return status; } + /** + * Aborts the underlying Mesos SchedulerDriver so that no more callbacks + * can be made to the MyriadScheduler. Note from Mesos documentation: + * The semantics of abort and stop have deliberately been separated so that + * code can detect an aborted driver and instantiate and start another driver + * if desired (from within the same process). + * + * @return The state of the driver after the call. + * + * @see Status + */ public Status abort() { LOGGER.info("Aborting driver"); Status status = driver.abort(); - LOGGER.info("Driver aborted with status: {}", status); + LOGGER.info("Aborted driver with status: {}", status); return status; } http://git-wip-us.apache.org/repos/asf/incubator-myriad/blob/7207e2b0/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/MyriadScheduler.java ---------------------------------------------------------------------- diff --git a/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/MyriadScheduler.java b/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/MyriadScheduler.java index cb850ab..561d36e 100644 --- a/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/MyriadScheduler.java +++ b/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/MyriadScheduler.java @@ -18,9 +18,10 @@ */ package org.apache.myriad.scheduler; -import com.lmax.disruptor.EventTranslator; import java.util.List; + import javax.inject.Inject; + import org.apache.mesos.Protos; import org.apache.mesos.Scheduler; import org.apache.mesos.SchedulerDriver; @@ -36,8 +37,11 @@ import org.apache.myriad.scheduler.event.ResourceOffersEvent; import org.apache.myriad.scheduler.event.SlaveLostEvent; import org.apache.myriad.scheduler.event.StatusUpdateEvent; +import com.lmax.disruptor.EventTranslator; + /** - * Myriad Scheduler + * The Myriad 
implementation of the Mesos Scheduler callback interface, where the method implementations + * publish Myriad framework events corresponding to the Mesos callbacks. */ public class MyriadScheduler implements Scheduler { private org.apache.myriad.DisruptorManager disruptorManager; @@ -47,6 +51,9 @@ public class MyriadScheduler implements Scheduler { this.disruptorManager = disruptorManager; } + /** + * Publishes a RegisteredEvent + */ @Override public void registered(final SchedulerDriver driver, final Protos.FrameworkID frameworkId, final Protos.MasterInfo masterInfo) { disruptorManager.getRegisteredEventDisruptor().publishEvent(new EventTranslator<RegisteredEvent>() { @@ -59,6 +66,9 @@ public class MyriadScheduler implements Scheduler { }); } + /** + * Publishes a ReRegisteredEvent + */ @Override public void reregistered(final SchedulerDriver driver, final Protos.MasterInfo masterInfo) { disruptorManager.getReRegisteredEventDisruptor().publishEvent(new EventTranslator<ReRegisteredEvent>() { @@ -70,6 +80,9 @@ public class MyriadScheduler implements Scheduler { }); } + /** + * Publishes a ResourceOffersEvent + */ @Override public void resourceOffers(final SchedulerDriver driver, final List<Protos.Offer> offers) { disruptorManager.getResourceOffersEventDisruptor().publishEvent(new EventTranslator<ResourceOffersEvent>() { @@ -81,6 +94,9 @@ public class MyriadScheduler implements Scheduler { }); } + /** + * Publishes a OfferRescindedEvent + */ @Override public void offerRescinded(final SchedulerDriver driver, final Protos.OfferID offerId) { disruptorManager.getOfferRescindedEventDisruptor().publishEvent(new EventTranslator<OfferRescindedEvent>() { @@ -92,6 +108,9 @@ public class MyriadScheduler implements Scheduler { }); } + /** + * Publishes a StatusUpdateEvent + */ @Override public void statusUpdate(final SchedulerDriver driver, final Protos.TaskStatus status) { disruptorManager.getStatusUpdateEventDisruptor().publishEvent(new EventTranslator<StatusUpdateEvent>() { 
@@ -103,6 +122,9 @@ public class MyriadScheduler implements Scheduler { }); } + /** + * Publishes FrameworkMessageEvent + */ @Override public void frameworkMessage(final SchedulerDriver driver, final Protos.ExecutorID executorId, final Protos.SlaveID slaveId, final byte[] bytes) { @@ -117,6 +139,9 @@ public class MyriadScheduler implements Scheduler { }); } + /** + * Publishes DisconnectedEvent + */ @Override public void disconnected(final SchedulerDriver driver) { disruptorManager.getDisconnectedEventDisruptor().publishEvent(new EventTranslator<DisconnectedEvent>() { @@ -127,6 +152,9 @@ public class MyriadScheduler implements Scheduler { }); } + /** + * Publishes SlaveLostEvent + */ @Override public void slaveLost(final SchedulerDriver driver, final Protos.SlaveID slaveId) { disruptorManager.getSlaveLostEventDisruptor().publishEvent(new EventTranslator<SlaveLostEvent>() { @@ -138,6 +166,9 @@ public class MyriadScheduler implements Scheduler { }); } + /** + * Publishes ExecutorLostEvent + */ @Override public void executorLost(final SchedulerDriver driver, final Protos.ExecutorID executorId, final Protos.SlaveID slaveId, final int exitStatus) { @@ -152,6 +183,9 @@ public class MyriadScheduler implements Scheduler { }); } + /** + * Publishes ErrorEvent + */ @Override public void error(final SchedulerDriver driver, final String message) { disruptorManager.getErrorEventDisruptor().publishEvent(new EventTranslator<ErrorEvent>() { @@ -162,4 +196,4 @@ public class MyriadScheduler implements Scheduler { } }); } -} +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-myriad/blob/7207e2b0/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/TaskTerminator.java ---------------------------------------------------------------------- diff --git a/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/TaskTerminator.java b/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/TaskTerminator.java index 6be653b..4110b37 100644 --- 
a/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/TaskTerminator.java +++ b/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/TaskTerminator.java @@ -18,10 +18,10 @@ */ package org.apache.myriad.scheduler; -import com.google.common.base.Preconditions; -import com.google.common.collect.Sets; import java.util.Set; + import javax.inject.Inject; + import org.apache.commons.collections.CollectionUtils; import org.apache.mesos.Protos.Status; import org.apache.mesos.Protos.TaskID; @@ -31,8 +31,12 @@ import org.apache.myriad.state.SchedulerState; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import com.google.common.collect.Sets; + /** - * {@link TaskTerminator} is responsible for killing tasks. + * {@link TaskTerminator} is basically a reaper process responsible for killing + * tasks marked as Killable by {@link MyriadOperations} that are stored + * within a {@link SchedulerState} object */ public class TaskTerminator implements Runnable { private static final Logger LOGGER = LoggerFactory.getLogger(TaskTerminator.class); @@ -49,36 +53,68 @@ public class TaskTerminator implements Runnable { this.offerLifeCycleManager = offerLifecycleManager; } + /** + * Encapsulates logic that retrieves the collection of killable tasks from the + * SchedulerState object. If a task is in pending state, the task is simply + * removed from SchedulerState. Any tasks in a running state were not successfully + * killed by Mesos or the callback failed, so another kill attempt is made. 
+ */ @Override - public void run() { - // clone a copy of the killable tasks - Set<TaskID> killableTasks = Sets.newHashSet(schedulerState.getKillableTasks()); - - if (CollectionUtils.isEmpty(killableTasks)) { - return; - } + public void run() { + //If there are 1..n killable tasks, proceed; otherwise, simply return + if (CollectionUtils.isNotEmpty(schedulerState.getKillableTasks())) { + /* + * Clone the killable task collection, iterate through all tasks, and + * process any pending and/or non-pending tasks + */ + Set<TaskID> killableTasks = Sets.newHashSet(schedulerState.getKillableTasks()); + Status driverStatus = driverManager.getDriverStatus(); - Status driverStatus = driverManager.getDriverStatus(); - if (Status.DRIVER_RUNNING != driverStatus) { - LOGGER.warn("Cannot kill tasks, as driver is not running. Status: {}", driverStatus); - return; - } + //TODO (hokiegeek2) Can the DriverManager be restarted? If not, should the ResourceManager stop? + if (Status.DRIVER_RUNNING != driverStatus) { + LOGGER.warn("Cannot kill tasks because Mesos Driver is not running. 
Status: {}", driverStatus); + return; + } - for (TaskID taskIdToKill : killableTasks) { - if (this.schedulerState.getPendingTaskIds().contains(taskIdToKill)) { - this.schedulerState.removeTask(taskIdToKill); - } else { - Status status = this.driverManager.kill(taskIdToKill); - NodeTask task = schedulerState.getTask(taskIdToKill); - if (task != null) { - offerLifeCycleManager.declineOutstandingOffers(task.getHostname()); - this.schedulerState.removeTask(taskIdToKill); + for (TaskID taskIdToKill : killableTasks) { + LOGGER.info("Received task kill request for task: {}", taskIdToKill); + if (isPendingTask(taskIdToKill)) { + handlePendingTask(taskIdToKill); } else { - schedulerState.removeTask(taskIdToKill); - LOGGER.warn("NodeTask with taskId: {} does not exist", taskIdToKill); + handleNonPendingTask(taskIdToKill); } - Preconditions.checkState(status == Status.DRIVER_RUNNING); } } } -} + + private void handlePendingTask(TaskID taskId) { + /* + * since task is pending and has not started, simply remove + * it from SchedulerState task collection + */ + schedulerState.removeTask(taskId); + } + + private void handleNonPendingTask(TaskID taskId) { + /* + * Kill the task and decline additional offers for it, but hold off removing from SchedulerState. + * Removal of the killable task must be done following invocation of statusUpdate callback method + * which constitutes acknowledgement from Mesos that the kill task request succeeded. 
+ */ + Status status = this.driverManager.kill(taskId); + NodeTask task = schedulerState.getTask(taskId); + + if (task != null) { + offerLifeCycleManager.declineOutstandingOffers(task.getHostname()); + } + if (status.equals(Status.DRIVER_RUNNING)) { + LOGGER.info("Kill request for {} was submitted to a running SchedulerDriver", taskId); + } else { + LOGGER.warn("Kill task request for {} submitted to non-running SchedulerDriver, may fail", taskId); + } + } + + private boolean isPendingTask(TaskID taskId) { + return this.schedulerState.getPendingTaskIds().contains(taskId); + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-myriad/blob/7207e2b0/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/TaskUtils.java ---------------------------------------------------------------------- diff --git a/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/TaskUtils.java b/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/TaskUtils.java index d73a467..c8e2a21 100644 --- a/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/TaskUtils.java +++ b/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/TaskUtils.java @@ -75,9 +75,6 @@ public class TaskUtils { private static final String CONTAINER_PATH_KEY = "containerPath"; private static final String HOST_PATH_KEY = "hostPath"; private static final String RW_MODE = "mode"; - private static final String CONTAINER_PORT_KEY = "containerPort"; - private static final String HOST_PORT_KEY = "hostPort"; - private static final String PROTOCOL_KEY = "protocol"; private static final String PARAMETER_KEY_KEY = "key"; private static final String PARAMETER_VALUE_KEY = "value"; http://git-wip-us.apache.org/repos/asf/incubator-myriad/blob/7207e2b0/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/event/handlers/ResourceOffersEventHandler.java ---------------------------------------------------------------------- diff --git 
a/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/event/handlers/ResourceOffersEventHandler.java b/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/event/handlers/ResourceOffersEventHandler.java index 8d1cd03..f0e80e9 100644 --- a/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/event/handlers/ResourceOffersEventHandler.java +++ b/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/event/handlers/ResourceOffersEventHandler.java @@ -97,7 +97,7 @@ public class ResourceOffersEventHandler implements EventHandler<ResourceOffersEv } return; } - LOGGER.info("Received offers {}", offers.size()); + LOGGER.debug("Received offers {}", offers.size()); LOGGER.debug("Pending tasks: {}", this.schedulerState.getPendingTaskIds()); driverOperationLock.lock(); try { @@ -218,7 +218,7 @@ public class ResourceOffersEventHandler implements EventHandler<ResourceOffersEv if (aggrCpu <= cpus && aggrMem <= mem && taskConstraints.portsCount() <= ports) { return true; } else { - LOGGER.info("Offer not sufficient for task with, cpu: {}, memory: {}, ports: {}", aggrCpu, aggrMem, ports); + LOGGER.debug("Offer insufficient for task with, cpu: {}, memory: {}, ports: {}", aggrCpu, aggrMem, ports); return false; } } @@ -243,7 +243,7 @@ public class ResourceOffersEventHandler implements EventHandler<ResourceOffersEv private void checkResource(boolean fail, String resource) { if (fail) { - LOGGER.info("No " + resource + " resources present"); + LOGGER.debug("No " + resource + " resources present"); } } http://git-wip-us.apache.org/repos/asf/incubator-myriad/blob/7207e2b0/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/event/handlers/StatusUpdateEventHandler.java ---------------------------------------------------------------------- diff --git a/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/event/handlers/StatusUpdateEventHandler.java b/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/event/handlers/StatusUpdateEventHandler.java 
index 25d0440..079df4b 100644 --- a/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/event/handlers/StatusUpdateEventHandler.java +++ b/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/event/handlers/StatusUpdateEventHandler.java @@ -31,7 +31,8 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; /** - * handles and logs mesos status update events + * Handles and logs mesos StatusUpdateEvents based upon the corresponding + * Protos.TaskState enum value */ public class StatusUpdateEventHandler implements EventHandler<StatusUpdateEvent> { @@ -45,7 +46,21 @@ public class StatusUpdateEventHandler implements EventHandler<StatusUpdateEvent> this.schedulerState = schedulerState; this.offerLifecycleManager = offerLifecycleManager; } - + + /** + * Encapsulates the logic to log and respond to the incoming StatusUpdateEvent per the + * Event TaskStatus state: + * + * 1. TASK_STAGING: mark task as staging within SchedulerState + * 2. TASK_STARTING: mark task as staging within SchedulerState + * 3. TASK_RUNNING: mark task as active within SchedulerState + * 4. TASK_FINISHED: decline outstanding offers and remove task from SchedulerState + * 5. TASK_FAILED: decline outstanding offers, remove failed, killable tasks from SchedulerState, + * mark as pending non-killable, failed tasks + * 6. TASK_KILLED: decline outstanding offers, remove killed tasks from SchedulerState + * 7. 
TASK_LOST: decline outstanding offers, remove killable, lost tasks from SchedulerState, + * mark as pending non-killable, lost tasks + */ @Override public void onEvent(StatusUpdateEvent event, long sequence, boolean endOfBatch) throws Exception { TaskStatus status = event.getStatus(); @@ -71,25 +86,44 @@ public class StatusUpdateEventHandler implements EventHandler<StatusUpdateEvent> schedulerState.makeTaskActive(taskId); break; case TASK_FINISHED: - offerLifecycleManager.declineOutstandingOffers(task.getHostname()); - schedulerState.removeTask(taskId); + cleanupTask(taskId, task, "finished"); break; case TASK_FAILED: - // Add to pending tasks - offerLifecycleManager.declineOutstandingOffers(task.getHostname()); - schedulerState.makeTaskPending(taskId); + cleanupFailedTask(taskId, task, "failed"); break; case TASK_KILLED: - offerLifecycleManager.declineOutstandingOffers(task.getHostname()); - schedulerState.removeTask(taskId); + cleanupTask(taskId, task, "killed"); break; case TASK_LOST: - offerLifecycleManager.declineOutstandingOffers(task.getHostname()); - schedulerState.makeTaskPending(taskId); + cleanupFailedTask(taskId, task, "lost"); break; default: LOGGER.error("Invalid state: {}", state); break; } } -} + + private void cleanupFailedTask(TaskID taskId, NodeTask task, String stopReason) { + offerLifecycleManager.declineOutstandingOffers(task.getHostname()); + /* + * Remove the task from SchedulerState if the task is killable. Otherwise, + * mark the task as pending to enable restart. 
+ */ + if (taskIsKillable(taskId)) { + schedulerState.removeTask(taskId); + LOGGER.info("Removed killable, {} task with id {}", stopReason, taskId); + } else { + schedulerState.makeTaskPending(taskId); + LOGGER.info("Marked as pending {} task with id {}", stopReason, taskId); + } + } + + private void cleanupTask(TaskID taskId, NodeTask task, String stopReason) { + offerLifecycleManager.declineOutstandingOffers(task.getHostname()); + schedulerState.removeTask(taskId); + LOGGER.info("Removed {} task with id {}", stopReason, taskId); + } + private boolean taskIsKillable(TaskID taskId) { + return schedulerState.getKillableTasks().contains(taskId); + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-myriad/blob/7207e2b0/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/fgs/YarnNodeCapacityManager.java ---------------------------------------------------------------------- diff --git a/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/fgs/YarnNodeCapacityManager.java b/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/fgs/YarnNodeCapacityManager.java index e922fc6..8f7c6f5 100644 --- a/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/fgs/YarnNodeCapacityManager.java +++ b/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/fgs/YarnNodeCapacityManager.java @@ -127,13 +127,15 @@ public class YarnNodeCapacityManager extends BaseInterceptor { } private void removeYarnTask(RMContainer rmContainer) { - if (rmContainer != null && rmContainer.getContainer() != null) { + if (containersNotNull(rmContainer)){ Protos.TaskID taskId = containerToTaskId(rmContainer); - //TODO (darinj) Reliable messaging + /* + * Mark the task as killable within the ServerState object to flag the task + * for the TaskTerminator daemon to kill the task + */ state.makeTaskKillable(taskId); - myriadDriver.kill(taskId); - String hostname = rmContainer.getContainer().getNodeId().getHost(); - Node node = nodeStore.getNode(hostname); + 
+ Node node = retrieveNode(rmContainer); if (node != null) { RMNode rmNode = node.getNode().getRMNode(); Resource resource = rmContainer.getContainer().getResource(); @@ -141,11 +143,20 @@ public class YarnNodeCapacityManager extends BaseInterceptor { LOGGER.info("Removed task yarn_{} with exit status freeing {} cpu and {} mem.", rmContainer.getContainer().toString(), rmContainer.getContainerExitStatus(), resource.getVirtualCores(), resource.getMemory()); } else { - LOGGER.warn(hostname + " not found"); + LOGGER.warn("The Node for the {} host was not found", rmContainer.getContainer().getNodeId().getHost()); } } } + private Node retrieveNode(RMContainer container) { + String hostname = container.getContainer().getNodeId().getHost(); + return nodeStore.getNode(hostname); + } + + private boolean containersNotNull(RMContainer rmContainer) { + return (rmContainer != null && rmContainer.getContainer() != null); + } + @Override public void afterSchedulerEventHandled(SchedulerEvent event) { switch (event.getType()) { @@ -182,7 +193,7 @@ public class YarnNodeCapacityManager extends BaseInterceptor { /** * Checks if any containers were allocated in the current scheduler run and - * launches the corresponding Mesos tasks. It also udpates the node + * launches the corresponding Mesos tasks. It also updates the node * capacity depending on what portion of the consumed offers were actually * used. 
*/ @@ -232,11 +243,22 @@ public class YarnNodeCapacityManager extends BaseInterceptor { node.removeContainerSnapshot(); } - + /** + * Increments the capacity for the specified RMNode + * + * @param rmNode + * @param addedCapacity + */ public void incrementNodeCapacity(RMNode rmNode, Resource addedCapacity) { setNodeCapacity(rmNode, Resources.add(rmNode.getTotalCapability(), addedCapacity)); } + /** + * Decrements the capacity for the specified RMNode + * + * @param rmNode + * @param removedCapacity + */ public void decrementNodeCapacity(RMNode rmNode, Resource removedCapacity) { setNodeCapacity(rmNode, Resources.subtract(rmNode.getTotalCapability(), removedCapacity)); }