http://git-wip-us.apache.org/repos/asf/incubator-myriad/blob/101bcad3/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/event/handlers/ResourceOffersEventHandler.java ---------------------------------------------------------------------- diff --git a/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/event/handlers/ResourceOffersEventHandler.java b/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/event/handlers/ResourceOffersEventHandler.java new file mode 100644 index 0000000..b61496f --- /dev/null +++ b/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/event/handlers/ResourceOffersEventHandler.java @@ -0,0 +1,291 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * <p/> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p/> + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */
+package org.apache.myriad.scheduler.event.handlers;
+
+import com.google.common.collect.Sets;
+import com.lmax.disruptor.EventHandler;
+
+import java.util.Iterator;
+
+import org.apache.commons.collections.CollectionUtils;
+import org.apache.mesos.Protos;
+import org.apache.mesos.Protos.Offer;
+import org.apache.mesos.Protos.OfferID;
+import org.apache.mesos.Protos.Resource;
+import org.apache.mesos.Protos.TaskInfo;
+import org.apache.mesos.Protos.Value;
+import org.apache.mesos.SchedulerDriver;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.inject.Inject;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
+
+/**
+ * Handles resource offer events from Mesos.
+ * <p/>
+ * For each offer it tries to launch at most one pending Myriad task whose resource profile and
+ * constraints the offer satisfies. Offers that were not used are either handed to the
+ * {@link org.apache.myriad.scheduler.fgs.OfferLifecycleManager} (when the host is eligible for
+ * fine-grained scaling) or declined.
+ */
+public class ResourceOffersEventHandler implements EventHandler<org.apache.myriad.scheduler.event.ResourceOffersEvent> {
+  private static final Logger LOGGER = LoggerFactory.getLogger(ResourceOffersEventHandler.class);
+
+  // Held around the launch/decline section of onEvent. Static, so shared by all handler instances.
+  private static final Lock driverOperationLock = new ReentrantLock();
+
+  private static final String RESOURCES_CPU_KEY = "cpus";
+  private static final String RESOURCES_MEM_KEY = "mem";
+  private static final String RESOURCES_PORTS_KEY = "ports";
+  private static final String RESOURCES_DISK_KEY = "disk";
+
+
+  @Inject
+  private org.apache.myriad.state.SchedulerState schedulerState;
+
+  @Inject
+  private org.apache.myriad.scheduler.TaskUtils taskUtils;
+
+  @Inject
+  private Map<String, org.apache.myriad.scheduler.TaskFactory> taskFactoryMap;
+
+  @Inject
+  private org.apache.myriad.scheduler.fgs.OfferLifecycleManager offerLifecycleMgr;
+
+  @Inject
+  private org.apache.myriad.scheduler.TaskConstraintsManager taskConstraintsManager;
+
+  /**
+   * Processes one batch of Mesos offers.
+   *
+   * @param event carries the scheduler driver and the offers; offers used to launch a task are
+   *              removed from {@code event.getOffers()} in place
+   * @param sequence disruptor sequence number (unused)
+   * @param endOfBatch disruptor end-of-batch flag (unused)
+   */
+  @Override
+  public void onEvent(org.apache.myriad.scheduler.event.ResourceOffersEvent event, long sequence, boolean endOfBatch)
+      throws Exception {
+    SchedulerDriver driver = event.getDriver();
+    List<Offer> offers = event.getOffers();
+
+    // Sometimes, we see that mesos sends resource offers before Myriad receives
+    // a notification for "framework registration". This is a simple defensive code
+    // to not process any offers unless Myriad receives a "framework registered" notification.
+    if (schedulerState.getFrameworkID() == null) {
+      LOGGER.warn("Received {} offers, but declining them since Framework ID is not yet set", offers.size());
+      for (Offer offer : offers) {
+        driver.declineOffer(offer.getId());
+      }
+      return;
+    }
+    LOGGER.info("Received offers {}", offers.size());
+    LOGGER.debug("Pending tasks: {}", this.schedulerState.getPendingTaskIds());
+    driverOperationLock.lock();
+    try {
+      for (Iterator<Offer> iterator = offers.iterator(); iterator.hasNext(); ) {
+        Offer offer = iterator.next();
+        // Refresh the slave attributes of the tasks already associated with this slave.
+        Set<org.apache.myriad.state.NodeTask> nodeTasks = schedulerState.getNodeTasks(offer.getSlaveId());
+        for (org.apache.myriad.state.NodeTask nodeTask : nodeTasks) {
+          nodeTask.setSlaveAttributes(offer.getAttributesList());
+        }
+        // keep this in case SchedulerState gets out of sync. This should not happen with
+        // synchronizing addNodes method in SchedulerState
+        // but to keep it safe
+        final Set<Protos.TaskID> missingTasks = Sets.newHashSet();
+        Set<Protos.TaskID> pendingTasks = schedulerState.getPendingTaskIds();
+        if (CollectionUtils.isNotEmpty(pendingTasks)) {
+          for (Protos.TaskID pendingTaskId : pendingTasks) {
+            org.apache.myriad.state.NodeTask taskToLaunch = schedulerState.getTask(pendingTaskId);
+            if (taskToLaunch == null) {
+              missingTasks.add(pendingTaskId);
+              LOGGER.warn("Node task for TaskID: {} does not exist", pendingTaskId);
+              continue;
+            }
+            String taskPrefix = taskToLaunch.getTaskPrefix();
+            org.apache.myriad.scheduler.ServiceResourceProfile profile = taskToLaunch.getProfile();
+            org.apache.myriad.scheduler.constraints.Constraint constraint = taskToLaunch.getConstraint();
+
+            Set<org.apache.myriad.state.NodeTask> launchedTasks = new HashSet<>();
+            launchedTasks.addAll(schedulerState.getActiveTasksByType(taskPrefix));
+            launchedTasks.addAll(schedulerState.getStagingTasksByType(taskPrefix));
+
+            if (matches(offer, taskToLaunch, constraint) && org.apache.myriad.scheduler.SchedulerUtils.isUniqueHostname(offer, taskToLaunch, launchedTasks)) {
+              try {
+                final TaskInfo task = taskFactoryMap.get(taskPrefix).createTask(offer, schedulerState.getFrameworkID(), pendingTaskId, taskToLaunch);
+                List<OfferID> offerIds = new ArrayList<>();
+                offerIds.add(offer.getId());
+                List<TaskInfo> tasks = new ArrayList<>();
+                tasks.add(task);
+                LOGGER.info("Launching task: {} using offer: {}", task.getTaskId().getValue(), offer.getId());
+                LOGGER.debug("Launching task: {} with profile: {} using offer: {}", task, profile, offer);
+                driver.launchTasks(offerIds, tasks);
+                schedulerState.makeTaskStaging(pendingTaskId);
+
+                // For every NM Task that we launch, we currently
+                // need to backup the ExecutorInfo for that NM Task in the State Store.
+                // Without this, we will not be able to launch tasks corresponding to yarn
+                // containers. This is specially important in case the RM restarts.
+                taskToLaunch.setExecutorInfo(task.getExecutor());
+                taskToLaunch.setHostname(offer.getHostname());
+                taskToLaunch.setSlaveId(offer.getSlaveId());
+                schedulerState.addTask(pendingTaskId, taskToLaunch);
+                iterator.remove(); // remove the used offer from offers list
+                break; // at most one task is launched per offer
+              } catch (Throwable t) {
+                LOGGER.error("Exception thrown while trying to create a task for {}", taskPrefix, t);
+              }
+            }
+          }
+          for (Protos.TaskID taskId : missingTasks) {
+            schedulerState.removeTask(taskId);
+          }
+        }
+      }
+
+      // Offers not used for a launch above: keep them for fine-grained scaling or decline them.
+      for (Offer offer : offers) {
+        if (org.apache.myriad.scheduler.SchedulerUtils.isEligibleForFineGrainedScaling(offer.getHostname(), schedulerState)) {
+          if (LOGGER.isDebugEnabled()) {
+            LOGGER.debug("Picking an offer from slave with hostname {} for fine grained scaling.", offer.getHostname());
+          }
+          offerLifecycleMgr.addOffers(offer);
+        } else {
+          if (LOGGER.isDebugEnabled()) {
+            LOGGER.debug("Declining offer {} from slave {}.", offer, offer.getHostname());
+          }
+          driver.declineOffer(offer.getId());
+        }
+      }
+    } finally {
+      driverOperationLock.unlock();
+    }
+  }
+
+  /**
+   * Returns whether {@code offer} meets {@code constraint} and carries at least the cpu, memory
+   * and port count required by {@code taskToLaunch}.
+   */
+  private boolean matches(Offer offer, org.apache.myriad.state.NodeTask taskToLaunch, org.apache.myriad.scheduler.constraints.Constraint constraint) {
+    if (!meetsConstraint(offer, constraint)) {
+      return false;
+    }
+    Map<String, Object> results = new HashMap<String, Object>(5);
+    //Assign default values to avoid NPE
+    results.put(RESOURCES_CPU_KEY, Double.valueOf(0.0));
+    results.put(RESOURCES_MEM_KEY, Double.valueOf(0.0));
+    results.put(RESOURCES_DISK_KEY, Double.valueOf(0.0));
+    results.put(RESOURCES_PORTS_KEY, Integer.valueOf(0));
+
+    // Sum each known resource type across all resources in the offer.
+    for (Resource resource : offer.getResourcesList()) {
+      if (resourceEvaluators.containsKey(resource.getName())) {
+        resourceEvaluators.get(resource.getName()).eval(resource, results);
+      } else {
+        LOGGER.warn("Ignoring unknown resource type: {}", resource.getName());
+      }
+    }
+    double cpus = (Double) results.get(RESOURCES_CPU_KEY);
+    double mem = (Double) results.get(RESOURCES_MEM_KEY);
+    int ports = (Integer) results.get(RESOURCES_PORTS_KEY);
+
+    checkResource(cpus <= 0, RESOURCES_CPU_KEY);
+    checkResource(mem <= 0, RESOURCES_MEM_KEY);
+    checkResource(ports <= 0, RESOURCES_PORTS_KEY);
+
+    return checkAggregates(offer, taskToLaunch, ports, cpus, mem);
+  }
+
+  /**
+   * Compares the offered totals with the task's needs. Required cpu and memory are the profile's
+   * aggregate values plus the executor's. The required port count comes from the task type's
+   * {@link org.apache.myriad.scheduler.TaskConstraints}.
+   */
+  private boolean checkAggregates(Offer offer, org.apache.myriad.state.NodeTask taskToLaunch, int ports, double cpus, double mem) {
+    final org.apache.myriad.scheduler.ServiceResourceProfile profile = taskToLaunch.getProfile();
+    final String taskPrefix = taskToLaunch.getTaskPrefix();
+    final double aggrCpu = profile.getAggregateCpu() + profile.getExecutorCpu();
+    final double aggrMem = profile.getAggregateMemory() + profile.getExecutorMemory();
+    final org.apache.myriad.scheduler.TaskConstraints taskConstraints = taskConstraintsManager.getConstraints(taskPrefix);
+    if (aggrCpu <= cpus && aggrMem <= mem && taskConstraints.portsCount() <= ports) {
+      return true;
+    }
+    // Log the task's requirements for all three resources (previously the offered port count was
+    // logged next to the required cpu/memory, which was misleading).
+    LOGGER.info("Offer not sufficient for task with, cpu: {}, memory: {}, ports: {}", aggrCpu, aggrMem, taskConstraints.portsCount());
+    return false;
+  }
+
+  /**
+   * Returns true when there is no constraint. A LIKE constraint is matched against the offer's
+   * hostname or slave attributes. Any other constraint type rejects the offer.
+   */
+  private boolean meetsConstraint(Offer offer, org.apache.myriad.scheduler.constraints.Constraint constraint) {
+    if (constraint != null) {
+      switch (constraint.getType()) {
+        case LIKE: {
+          org.apache.myriad.scheduler.constraints.LikeConstraint likeConstraint = (org.apache.myriad.scheduler.constraints.LikeConstraint) constraint;
+          if (likeConstraint.isConstraintOnHostName()) {
+            return likeConstraint.matchesHostName(offer.getHostname());
+          } else {
+            return likeConstraint.matchesSlaveAttributes(offer.getAttributesList());
+          }
+        }
+        default:
+          return false;
+      }
+    }
+    return true;
+  }
+
+  /** Logs (informational only) that {@code resource} is absent from the offer when {@code fail} is true. */
+  private void checkResource(boolean fail, String resource) {
+    if (fail) {
+      LOGGER.info("No {} resources present", resource);
+    }
+  }
+
+  /** Returns the scalar value of {@code resource}, or 0.0 (logging an error) if it is not a scalar. */
+  private static Double scalarToDouble(Resource resource, String id) {
+    if (resource.getType().equals(Value.Type.SCALAR)) {
+      return Double.valueOf(resource.getScalar().getValue());
+    }
+    LOGGER.error("{} resource was not a scalar: {}", id, resource.getType().toString());
+    return Double.valueOf(0.0);
+  }
+
+  /** Accumulates one offered resource into the running totals in {@code results}. */
+  private interface EvalResources {
+    public void eval(Resource resource, Map<String, Object> results);
+  }
+
+  private static Map<String, EvalResources> resourceEvaluators;
+
+  static {
+    resourceEvaluators = new HashMap<String, EvalResources>(4);
+    resourceEvaluators.put(RESOURCES_CPU_KEY, new EvalResources() {
+      public void eval(Resource resource, Map<String, Object> results) {
+        results.put(RESOURCES_CPU_KEY, (Double) results.get(RESOURCES_CPU_KEY) + scalarToDouble(resource, RESOURCES_CPU_KEY));
+      }
+    });
+    resourceEvaluators.put(RESOURCES_MEM_KEY, new EvalResources() {
+      public void eval(Resource resource, Map<String, Object> results) {
+        results.put(RESOURCES_MEM_KEY, (Double) results.get(RESOURCES_MEM_KEY) + scalarToDouble(resource, RESOURCES_MEM_KEY));
+      }
+    });
+    // Disk is recognised (so it is not logged as unknown) but not counted.
+    resourceEvaluators.put(RESOURCES_DISK_KEY, new EvalResources() {
+      public void eval(Resource resource, Map<String, Object> results) {
+      }
+    });
+    resourceEvaluators.put(RESOURCES_PORTS_KEY, new EvalResources() {
+      public void eval(Resource resource, Map<String, Object> results) {
+        int ports = 0;
+        if (resource.getType().equals(Value.Type.RANGES)) {
+          Value.Ranges ranges = resource.getRanges();
+          for (Value.Range range : ranges.getRangeList()) {
+            // Ranges are inclusive on both ends, so [p, p] is one valid port; only inverted
+            // ranges are skipped.
+            if (range.getBegin() <= range.getEnd()) {
+              ports += range.getEnd() - range.getBegin() + 1;
+            }
+          }
+        } else {
+          LOGGER.error("ports resource was not Ranges: {}", resource.getType().toString());
+
+        }
+        results.put(RESOURCES_PORTS_KEY, (Integer) results.get(RESOURCES_PORTS_KEY) + Integer.valueOf(ports));
+      }
+    });
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-myriad/blob/101bcad3/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/event/handlers/SlaveLostEventHandler.java ---------------------------------------------------------------------- diff --git a/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/event/handlers/SlaveLostEventHandler.java b/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/event/handlers/SlaveLostEventHandler.java new file mode 100644 index 0000000..6feebe3 --- /dev/null +++ b/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/event/handlers/SlaveLostEventHandler.java @@ -0,0 +1,39 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * <p/> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p/> + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */
+package org.apache.myriad.scheduler.event.handlers;
+
+import org.apache.myriad.scheduler.event.SlaveLostEvent;
+import com.lmax.disruptor.EventHandler;
+import org.apache.mesos.Protos.SlaveID;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * handles and logs mesos slave lost events
+ */
+public class SlaveLostEventHandler implements EventHandler<SlaveLostEvent> {
+  private static final Logger LOGGER = LoggerFactory.getLogger(SlaveLostEventHandler.class);
+
+  /**
+   * Logs the id of the lost slave. No scheduler state is changed here.
+   */
+  @Override
+  public void onEvent(SlaveLostEvent event, long sequence, boolean endOfBatch) throws Exception {
+    SlaveID slaveId = event.getSlaveId();
+    // Log the plain id value; the protobuf toString() renders a multi-line `value: "..."` form.
+    LOGGER.info("Slave {} lost!", slaveId.getValue());
+  }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-myriad/blob/101bcad3/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/event/handlers/StatusUpdateEventHandler.java
----------------------------------------------------------------------
diff --git a/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/event/handlers/StatusUpdateEventHandler.java b/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/event/handlers/StatusUpdateEventHandler.java
new file mode 100644
index 0000000..29c89c3
--- /dev/null
+++ b/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/event/handlers/StatusUpdateEventHandler.java
@@ -0,0 +1,97 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.
You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.myriad.scheduler.event.handlers;
+
+import org.apache.myriad.scheduler.event.StatusUpdateEvent;
+import org.apache.myriad.scheduler.fgs.OfferLifecycleManager;
+import org.apache.myriad.state.NodeTask;
+import org.apache.myriad.state.SchedulerState;
+import com.lmax.disruptor.EventHandler;
+
+import javax.inject.Inject;
+
+import org.apache.mesos.Protos.TaskID;
+import org.apache.mesos.Protos.TaskState;
+import org.apache.mesos.Protos.TaskStatus;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Applies Mesos task status updates to the scheduler state.
+ * <p/>
+ * STAGING/STARTING tasks are marked staging and RUNNING tasks are marked active. FINISHED and
+ * KILLED tasks are removed. FAILED and LOST tasks are put back to pending. For FINISHED, FAILED,
+ * KILLED and LOST, the outstanding offers held for the task's host are declined first.
+ */
+public class StatusUpdateEventHandler implements EventHandler<StatusUpdateEvent> {
+
+  private static final Logger LOGGER = LoggerFactory.getLogger(StatusUpdateEventHandler.class);
+
+  private final SchedulerState schedulerState;
+  private final OfferLifecycleManager offerLifecycleManager;
+
+  @Inject
+  public StatusUpdateEventHandler(SchedulerState schedulerState, OfferLifecycleManager offerLifecycleManager) {
+    this.schedulerState = schedulerState;
+    this.offerLifecycleManager = offerLifecycleManager;
+  }
+
+  @Override
+  public void onEvent(StatusUpdateEvent event, long sequence, boolean endOfBatch) throws Exception {
+    final TaskStatus status = event.getStatus();
+    schedulerState.updateTask(status);
+
+    final TaskID taskId = status.getTaskId();
+    final TaskState state = status.getState();
+    final NodeTask task = schedulerState.getTask(taskId);
+    if (task == null) {
+      // Unknown task: log it and still ask the state to drop the id.
+      LOGGER.warn("Task: {} not found, status: {}", taskId.getValue(), state);
+      schedulerState.removeTask(taskId);
+      return;
+    }
+    LOGGER.info("Status Update for task: {} | state: {}", taskId.getValue(), state);
+
+    switch (state) {
+      case TASK_STAGING:
+      case TASK_STARTING:
+        schedulerState.makeTaskStaging(taskId);
+        break;
+      case TASK_RUNNING:
+        schedulerState.makeTaskActive(taskId);
+        break;
+      case TASK_FINISHED:
+      case TASK_KILLED:
+        offerLifecycleManager.declineOutstandingOffers(task.getHostname());
+        schedulerState.removeTask(taskId);
+        break;
+      case TASK_FAILED:
+      case TASK_LOST:
+        // Add to pending tasks so the task gets relaunched from a later offer.
+        offerLifecycleManager.declineOutstandingOffers(task.getHostname());
+        schedulerState.makeTaskPending(taskId);
+        break;
+      default:
+        LOGGER.error("Invalid state: {}", state);
+        break;
+    }
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-myriad/blob/101bcad3/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/fgs/ConsumedOffer.java
----------------------------------------------------------------------
diff --git a/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/fgs/ConsumedOffer.java b/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/fgs/ConsumedOffer.java
new file mode 100644
index 0000000..ff49496
--- /dev/null
+++ b/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/fgs/ConsumedOffer.java
@@ -0,0 +1,55 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.
You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.myriad.scheduler.fgs;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.LinkedList;
+import java.util.List;
+
+import org.apache.mesos.Protos;
+
+/**
+ * Represents offers from a slave that have been consumed by Myriad.
+ */
+public class ConsumedOffer {
+  /** Consumed offers, in the order they were added. */
+  private final List<Protos.Offer> offers = new LinkedList<>();
+
+  /** Creates an empty holder. */
+  public ConsumedOffer() {
+  }
+
+  /** Appends {@code offer} to the consumed offers. */
+  public void add(Protos.Offer offer) {
+    offers.add(offer);
+  }
+
+  /** Returns the backing list itself (not a copy). */
+  public List<Protos.Offer> getOffers() {
+    return offers;
+  }
+
+  /** Returns a new collection with the id of every consumed offer, in insertion order. */
+  public Collection<Protos.OfferID> getOfferIds() {
+    final List<Protos.OfferID> offerIds = new ArrayList<>(offers.size());
+    for (Protos.Offer consumed : offers) {
+      offerIds.add(consumed.getId());
+    }
+    return offerIds;
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-myriad/blob/101bcad3/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/fgs/NMHeartBeatHandler.java
----------------------------------------------------------------------
diff --git a/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/fgs/NMHeartBeatHandler.java b/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/fgs/NMHeartBeatHandler.java
new file mode 100644
index 0000000..b8d8326
--- /dev/null
+++ b/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/fgs/NMHeartBeatHandler.java
@@ -0,0 +1,163 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.
See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.myriad.scheduler.fgs;
+
+import org.apache.myriad.scheduler.yarn.interceptor.BaseInterceptor;
+import org.apache.myriad.state.SchedulerState;
+import com.google.common.annotations.VisibleForTesting;
+import java.util.ArrayList;
+import java.util.List;
+import javax.inject.Inject;
+import org.apache.hadoop.yarn.api.records.ContainerState;
+import org.apache.hadoop.yarn.api.records.ContainerStatus;
+import org.apache.hadoop.yarn.api.records.NodeId;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
+import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
+import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
+import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeEvent;
+import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeStatusEvent;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.AbstractYarnScheduler;
+import org.apache.hadoop.yarn.util.resource.Resources;
+import org.apache.mesos.Protos;
+import org.apache.mesos.Protos.Offer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Handles node manager heartbeat.
+ * <p/>
+ * On each NM status update, the node's capacity is set to the resources its NEW/RUNNING
+ * containers already use plus whatever Mesos offers are waiting in the host's offer feed.
+ * Offers taken from the feed are marked as consumed.
+ */
+public class NMHeartBeatHandler extends BaseInterceptor {
+  @VisibleForTesting
+  Logger logger = LoggerFactory.getLogger(NMHeartBeatHandler.class);
+
+  private final AbstractYarnScheduler yarnScheduler;
+  private final org.apache.myriad.scheduler.MyriadDriver myriadDriver;
+  private final YarnNodeCapacityManager yarnNodeCapacityMgr;
+  private final OfferLifecycleManager offerLifecycleMgr;
+  private final NodeStore nodeStore;
+  private final SchedulerState state;
+
+  @Inject
+  public NMHeartBeatHandler(org.apache.myriad.scheduler.yarn.interceptor.InterceptorRegistry registry, AbstractYarnScheduler yarnScheduler, org.apache.myriad.scheduler.MyriadDriver myriadDriver, YarnNodeCapacityManager yarnNodeCapacityMgr,
+      OfferLifecycleManager offerLifecycleMgr, NodeStore nodeStore, SchedulerState state) {
+
+    if (registry != null) {
+      registry.register(this);
+    }
+
+    this.yarnScheduler = yarnScheduler;
+    this.myriadDriver = myriadDriver;
+    this.yarnNodeCapacityMgr = yarnNodeCapacityMgr;
+    this.offerLifecycleMgr = offerLifecycleMgr;
+    this.nodeStore = nodeStore;
+    this.state = state;
+  }
+
+  /** Only NMs whose host is eligible for fine-grained scaling receive callbacks. */
+  @Override
+  public CallBackFilter getCallBackFilter() {
+    return new CallBackFilter() {
+      @Override
+      public boolean allowCallBacksForNode(NodeId nodeManager) {
+        return org.apache.myriad.scheduler.SchedulerUtils.isEligibleForFineGrainedScaling(nodeManager.getHost(), state);
+      }
+    };
+  }
+
+  /**
+   * Runs before YARN handles a node event. On STARTED, a non-zero NM capacity is reset to zero.
+   * On STATUS_UPDATE, the capacity is recomputed via {@link #handleStatusUpdate}.
+   */
+  @Override
+  public void beforeRMNodeEventHandled(RMNodeEvent event, RMContext context) {
+    switch (event.getType()) {
+      case STARTED:
+        zeroCapacityOnStart(event, context);
+        break;
+
+      case STATUS_UPDATE:
+        handleStatusUpdate(event, context);
+        break;
+
+      default:
+        break;
+    }
+  }
+
+  /** Resets a newly started NM's total capability to (0 memory, 0 vcores), logging if it was non-zero. */
+  private void zeroCapacityOnStart(RMNodeEvent event, RMContext context) {
+    RMNode rmNode = context.getRMNodes().get(event.getNodeId());
+    if (rmNode == null) {
+      // Guard: the node may no longer be in the RM's node map; previously this NPE'd.
+      logger.warn("No RMNode found for started node {}; skipping capacity reset", event.getNodeId());
+      return;
+    }
+    Resource totalCapability = rmNode.getTotalCapability();
+    if (totalCapability.getMemory() != 0 || totalCapability.getVirtualCores() != 0) {
+      logger.warn("FineGrainedScaling feature got invoked for a NM with non-zero capacity. Host: {}, Mem: {}, CPU: {}. "
+          + "Setting the NM's capacity to (0G,0CPU)", rmNode.getHostName(), totalCapability.getMemory(),
+          totalCapability.getVirtualCores());
+      // Mutates the RMNode's capability object in place.
+      totalCapability.setMemory(0);
+      totalCapability.setVirtualCores(0);
+    }
+  }
+
+  /**
+   * Snapshots the node's running containers and sets its capacity to used resources plus newly
+   * offered Mesos resources. Events that are not {@link RMNodeStatusEvent}s, or that refer to a
+   * node absent from {@code context.getRMNodes()}, are logged and ignored.
+   */
+  @VisibleForTesting
+  protected void handleStatusUpdate(RMNodeEvent event, RMContext context) {
+    if (!(event instanceof RMNodeStatusEvent)) {
+      logger.error("{} not an instance of {}", event.getClass().getName(), RMNodeStatusEvent.class.getName());
+      return;
+    }
+
+    RMNodeStatusEvent statusEvent = (RMNodeStatusEvent) event;
+    RMNode rmNode = context.getRMNodes().get(event.getNodeId());
+    if (rmNode == null) {
+      logger.warn("No RMNode found for node {}; skipping status update", event.getNodeId());
+      return;
+    }
+    String hostName = rmNode.getNodeID().getHost();
+
+    Node host = nodeStore.getNode(hostName);
+    if (host != null) {
+      host.snapshotRunningContainers();
+    }
+
+    // New capacity of the node =
+    // resources under use on the node (due to previous offers) +
+    // new resources offered by mesos for the node
+    yarnNodeCapacityMgr.setNodeCapacity(rmNode, Resources.add(getResourcesUnderUse(statusEvent), getNewResourcesOfferedByMesos(hostName)));
+  }
+
+  /** Drains the host's offer feed, marks each drained offer consumed, and returns their YARN-equivalent resources. */
+  private Resource getNewResourcesOfferedByMesos(String hostname) {
+    OfferFeed feed = offerLifecycleMgr.getOfferFeed(hostname);
+    if (feed == null) {
+      logger.debug("No offer feed for: {}", hostname);
+      return Resource.newInstance(0, 0);
+    }
+    List<Offer> offers = new ArrayList<>();
+    Protos.Offer offer;
+    while ((offer = feed.poll()) != null) {
+      offers.add(offer);
+      offerLifecycleMgr.markAsConsumed(offer);
+    }
+    Resource fromMesosOffers = OfferUtils.getYarnResourcesFromMesosOffers(offers);
+
+    if (logger.isDebugEnabled()) {
+      logger.debug("NM on host {} got {} CPUs and {} memory from mesos", hostname, fromMesosOffers.getVirtualCores(), fromMesosOffers.getMemory());
+    }
+
+    return fromMesosOffers;
+  }
+
+  /** Sums the allocated resources of NEW or RUNNING containers reported in the status event. */
+  private Resource getResourcesUnderUse(RMNodeStatusEvent statusEvent) {
+    Resource usedResources = Resource.newInstance(0, 0);
+    for (ContainerStatus status : statusEvent.getContainers()) {
+      if (status.getState() == ContainerState.NEW || status.getState() == ContainerState.RUNNING) {
+        RMContainer rmContainer = yarnScheduler.getRMContainer(status.getContainerId());
+        // (sdaingade) This check is needed as RMContainer information may not be populated
+        // immediately after a RM restart.
+        if (rmContainer != null) {
+          Resources.addTo(usedResources, rmContainer.getAllocatedResource());
+        }
+      }
+    }
+    return usedResources;
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-myriad/blob/101bcad3/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/fgs/Node.java
----------------------------------------------------------------------
diff --git a/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/fgs/Node.java b/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/fgs/Node.java
new file mode 100644
index 0000000..035953e
--- /dev/null
+++ b/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/fgs/Node.java
@@ -0,0 +1,89 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.myriad.scheduler.fgs;
+
+import java.util.HashSet;
+import java.util.Set;
+
+import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode;
+import org.apache.mesos.Protos;
+
+/**
+ * Abstraction that encapsulates YARN and Mesos view of a node.
+ */
+public class Node {
+  /** YARN scheduler's representation of this node; fixed at construction. */
+  private final SchedulerNode node;
+
+  /** Mesos slave id associated with this node. */
+  private Protos.SlaveID slaveId;
+
+  /** Mesos executor on this node. */
+  private Protos.ExecutorInfo execInfo;
+
+  /**
+   * Snapshot of containers allocated by the YARN scheduler, or {@code null} if no snapshot has
+   * been taken or it was removed. It need not reflect the current state; it is meant to be used
+   * by the Myriad scheduler.
+   */
+  private Set<RMContainer> containerSnapshot;
+
+  public Node(SchedulerNode node) {
+    this.node = node;
+  }
+
+  public SchedulerNode getNode() {
+    return node;
+  }
+
+  public Protos.SlaveID getSlaveId() {
+    return slaveId;
+  }
+
+  public void setSlaveId(Protos.SlaveID slaveId) {
+    this.slaveId = slaveId;
+  }
+
+  public Protos.ExecutorInfo getExecInfo() {
+    return execInfo;
+  }
+
+  public void setExecInfo(Protos.ExecutorInfo execInfo) {
+    this.execInfo = execInfo;
+  }
+
+  /** Replaces the snapshot with a fresh copy of the scheduler node's running containers. */
+  public void snapshotRunningContainers() {
+    final Set<RMContainer> snapshot = new HashSet<>(node.getRunningContainers());
+    containerSnapshot = snapshot;
+  }
+
+  /** Discards the current snapshot; {@link #getContainerSnapshot()} returns {@code null} afterwards. */
+  public void removeContainerSnapshot() {
+    containerSnapshot = null;
+  }
+
+  /** Returns the last snapshot (the stored set itself, not a copy), or {@code null} if none. */
+  public Set<RMContainer> getContainerSnapshot() {
+    return containerSnapshot;
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-myriad/blob/101bcad3/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/fgs/NodeStore.java
----------------------------------------------------------------------
diff --git a/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/fgs/NodeStore.java b/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/fgs/NodeStore.java
new file mode
100644 index 0000000..a940a0e --- /dev/null +++ b/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/fgs/NodeStore.java @@ -0,0 +1,54 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * <p/> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p/> + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.myriad.scheduler.fgs; + +import java.util.concurrent.ConcurrentHashMap; + +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode; + +/** + * A store for all Node instances managed by this Myriad instance. 
+ */ +public class NodeStore { + private ConcurrentHashMap<String, Node> nodeMap; + + public NodeStore() { + nodeMap = new ConcurrentHashMap<>(200, 0.75f, 50); + } + + private String getKey(SchedulerNode schedNode) { + return schedNode.getNodeID().getHost(); + } + + public void add(SchedulerNode schedNode) { + nodeMap.put(getKey(schedNode), new Node(schedNode)); + } + + public void remove(String hostname) { + nodeMap.remove(hostname); + } + + public Node getNode(String hostname) { + return nodeMap.get(hostname); + } + + public boolean isPresent(String hostname) { + return nodeMap.containsKey(hostname); + } +} http://git-wip-us.apache.org/repos/asf/incubator-myriad/blob/101bcad3/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/fgs/OfferFeed.java ---------------------------------------------------------------------- diff --git a/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/fgs/OfferFeed.java b/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/fgs/OfferFeed.java new file mode 100644 index 0000000..521ea57 --- /dev/null +++ b/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/fgs/OfferFeed.java @@ -0,0 +1,46 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * <p/> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p/> + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.myriad.scheduler.fgs; + +import java.util.concurrent.ConcurrentLinkedQueue; + +import org.apache.mesos.Protos; + +/** + * Feed of Mesos offers for a node. + */ +public class OfferFeed { + private ConcurrentLinkedQueue<Protos.Offer> queue; + + public OfferFeed() { + this.queue = new ConcurrentLinkedQueue<>(); + } + + public void add(Protos.Offer offer) { + queue.add(offer); + } + + /** + * Retrieves and removes the head of the feed, or returns NULL if the feed is + * empty. + */ + public Protos.Offer poll() { + return queue.poll(); + } +} http://git-wip-us.apache.org/repos/asf/incubator-myriad/blob/101bcad3/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/fgs/OfferLifecycleManager.java ---------------------------------------------------------------------- diff --git a/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/fgs/OfferLifecycleManager.java b/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/fgs/OfferLifecycleManager.java new file mode 100644 index 0000000..0e283cb --- /dev/null +++ b/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/fgs/OfferLifecycleManager.java @@ -0,0 +1,114 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * <p/> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p/> + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.myriad.scheduler.fgs; + +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; + +import javax.inject.Inject; + +import org.apache.mesos.Protos; +import org.apache.mesos.Protos.Offer; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Manages the Mesos offers tracked by Myriad. + */ +public class OfferLifecycleManager { + private static final Logger LOGGER = LoggerFactory.getLogger(OfferLifecycleManager.class); + + private Map<String, OfferFeed> offerFeedMap; + + /** + * !!! Not thread safe !!! + */ + private final Map<String, ConsumedOffer> consumedOfferMap; + + private final NodeStore nodeStore; + private final org.apache.myriad.scheduler.MyriadDriver myriadDriver; + + @Inject + public OfferLifecycleManager(NodeStore nodeStore, org.apache.myriad.scheduler.MyriadDriver myriadDriver) { + + this.offerFeedMap = new ConcurrentHashMap<>(200, 0.75f, 50); + this.consumedOfferMap = new HashMap<>(200, 0.75f); + this.nodeStore = nodeStore; + this.myriadDriver = myriadDriver; + } + + public OfferFeed getOfferFeed(String hostname) { + return offerFeedMap.get(hostname); + } + + public void declineOffer(Protos.Offer offer) { + myriadDriver.getDriver().declineOffer(offer.getId()); + LOGGER.debug("Declined offer {}", offer.getId()); + } + + public void addOffers(Protos.Offer... 
offers) { + for (Protos.Offer offer : offers) { + String hostname = offer.getHostname(); + Node node = nodeStore.getNode(hostname); + if (node != null) { + OfferFeed feed = offerFeedMap.get(hostname); + if (feed == null) { + feed = new OfferFeed(); + offerFeedMap.put(hostname, feed); + } + feed.add(offer); + + node.setSlaveId(offer.getSlaveId()); + + LOGGER.debug("addResourceOffers: caching offer for host {}, offer id {}", hostname, offer.getId().getValue()); + } else { + myriadDriver.getDriver().declineOffer(offer.getId()); + LOGGER.debug("Declined offer for unregistered host {}", hostname); + } + } + } + + public void markAsConsumed(Protos.Offer offer) { + ConsumedOffer consumedOffer = consumedOfferMap.get(offer.getHostname()); + if (consumedOffer == null) { + consumedOffer = new ConsumedOffer(); + consumedOfferMap.put(offer.getHostname(), consumedOffer); + } + + consumedOffer.add(offer); + } + + public ConsumedOffer drainConsumedOffer(String hostname) { + return consumedOfferMap.remove(hostname); + } + + public void declineOutstandingOffers(String hostname) { + int numOutStandingOffers = 0; + OfferFeed offerFeed = getOfferFeed(hostname); + Offer offer; + while (offerFeed != null && (offer = offerFeed.poll()) != null) { + declineOffer(offer); + numOutStandingOffers++; + } + if (numOutStandingOffers > 0) { + LOGGER.info("Declined {} outstanding offers for host {}", numOutStandingOffers, hostname); + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-myriad/blob/101bcad3/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/fgs/OfferUtils.java ---------------------------------------------------------------------- diff --git a/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/fgs/OfferUtils.java b/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/fgs/OfferUtils.java new file mode 100644 index 0000000..038aeb4 --- /dev/null +++ b/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/fgs/OfferUtils.java @@ -0,0 +1,53 @@ +/** + * 
Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * <p/> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p/> + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.myriad.scheduler.fgs; + +import java.util.Collection; +import org.apache.hadoop.yarn.api.records.Resource; +import org.apache.mesos.Protos; +import org.apache.mesos.Protos.Offer; + +/** + * Utility class that provides useful methods that deal with Mesos offers. + */ +public class OfferUtils { + + /** + * Transforms a collection of mesos offers into {@link Resource}. 
+ * + * @param offers collection of mesos offers + * @return a single resource object equivalent to the cumulative sum of mesos offers + */ + public static Resource getYarnResourcesFromMesosOffers(Collection<Offer> offers) { + double cpus = 0.0; + double mem = 0.0; + + for (Protos.Offer offer : offers) { + for (Protos.Resource resource : offer.getResourcesList()) { + if (resource.getName().equalsIgnoreCase("cpus")) { + cpus += resource.getScalar().getValue(); + } else if (resource.getName().equalsIgnoreCase("mem")) { + mem += resource.getScalar().getValue(); + } + } + } + return Resource.newInstance((int) mem, (int) cpus); + } + +} http://git-wip-us.apache.org/repos/asf/incubator-myriad/blob/101bcad3/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/fgs/YarnNodeCapacityManager.java ---------------------------------------------------------------------- diff --git a/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/fgs/YarnNodeCapacityManager.java b/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/fgs/YarnNodeCapacityManager.java new file mode 100644 index 0000000..ca094cc --- /dev/null +++ b/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/fgs/YarnNodeCapacityManager.java @@ -0,0 +1,219 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * <p/> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p/> + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. 
See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.myriad.scheduler.fgs; + +import org.apache.myriad.executor.ContainerTaskStatusRequest; +import org.apache.myriad.scheduler.yarn.interceptor.BaseInterceptor; +import com.google.common.annotations.VisibleForTesting; +import com.google.common.collect.Lists; +import com.google.common.collect.Sets; + +import java.util.HashSet; +import java.util.List; +import java.util.Set; + +import javax.inject.Inject; + +import org.apache.hadoop.yarn.api.records.Container; +import org.apache.hadoop.yarn.api.records.NodeId; +import org.apache.hadoop.yarn.api.records.Resource; +import org.apache.hadoop.yarn.api.records.ResourceOption; +import org.apache.hadoop.yarn.server.resourcemanager.RMContext; +import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer; +import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.AbstractYarnScheduler; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeAddedSchedulerEvent; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeResourceUpdateSchedulerEvent; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeUpdateSchedulerEvent; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEvent; +import org.apache.hadoop.yarn.util.resource.Resources; +import org.apache.mesos.Protos; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Manages the capacity exposed by NodeManager. It uses the offers available + * from Mesos to inflate the node capacity and lets ResourceManager make the + * scheduling decision. After the scheduling decision is done, there are 2 cases: + * <p/> + * 1. 
If ResourceManager did not use the expanded capacity, then the node's + * capacity is reverted back to original value and the offer is declined. + * 2. If ResourceManager ended up using the expanded capacity, then the node's + * capacity is updated accordingly and any unused capacity is returned back to + * Mesos. + */ +public class YarnNodeCapacityManager extends BaseInterceptor { + private static final Logger LOGGER = LoggerFactory.getLogger(YarnNodeCapacityManager.class); + + private final AbstractYarnScheduler yarnScheduler; + private final RMContext rmContext; + private final org.apache.myriad.scheduler.MyriadDriver myriadDriver; + private final OfferLifecycleManager offerLifecycleMgr; + private final NodeStore nodeStore; + private final org.apache.myriad.state.SchedulerState state; + + @Inject + public YarnNodeCapacityManager(org.apache.myriad.scheduler.yarn.interceptor.InterceptorRegistry registry, AbstractYarnScheduler yarnScheduler, RMContext rmContext, org.apache.myriad.scheduler.MyriadDriver myriadDriver, OfferLifecycleManager + offerLifecycleMgr, NodeStore nodeStore, org.apache.myriad.state.SchedulerState state) { + if (registry != null) { + registry.register(this); + } + this.yarnScheduler = yarnScheduler; + this.rmContext = rmContext; + this.myriadDriver = myriadDriver; + this.offerLifecycleMgr = offerLifecycleMgr; + this.nodeStore = nodeStore; + this.state = state; + } + + @Override + public CallBackFilter getCallBackFilter() { + return new CallBackFilter() { + @Override + public boolean allowCallBacksForNode(NodeId nodeManager) { + return org.apache.myriad.scheduler.SchedulerUtils.isEligibleForFineGrainedScaling(nodeManager.getHost(), state); + } + }; + } + + @Override + public void afterSchedulerEventHandled(SchedulerEvent event) { + switch (event.getType()) { + case NODE_ADDED: { + if (!(event instanceof NodeAddedSchedulerEvent)) { + LOGGER.error("{} not an instance of {}", event.getClass().getName(), NodeAddedSchedulerEvent.class.getName()); + 
return; + } + + NodeAddedSchedulerEvent nodeAddedEvent = (NodeAddedSchedulerEvent) event; + NodeId nodeId = nodeAddedEvent.getAddedRMNode().getNodeID(); + String host = nodeId.getHost(); + + SchedulerNode node = yarnScheduler.getSchedulerNode(nodeId); + nodeStore.add(node); + LOGGER.info("afterSchedulerEventHandled: NM registration from node {}", host); + } + break; + + case NODE_UPDATE: { + if (!(event instanceof NodeUpdateSchedulerEvent)) { + LOGGER.error("{} not an instance of {}", event.getClass().getName(), NodeUpdateSchedulerEvent.class.getName()); + return; + } + + RMNode rmNode = ((NodeUpdateSchedulerEvent) event).getRMNode(); + handleContainerAllocation(rmNode); + } + break; + + default: + break; + } + } + + /** + * Checks if any containers were allocated in the current scheduler run and + * launches the corresponding Mesos tasks. It also udpates the node + * capacity depending on what portion of the consumed offers were actually + * used. + */ + @VisibleForTesting + protected void handleContainerAllocation(RMNode rmNode) { + String host = rmNode.getNodeID().getHost(); + + ConsumedOffer consumedOffer = offerLifecycleMgr.drainConsumedOffer(host); + if (consumedOffer == null) { + LOGGER.debug("No offer consumed for {}", host); + return; + } + + Node node = nodeStore.getNode(host); + Set<RMContainer> containersBeforeSched = node.getContainerSnapshot(); + Set<RMContainer> containersAfterSched = new HashSet<>(node.getNode().getRunningContainers()); + + Set<RMContainer> containersAllocatedByMesosOffer = (containersBeforeSched == null) ? 
containersAfterSched : Sets.difference(containersAfterSched, containersBeforeSched); + + if (containersAllocatedByMesosOffer.isEmpty()) { + LOGGER.debug("No containers allocated using Mesos offers for host: {}", host); + for (Protos.Offer offer : consumedOffer.getOffers()) { + offerLifecycleMgr.declineOffer(offer); + } + setNodeCapacity(rmNode, Resources.subtract(rmNode.getTotalCapability(), OfferUtils.getYarnResourcesFromMesosOffers(consumedOffer.getOffers()))); + } else { + LOGGER.debug("Containers allocated using Mesos offers for host: {} count: {}", host, containersAllocatedByMesosOffer.size()); + + // Identify the Mesos tasks that need to be launched + List<Protos.TaskInfo> tasks = Lists.newArrayList(); + Resource resUsed = Resource.newInstance(0, 0); + + for (RMContainer newContainer : containersAllocatedByMesosOffer) { + tasks.add(getTaskInfoForContainer(newContainer, consumedOffer, node)); + resUsed = Resources.add(resUsed, newContainer.getAllocatedResource()); + } + + // Reduce node capacity to account for unused offers + Resource resOffered = OfferUtils.getYarnResourcesFromMesosOffers(consumedOffer.getOffers()); + Resource resUnused = Resources.subtract(resOffered, resUsed); + setNodeCapacity(rmNode, Resources.subtract(rmNode.getTotalCapability(), resUnused)); + + myriadDriver.getDriver().launchTasks(consumedOffer.getOfferIds(), tasks); + } + + // No need to hold on to the snapshot anymore + node.removeContainerSnapshot(); + } + + /** + * 1. Updates {@link RMNode#getTotalCapability()} with newCapacity. + * 2. Sends out a {@link NodeResourceUpdateSchedulerEvent} that's handled by YARN's scheduler. + * The scheduler updates the corresponding {@link SchedulerNode} with the newCapacity. 
+ * + * @param rmNode + * @param newCapacity + */ + @SuppressWarnings("unchecked") + public void setNodeCapacity(RMNode rmNode, Resource newCapacity) { + rmNode.getTotalCapability().setMemory(newCapacity.getMemory()); + rmNode.getTotalCapability().setVirtualCores(newCapacity.getVirtualCores()); + LOGGER.debug("Setting capacity for node {} to {}", rmNode.getHostName(), newCapacity); + // updates the scheduler with the new capacity for the NM. + // the event is handled by the scheduler asynchronously + rmContext.getDispatcher().getEventHandler().handle(new NodeResourceUpdateSchedulerEvent(rmNode, ResourceOption.newInstance(rmNode.getTotalCapability(), RMNode.OVER_COMMIT_TIMEOUT_MILLIS_DEFAULT))); + } + + private Protos.TaskInfo getTaskInfoForContainer(RMContainer rmContainer, ConsumedOffer consumedOffer, Node node) { + + Protos.Offer offer = consumedOffer.getOffers().get(0); + Container container = rmContainer.getContainer(); + Protos.TaskID taskId = Protos.TaskID.newBuilder().setValue(ContainerTaskStatusRequest.YARN_CONTAINER_TASK_ID_PREFIX + container.getId().toString()).build(); + + // TODO (sdaingade) Remove ExecutorInfo from the Node object + // as this is now cached in the NodeTask object in scheduler state. 
+ Protos.ExecutorInfo executorInfo = node.getExecInfo(); + if (executorInfo == null) { + executorInfo = Protos.ExecutorInfo.newBuilder(state.getNodeTask(offer.getSlaveId(), org.apache.myriad.configuration.NodeManagerConfiguration.NM_TASK_PREFIX).getExecutorInfo()).setFrameworkId(offer.getFrameworkId()).build(); + node.setExecInfo(executorInfo); + } + + return Protos.TaskInfo.newBuilder().setName("task_" + taskId.getValue()).setTaskId(taskId).setSlaveId(offer.getSlaveId()).addResources(Protos.Resource.newBuilder().setName("cpus").setType(Protos.Value.Type.SCALAR).setScalar(Protos.Value.Scalar + .newBuilder().setValue(container.getResource().getVirtualCores()))).addResources(Protos.Resource.newBuilder().setName("mem").setType(Protos.Value.Type.SCALAR).setScalar(Protos.Value.Scalar.newBuilder().setValue(container.getResource() + .getMemory()))).setExecutor(executorInfo).build(); + } +} http://git-wip-us.apache.org/repos/asf/incubator-myriad/blob/101bcad3/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/yarn/MyriadCapacityScheduler.java ---------------------------------------------------------------------- diff --git a/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/yarn/MyriadCapacityScheduler.java b/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/yarn/MyriadCapacityScheduler.java new file mode 100644 index 0000000..f6d24e9 --- /dev/null +++ b/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/yarn/MyriadCapacityScheduler.java @@ -0,0 +1,86 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * <p/> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p/> + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.myriad.scheduler.yarn; + +import org.apache.myriad.scheduler.yarn.interceptor.CompositeInterceptor; +import org.apache.myriad.scheduler.yarn.interceptor.YarnSchedulerInterceptor; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.yarn.server.resourcemanager.RMContext; +import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeEvent; +import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeEventType; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEvent; + +/** + * {@link MyriadCapacityScheduler} just extends YARN's {@link CapacityScheduler} and + * allows some of the {@link CapacityScheduler} methods to be intercepted + * via the {@link YarnSchedulerInterceptor} interface. + */ +public class MyriadCapacityScheduler extends CapacityScheduler { + private Configuration conf; + + private RMContext rmContext; + private YarnSchedulerInterceptor yarnSchedulerInterceptor; + private RMNodeEventHandler rmNodeEventHandler; + + public MyriadCapacityScheduler() { + super(); + } + + /** + * Register an event handler that receives {@link RMNodeEvent} events. + * This event handler is registered ahead of RM's own event handler for RMNodeEvents. + * For e.g. myriad can inspect a node's HB (RMNodeStatusEvent) before the HB is handled by + * RM and the scheduler. 
+ * + * @param rmContext + */ + @Override + public synchronized void setRMContext(RMContext rmContext) { + this.rmContext = rmContext; + this.yarnSchedulerInterceptor = new CompositeInterceptor(); + rmNodeEventHandler = new RMNodeEventHandler(yarnSchedulerInterceptor, rmContext); + rmContext.getDispatcher().register(RMNodeEventType.class, rmNodeEventHandler); + super.setRMContext(rmContext); + } + + /** + * ******** Methods overridden from YARN {@link CapacityScheduler} ********************* + */ + + @Override + public synchronized void serviceInit(Configuration conf) throws Exception { + this.conf = conf; + super.serviceInit(conf); + } + + @Override + public synchronized void serviceStart() throws Exception { + this.yarnSchedulerInterceptor.init(conf, this, rmContext); + super.serviceStart(); + } + + @Override + public synchronized void handle(SchedulerEvent event) { + this.yarnSchedulerInterceptor.beforeSchedulerEventHandled(event); + super.handle(event); + this.yarnSchedulerInterceptor.afterSchedulerEventHandled(event); + } +} + http://git-wip-us.apache.org/repos/asf/incubator-myriad/blob/101bcad3/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/yarn/MyriadFairScheduler.java ---------------------------------------------------------------------- diff --git a/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/yarn/MyriadFairScheduler.java b/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/yarn/MyriadFairScheduler.java new file mode 100644 index 0000000..35d6aaf --- /dev/null +++ b/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/yarn/MyriadFairScheduler.java @@ -0,0 +1,86 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * <p/> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p/> + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.myriad.scheduler.yarn; + +import org.apache.myriad.scheduler.yarn.interceptor.CompositeInterceptor; +import org.apache.myriad.scheduler.yarn.interceptor.YarnSchedulerInterceptor; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.yarn.server.resourcemanager.RMContext; +import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeEvent; +import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeEventType; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEvent; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FairScheduler; + +/** + * {@link MyriadFairScheduler} just extends YARN's {@link FairScheduler} and + * allows some of the {@link FairScheduler} methods to be intercepted + * via the {@link YarnSchedulerInterceptor} interface. + */ +public class MyriadFairScheduler extends FairScheduler { + + private RMContext rmContext; + private YarnSchedulerInterceptor yarnSchedulerInterceptor; + private RMNodeEventHandler rmNodeEventHandler; + private Configuration conf; + + public MyriadFairScheduler() { + super(); + } + + /** + * Register an event handler that receives {@link RMNodeEvent} events. + * This event handler is registered ahead of RM's own event handler for RMNodeEvents. + * For e.g. 
myriad can inspect a node's HB (RMNodeStatusEvent) before the HB is handled by + * RM and the scheduler. + * + * @param rmContext + */ + @Override + public synchronized void setRMContext(RMContext rmContext) { + this.rmContext = rmContext; + this.yarnSchedulerInterceptor = new CompositeInterceptor(); + rmNodeEventHandler = new RMNodeEventHandler(yarnSchedulerInterceptor, rmContext); + rmContext.getDispatcher().register(RMNodeEventType.class, rmNodeEventHandler); + super.setRMContext(rmContext); + } + + /** + * ******** Methods overridden from YARN {@link FairScheduler} ********************* + */ + + @Override + public synchronized void serviceInit(Configuration conf) throws Exception { + this.conf = conf; + super.serviceInit(conf); + } + + @Override + public synchronized void serviceStart() throws Exception { + this.yarnSchedulerInterceptor.init(conf, this, rmContext); + super.serviceStart(); + } + + @Override + public synchronized void handle(SchedulerEvent event) { + this.yarnSchedulerInterceptor.beforeSchedulerEventHandled(event); + super.handle(event); + this.yarnSchedulerInterceptor.afterSchedulerEventHandled(event); + } +} + http://git-wip-us.apache.org/repos/asf/incubator-myriad/blob/101bcad3/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/yarn/MyriadFifoScheduler.java ---------------------------------------------------------------------- diff --git a/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/yarn/MyriadFifoScheduler.java b/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/yarn/MyriadFifoScheduler.java new file mode 100644 index 0000000..1fd3b87 --- /dev/null +++ b/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/yarn/MyriadFifoScheduler.java @@ -0,0 +1,85 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * <p/> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p/> + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.myriad.scheduler.yarn; + +import org.apache.myriad.scheduler.yarn.interceptor.YarnSchedulerInterceptor; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.yarn.server.resourcemanager.RMContext; +import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeEvent; +import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeEventType; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEvent; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fifo.FifoScheduler; + +/** + * {@link MyriadFifoScheduler} just extends YARN's {@link FifoScheduler} and + * allows some of the {@link FifoScheduler} methods to be intercepted + * via the {@link YarnSchedulerInterceptor} interface. + */ +public class MyriadFifoScheduler extends FifoScheduler { + private Configuration conf; + + private RMContext rmContext; + private YarnSchedulerInterceptor yarnSchedulerInterceptor; + private RMNodeEventHandler rmNodeEventHandler; + + public MyriadFifoScheduler() { + super(); + } + + /** + * Register an event handler that receives {@link RMNodeEvent} events. + * This event handler is registered ahead of RM's own event handler for RMNodeEvents. + * For e.g. myriad can inspect a node's HB (RMNodeStatusEvent) before the HB is handled by + * RM and the scheduler. 
+ * + * @param rmContext + */ + @Override + public synchronized void setRMContext(RMContext rmContext) { + this.rmContext = rmContext; + this.yarnSchedulerInterceptor = new org.apache.myriad.scheduler.yarn.interceptor.CompositeInterceptor(); + rmNodeEventHandler = new RMNodeEventHandler(yarnSchedulerInterceptor, rmContext); + rmContext.getDispatcher().register(RMNodeEventType.class, rmNodeEventHandler); + super.setRMContext(rmContext); + } + + /** + * ******** Methods overridden from YARN {@link FifoScheduler} ********************* + */ + + @Override + public synchronized void serviceInit(Configuration conf) throws Exception { + this.conf = conf; + super.serviceInit(conf); + } + + @Override + public synchronized void serviceStart() throws Exception { + this.yarnSchedulerInterceptor.init(conf, this, rmContext); + super.serviceStart(); + } + + @Override + public synchronized void handle(SchedulerEvent event) { + this.yarnSchedulerInterceptor.beforeSchedulerEventHandled(event); + super.handle(event); + this.yarnSchedulerInterceptor.afterSchedulerEventHandled(event); + } +} + http://git-wip-us.apache.org/repos/asf/incubator-myriad/blob/101bcad3/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/yarn/RMNodeEventHandler.java ---------------------------------------------------------------------- diff --git a/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/yarn/RMNodeEventHandler.java b/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/yarn/RMNodeEventHandler.java new file mode 100644 index 0000000..1a63553 --- /dev/null +++ b/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/yarn/RMNodeEventHandler.java @@ -0,0 +1,43 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * <p/> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p/> + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.myriad.scheduler.yarn; + +import org.apache.myriad.scheduler.yarn.interceptor.YarnSchedulerInterceptor; +import org.apache.hadoop.yarn.event.EventHandler; +import org.apache.hadoop.yarn.server.resourcemanager.RMContext; +import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeEvent; + +/** + * Passes the {@link RMNodeEvent} events into the {@link YarnSchedulerInterceptor}. 
+ */ +public class RMNodeEventHandler implements EventHandler<RMNodeEvent> { + private final YarnSchedulerInterceptor interceptor; + private final RMContext rmContext; + + public RMNodeEventHandler(YarnSchedulerInterceptor interceptor, RMContext rmContext) { + this.interceptor = interceptor; + this.rmContext = rmContext; + } + + @Override + public void handle(RMNodeEvent event) { + interceptor.beforeRMNodeEventHandled(event, rmContext); + + } +} http://git-wip-us.apache.org/repos/asf/incubator-myriad/blob/101bcad3/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/yarn/interceptor/BaseInterceptor.java ---------------------------------------------------------------------- diff --git a/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/yarn/interceptor/BaseInterceptor.java b/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/yarn/interceptor/BaseInterceptor.java new file mode 100644 index 0000000..ba4dec3 --- /dev/null +++ b/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/yarn/interceptor/BaseInterceptor.java @@ -0,0 +1,68 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * <p/> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p/> + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.myriad.scheduler.yarn.interceptor; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.yarn.api.records.NodeId; +import org.apache.hadoop.yarn.server.resourcemanager.RMContext; +import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeEvent; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.AbstractYarnScheduler; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEvent; + +import java.io.IOException; + +/** + * A no-op interceptor whose sole purpose is to serve as a base class + * for other interceptors. Child interceptors can selectively override the + * required methods. + */ +public class BaseInterceptor implements YarnSchedulerInterceptor { + // restrict the constructor + protected BaseInterceptor() { + } + + @Override + public CallBackFilter getCallBackFilter() { + return new CallBackFilter() { + @Override + public boolean allowCallBacksForNode(NodeId nodeManager) { + return true; + } + }; + } + + @Override + public void init(Configuration conf, AbstractYarnScheduler yarnScheduler, RMContext rmContext) throws IOException { + } + + @Override + public void beforeRMNodeEventHandled(RMNodeEvent event, RMContext context) { + + } + + @Override + public void beforeSchedulerEventHandled(SchedulerEvent event) { + + } + + @Override + public void afterSchedulerEventHandled(SchedulerEvent event) { + + } +} http://git-wip-us.apache.org/repos/asf/incubator-myriad/blob/101bcad3/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/yarn/interceptor/CompositeInterceptor.java ---------------------------------------------------------------------- diff --git a/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/yarn/interceptor/CompositeInterceptor.java b/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/yarn/interceptor/CompositeInterceptor.java new file mode 100644 index 0000000..0bf1616 --- /dev/null +++ 
b/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/yarn/interceptor/CompositeInterceptor.java @@ -0,0 +1,139 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * <p/> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p/> + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.myriad.scheduler.yarn.interceptor; + +import com.google.common.annotations.VisibleForTesting; +import com.google.common.collect.Maps; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.yarn.api.records.NodeId; +import org.apache.hadoop.yarn.server.resourcemanager.RMContext; +import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeEvent; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.AbstractYarnScheduler; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeAddedSchedulerEvent; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeRemovedSchedulerEvent; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeResourceUpdateSchedulerEvent; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeUpdateSchedulerEvent; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEvent; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; 
+import java.util.Map; + +/** + * An interceptor that wraps other interceptors. The Myriad{Fair,Capacity,Fifo}Scheduler classes + * instantiate this class and allow interception of the Yarn scheduler events/method calls. + * <p/> + * The {@link CompositeInterceptor} allows other interceptors to be registered via {@link InterceptorRegistry} + * and passes control to the registered interceptors whenever a event/method call is being intercepted. + */ +public class CompositeInterceptor implements YarnSchedulerInterceptor, InterceptorRegistry { + private static final Logger LOGGER = LoggerFactory.getLogger(CompositeInterceptor.class); + + private Map<Class<?>, YarnSchedulerInterceptor> interceptors = Maps.newLinkedHashMap(); + private YarnSchedulerInterceptor myriadInitInterceptor; + + /** + * Called by Myriad{Fair,Capacity,Fifo}Scheduler classes. Creates an instance of + * {@link MyriadInitializationInterceptor}. + */ + public CompositeInterceptor() { + this.myriadInitInterceptor = new MyriadInitializationInterceptor(this); + } + + @VisibleForTesting + public void setMyriadInitInterceptor(YarnSchedulerInterceptor myriadInitInterceptor) { + this.myriadInitInterceptor = myriadInitInterceptor; + } + + @Override + public void register(YarnSchedulerInterceptor interceptor) { + interceptors.put(interceptor.getClass(), interceptor); + LOGGER.info("Registered {} into the registry.", interceptor.getClass().getName()); + } + + @Override + public CallBackFilter getCallBackFilter() { + return new CallBackFilter() { + @Override + public boolean allowCallBacksForNode(NodeId nodeManager) { + return true; + } + }; + } + + /** + * Allows myriad to be initialized via {@link #myriadInitInterceptor}. After myriad is initialized, + * other interceptors will later register with this class via + * {@link InterceptorRegistry#register(YarnSchedulerInterceptor)}. 
+ * + * @param conf + * @param yarnScheduler + * @param rmContext + * @throws IOException + */ + @Override + public void init(Configuration conf, AbstractYarnScheduler yarnScheduler, RMContext rmContext) throws IOException { + myriadInitInterceptor.init(conf, yarnScheduler, rmContext); + } + + @Override + public void beforeRMNodeEventHandled(RMNodeEvent event, RMContext context) { + for (YarnSchedulerInterceptor interceptor : interceptors.values()) { + if (interceptor.getCallBackFilter().allowCallBacksForNode(event.getNodeId())) { + interceptor.beforeRMNodeEventHandled(event, context); + } + } + } + + @Override + public void beforeSchedulerEventHandled(SchedulerEvent event) { + for (YarnSchedulerInterceptor interceptor : interceptors.values()) { + final NodeId nodeId = getNodeIdForSchedulerEvent(event); + if (nodeId != null && interceptor.getCallBackFilter().allowCallBacksForNode(nodeId)) { + interceptor.beforeSchedulerEventHandled(event); + } + } + } + + @Override + public void afterSchedulerEventHandled(SchedulerEvent event) { + for (YarnSchedulerInterceptor interceptor : interceptors.values()) { + NodeId nodeId = getNodeIdForSchedulerEvent(event); + if (nodeId != null && interceptor.getCallBackFilter().allowCallBacksForNode(nodeId)) { + interceptor.afterSchedulerEventHandled(event); + } + } + } + + private NodeId getNodeIdForSchedulerEvent(SchedulerEvent event) { + switch (event.getType()) { + case NODE_ADDED: + return ((NodeAddedSchedulerEvent) event).getAddedRMNode().getNodeID(); + case NODE_REMOVED: + return ((NodeRemovedSchedulerEvent) event).getRemovedRMNode().getNodeID(); + case NODE_UPDATE: + return ((NodeUpdateSchedulerEvent) event).getRMNode().getNodeID(); + case NODE_RESOURCE_UPDATE: + return ((NodeResourceUpdateSchedulerEvent) event).getRMNode().getNodeID(); + } + return null; + } +} http://git-wip-us.apache.org/repos/asf/incubator-myriad/blob/101bcad3/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/yarn/interceptor/InterceptorRegistry.java 
---------------------------------------------------------------------- diff --git a/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/yarn/interceptor/InterceptorRegistry.java b/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/yarn/interceptor/InterceptorRegistry.java new file mode 100644 index 0000000..e4073d3 --- /dev/null +++ b/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/yarn/interceptor/InterceptorRegistry.java @@ -0,0 +1,28 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * <p/> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p/> + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.myriad.scheduler.yarn.interceptor; + +/** + * Allows registration of {@link YarnSchedulerInterceptor}. 
+ */ +public interface InterceptorRegistry { + + public void register(YarnSchedulerInterceptor interceptor); + +} http://git-wip-us.apache.org/repos/asf/incubator-myriad/blob/101bcad3/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/yarn/interceptor/MyriadInitializationInterceptor.java ---------------------------------------------------------------------- diff --git a/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/yarn/interceptor/MyriadInitializationInterceptor.java b/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/yarn/interceptor/MyriadInitializationInterceptor.java new file mode 100644 index 0000000..fa3e690 --- /dev/null +++ b/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/yarn/interceptor/MyriadInitializationInterceptor.java @@ -0,0 +1,58 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * <p/> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p/> + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.myriad.scheduler.yarn.interceptor; + +import org.apache.myriad.Main; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.yarn.server.resourcemanager.RMContext; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.AbstractYarnScheduler; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; + +/** + * Responsible for intializing myriad. + */ +public class MyriadInitializationInterceptor extends BaseInterceptor { + private static final Logger LOGGER = LoggerFactory.getLogger(MyriadInitializationInterceptor.class); + + private final InterceptorRegistry registry; + + public MyriadInitializationInterceptor(InterceptorRegistry registry) { + this.registry = registry; + } + + /** + * Initialize Myriad plugin before RM's scheduler is initialized. + * This includes registration with Mesos master, initialization of + * the myriad web application, initializing guice modules etc. + */ + @Override + public void init(Configuration conf, AbstractYarnScheduler yarnScheduler, RMContext rmContext) throws IOException { + try { + Main.initialize(conf, yarnScheduler, rmContext, registry); + } catch (Exception e) { + // Abort bringing up RM + throw new RuntimeException("Failed to initialize myriad", e); + } + LOGGER.info("Initialized myriad."); + } +} http://git-wip-us.apache.org/repos/asf/incubator-myriad/blob/101bcad3/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/yarn/interceptor/YarnSchedulerInterceptor.java ---------------------------------------------------------------------- diff --git a/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/yarn/interceptor/YarnSchedulerInterceptor.java b/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/yarn/interceptor/YarnSchedulerInterceptor.java new file mode 100644 index 0000000..92a4b0f --- /dev/null +++ b/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/yarn/interceptor/YarnSchedulerInterceptor.java @@ -0,0 +1,95 
@@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * <p/> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p/> + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.myriad.scheduler.yarn.interceptor; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.yarn.api.records.NodeId; +import org.apache.hadoop.yarn.server.resourcemanager.RMContext; +import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeEvent; +import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeImpl; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.AbstractYarnScheduler; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.YarnScheduler; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEvent; + +import java.io.IOException; + +/** + * Allows interception of YARN's scheduler events (or methods). + */ +public interface YarnSchedulerInterceptor { + + /** + * Filters the method callbacks. + */ + interface CallBackFilter { + /** + * Method to determine if any other methods in {@link YarnSchedulerInterceptor} + * pertaining to a given node manager should be invoked or not. + * + * @param nodeManager NodeId of the Node Manager registered with RM. + * @return true to allow invoking further interceptor methods. false otherwise. 
+ */ + public boolean allowCallBacksForNode(NodeId nodeManager); + } + + /** + * Return an instance of {@link CallBackFilter}. {@link CallBackFilter#allowCallBacksForNode(NodeId)} + * method is invoked to *determine* if any of the other methods pertaining to a specific node + * needs to be invoked or not. + * + * @return + */ + public CallBackFilter getCallBackFilter(); + + /** + * Invoked *before* {@link AbstractYarnScheduler#reinitialize(Configuration, RMContext)} + * + * @param conf + * @param yarnScheduler + * @param rmContext + * @throws IOException + */ + public void init(Configuration conf, AbstractYarnScheduler yarnScheduler, RMContext rmContext) throws IOException; + + /** + * Invoked *before* {@link RMNodeImpl#handle(RMNodeEvent)} only if + * {@link CallBackFilter#allowCallBacksForNode(NodeId)} returns true. + * + * @param event + * @param context + */ + public void beforeRMNodeEventHandled(RMNodeEvent event, RMContext context); + + /** + * Invoked *before* {@link YarnScheduler#handle(org.apache.hadoop.yarn.event.Event)} only if + * {@link CallBackFilter#allowCallBacksForNode(NodeId)} returns true. + * + * @param event + */ + public void beforeSchedulerEventHandled(SchedulerEvent event); + + /** + * Invoked *after* {@link YarnScheduler#handle(org.apache.hadoop.yarn.event.Event)} only if + * {@link CallBackFilter#allowCallBacksForNode(NodeId)} returns true. + * + * @param event + */ + public void afterSchedulerEventHandled(SchedulerEvent event); + +}