[
https://issues.apache.org/jira/browse/STORM-893?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14946164#comment-14946164
]
ASF GitHub Bot commented on STORM-893:
--------------------------------------
Github user jerrypeng commented on a diff in the pull request:
https://github.com/apache/storm/pull/746#discussion_r41347393
--- Diff: storm-core/src/jvm/backtype/storm/scheduler/resource/strategies/ResourceAwareStrategy.java ---
@@ -0,0 +1,478 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package backtype.storm.scheduler.resource.strategies;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Queue;
+import java.util.TreeMap;
+import java.util.HashSet;
+import java.util.Iterator;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import backtype.storm.scheduler.Cluster;
+import backtype.storm.scheduler.ExecutorDetails;
+import backtype.storm.scheduler.Topologies;
+import backtype.storm.scheduler.TopologyDetails;
+import backtype.storm.scheduler.WorkerSlot;
+import backtype.storm.scheduler.resource.Component;
+import backtype.storm.scheduler.resource.RAS_Node;
+
+public class ResourceAwareStrategy implements IStrategy {
+ private Logger LOG = null;
+ private Topologies _topologies;
+ private Cluster _cluster;
+ //Map key is the supervisor id and the value is the corresponding RAS_Node object
+ private Map<String, RAS_Node> _availNodes;
+ private RAS_Node refNode = null;
+ /**
+ * supervisor id -> Node
+ */
+ private Map<String, RAS_Node> _nodes;
+ private Map<String, List<String>> _clusterInfo;
+
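+ // These weights scale the CPU, memory, and network terms of the weighted
+ // Euclidean distance computed in getBestWorker(); with all weights equal,
+ // no resource dimension is favored over another.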
+ private final double CPU_WEIGHT = 1.0;
+ private final double MEM_WEIGHT = 1.0;
+ private final double NETWORK_WEIGHT = 1.0;
+
+ public ResourceAwareStrategy(Cluster cluster, Topologies topologies) {
+ _topologies = topologies;
+ _cluster = cluster;
+ _nodes = RAS_Node.getAllNodesFrom(cluster, _topologies);
+ _availNodes = this.getAvailNodes();
+ this.LOG = LoggerFactory.getLogger(this.getClass());
+ _clusterInfo = cluster.getNetworkTopography();
+ LOG.debug(this.getClusterInfo());
+ }
+
+ //the returned TreeMap keeps the Components sorted
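+ //e.g. for a topology spout -> bolt1 -> bolt2, the BFS ordering yields
+ //{0 -> [spout execs], 1 -> [bolt1 execs], 2 -> [bolt2 execs]}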
+ private TreeMap<Integer, List<ExecutorDetails>> getPriorityToExecutorDetailsListMap(
+ Queue<Component> ordered__Component_list, Collection<ExecutorDetails> unassignedExecutors) {
+ TreeMap<Integer, List<ExecutorDetails>> retMap = new TreeMap<Integer, List<ExecutorDetails>>();
+ Integer rank = 0;
+ for (Component ras_comp : ordered__Component_list) {
+ retMap.put(rank, new ArrayList<ExecutorDetails>());
+ for(ExecutorDetails exec : ras_comp.execs) {
+ if(unassignedExecutors.contains(exec)) {
+ retMap.get(rank).add(exec);
+ }
+ }
+ rank++;
+ }
+ return retMap;
+ }
+
+ public Map<WorkerSlot, Collection<ExecutorDetails>> schedule(TopologyDetails td) {
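+ // Overall flow: order components by a breadth-first traversal from the spouts,
+ // interleave their executors round-robin by priority, and place each executor on a
+ // free slot of the "closest" node (by resource availability and network distance)
+ // that still satisfies the worker constraints.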
+ if (_availNodes.size() <= 0) {
+ LOG.warn("No available nodes to schedule tasks on!");
+ return null;
+ }
+ Collection<ExecutorDetails> unassignedExecutors = _cluster.getUnassignedExecutors(td);
+ Map<WorkerSlot, Collection<ExecutorDetails>> schedulerAssignmentMap = new HashMap<WorkerSlot, Collection<ExecutorDetails>>();
+ LOG.debug("ExecutorsNeedScheduling: {}", unassignedExecutors);
+ Collection<ExecutorDetails> scheduledTasks = new ArrayList<ExecutorDetails>();
+ List<Component> spouts = this.getSpouts(_topologies, td);
+
+ if (spouts.size() == 0) {
+ LOG.error("Cannot find a Spout!");
+ return null;
+ }
+
+ Queue<Component> ordered__Component_list = bfs(_topologies, td, spouts);
+
+ Map<Integer, List<ExecutorDetails>> priorityToExecutorMap = getPriorityToExecutorDetailsListMap(ordered__Component_list, unassignedExecutors);
+ Collection<ExecutorDetails> executorsNotScheduled = new HashSet<ExecutorDetails>(unassignedExecutors);
+ Integer longestPriorityListSize = this.getLongestPriorityListSize(priorityToExecutorMap);
+ //Pick the first executor with priority 1, then the first executor with priority 2, and so on.
+ //Once we reach the last priority, we go back to priority 1 and schedule the second executor with priority 1.
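+ //e.g. with priorityToExecutorMap = {0 -> [s1, s2], 1 -> [b1, b2]},
+ //executors are scheduled in the order s1, b1, s2, b2.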
+ for (int i = 0; i < longestPriorityListSize; i++) {
+ for (Entry<Integer, List<ExecutorDetails>> entry : priorityToExecutorMap.entrySet()) {
+ Iterator<ExecutorDetails> it = entry.getValue().iterator();
+ if (it.hasNext()) {
+ ExecutorDetails exec = it.next();
+ LOG.debug("\n\nAttempting to schedule: {} of component
{}[avail {}] with rank {}",
+ new Object[] { exec,
td.getExecutorToComponent().get(exec),
+ td.getTaskResourceReqList(exec), entry.getKey() });
+ WorkerSlot targetSlot = this.findWorkerForExec(exec, td, schedulerAssignmentMap);
+ if (targetSlot != null) {
+ RAS_Node targetNode = this.idToNode(targetSlot.getNodeId());
+ if(!schedulerAssignmentMap.containsKey(targetSlot)) {
+ schedulerAssignmentMap.put(targetSlot, new LinkedList<ExecutorDetails>());
+ }
+
+ schedulerAssignmentMap.get(targetSlot).add(exec);
+ targetNode.consumeResourcesforTask(exec, td);
+ scheduledTasks.add(exec);
+ LOG.debug("TASK {} assigned to Node: {} avail
[mem: {} cpu: {}] total [mem: {} cpu: {}] on slot: {}", exec,
+ targetNode,
targetNode.getAvailableMemoryResources(),
+ targetNode.getAvailableCpuResources(),
targetNode.getTotalMemoryResources(),
+ targetNode.getTotalCpuResources(),
targetSlot);
+ } else {
+ LOG.error("Not Enough Resources to schedule Task
{}", exec);
+ }
+ it.remove();
+ }
+ }
+ }
+
+ executorsNotScheduled.removeAll(scheduledTasks);
+ LOG.debug("/* Scheduling left over task (most likely sys tasks)
*/");
+ // schedule left over system tasks
+ for (ExecutorDetails exec : executorsNotScheduled) {
+ WorkerSlot targetSlot = this.findWorkerForExec(exec, td, schedulerAssignmentMap);
+ if (targetSlot != null) {
+ RAS_Node targetNode = this.idToNode(targetSlot.getNodeId());
+ if(!schedulerAssignmentMap.containsKey(targetSlot)) {
+ schedulerAssignmentMap.put(targetSlot, new LinkedList<ExecutorDetails>());
+ }
+
+ schedulerAssignmentMap.get(targetSlot).add(exec);
+ targetNode.consumeResourcesforTask(exec, td);
+ scheduledTasks.add(exec);
+ LOG.debug("TASK {} assigned to Node: {} avail [mem: {}
cpu: {}] total [mem: {} cpu: {}] on slot: {}", exec,
+ targetNode,
targetNode.getAvailableMemoryResources(),
+ targetNode.getAvailableCpuResources(),
targetNode.getTotalMemoryResources(),
+ targetNode.getTotalCpuResources(), targetSlot);
+ } else {
+ LOG.error("Not Enough Resources to schedule Task {}",
exec);
+ }
+ }
+ executorsNotScheduled.removeAll(scheduledTasks);
+ if (executorsNotScheduled.size() > 0) {
+ LOG.error("Not all executors successfully scheduled: {}",
+ executorsNotScheduled);
+ schedulerAssignmentMap = null;
+ } else {
+ LOG.debug("All resources successfully scheduled!");
+ }
+ if (schedulerAssignmentMap == null) {
+ LOG.error("Topology {} not successfully scheduled!",
td.getId());
+ }
+ return schedulerAssignmentMap;
+ }
+
+ private WorkerSlot findWorkerForExec(ExecutorDetails exec, TopologyDetails td, Map<WorkerSlot, Collection<ExecutorDetails>> scheduleAssignmentMap) {
+ WorkerSlot ws = null;
+ // first scheduling
+ if (this.refNode == null) {
+ String clus = this.getBestClustering();
+ ws = this.getBestWorker(exec, td, clus, scheduleAssignmentMap);
+ } else {
+ ws = this.getBestWorker(exec, td, scheduleAssignmentMap);
+ }
+ if(ws != null) {
+ this.refNode = this.idToNode(ws.getNodeId());
+ }
+ LOG.debug("reference node for the resource aware scheduler is: {}",
this.refNode);
+ return ws;
+ }
+
+ private WorkerSlot getBestWorker(ExecutorDetails exec, TopologyDetails td, Map<WorkerSlot, Collection<ExecutorDetails>> scheduleAssignmentMap) {
+ return this.getBestWorker(exec, td, null, scheduleAssignmentMap);
+ }
+
+ private WorkerSlot getBestWorker(ExecutorDetails exec, TopologyDetails td, String clusterId, Map<WorkerSlot, Collection<ExecutorDetails>> scheduleAssignmentMap) {
+ double taskMem = td.getTotalMemReqTask(exec);
+ double taskCPU = td.getTotalCpuReqTask(exec);
+ List<RAS_Node> nodes;
+ if(clusterId != null) {
+ nodes = this.getAvailableNodesFromCluster(clusterId);
+
+ } else {
+ nodes = this.getAvailableNodes();
+ }
+ TreeMap<Double, RAS_Node> nodeRankMap = new TreeMap<Double, RAS_Node>();
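+ // Rank candidate nodes by a weighted Euclidean distance combining how far each
+ // node's available CPU and memory are from the task's requirements, plus the
+ // network distance to the reference node; TreeMap iteration is ascending, so the
+ // "closest" node is tried first.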
+ for (RAS_Node n : nodes) {
+ if(n.getFreeSlots().size()>0) {
+ if (n.getAvailableMemoryResources() >= taskMem
+ && n.getAvailableCpuResources() >= taskCPU) {
+ double a = Math.pow((taskCPU - n.getAvailableCpuResources()) * this.CPU_WEIGHT, 2);
+ double b = Math.pow((taskMem - n.getAvailableMemoryResources()) * this.MEM_WEIGHT, 2);
+ double c = 0.0;
+ if(this.refNode != null) {
+ c = Math.pow(this.distToNode(this.refNode, n) * this.NETWORK_WEIGHT, 2);
+ }
+ double distance = Math.sqrt(a + b + c);
+ nodeRankMap.put(distance, n);
+ }
+ }
+ }
+
+ for(Map.Entry<Double, RAS_Node> entry : nodeRankMap.entrySet()) {
+ RAS_Node n = entry.getValue();
+ for(WorkerSlot ws : n.getFreeSlots()) {
+ if(checkWorkerConstraints(exec, ws, td, scheduleAssignmentMap)) {
+ return ws;
+ }
+ }
+ }
+ return null;
+ }
+
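+ /**
+ * Pick the cluster (rack) with the most total available memory and CPU;
+ * used to anchor the first assignment when no reference node exists yet.
+ */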
+ private String getBestClustering() {
+ String bestCluster = null;
+ Double mostRes = 0.0;
+ for (Entry<String, List<String>> cluster : _clusterInfo
+ .entrySet()) {
+ Double clusterTotalRes = this.getTotalClusterRes(cluster.getValue());
+ if (clusterTotalRes > mostRes) {
+ mostRes = clusterTotalRes;
+ bestCluster = cluster.getKey();
+ }
+ }
+ return bestCluster;
+ }
+
+ private Double getTotalClusterRes(List<String> cluster) {
+ Double res = 0.0;
+ for (String node : cluster) {
+ res += _availNodes.get(this.NodeHostnameToId(node))
+ .getAvailableMemoryResources()
+ + _availNodes.get(this.NodeHostnameToId(node))
+ .getAvailableCpuResources();
+ }
+ return res;
+ }
+
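+ /**
+ * A coarse network distance: 1.0 if src and dest are the same node,
+ * 2.0 if they sit in the same cluster (rack), 3.0 otherwise.
+ */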
+ private Double distToNode(RAS_Node src, RAS_Node dest) {
+ if (src.getId().equals(dest.getId())) {
+ return 1.0;
+ } else if (this.NodeToCluster(src).equals(this.NodeToCluster(dest))) {
+ return 2.0;
+ } else {
+ return 3.0;
+ }
+ }
+
+ private String NodeToCluster(RAS_Node node) {
+ for (Entry<String, List<String>> entry : _clusterInfo
+ .entrySet()) {
+ if (entry.getValue().contains(node.getHostname())) {
+ return entry.getKey();
+ }
+ }
+ LOG.error("Node: {} not found in any clusters",
node.getHostname());
+ return null;
+ }
+
+ private List<RAS_Node> getAvailableNodes() {
+ LinkedList<RAS_Node> nodes = new LinkedList<RAS_Node>();
+ for (String clusterId : _clusterInfo.keySet()) {
+ nodes.addAll(this.getAvailableNodesFromCluster(clusterId));
+ }
+ return nodes;
+ }
+
+ private List<RAS_Node> getAvailableNodesFromCluster(String clus) {
+ List<RAS_Node> retList = new ArrayList<RAS_Node>();
+ for (String node_id : _clusterInfo.get(clus)) {
+ retList.add(_availNodes.get(this
+ .NodeHostnameToId(node_id)));
+ }
+ return retList;
+ }
+
+ private List<WorkerSlot> getAvailableWorkersFromCluster(String clusterId) {
+ List<RAS_Node> nodes = this.getAvailableNodesFromCluster(clusterId);
+ List<WorkerSlot> workers = new LinkedList<WorkerSlot>();
+ for(RAS_Node node : nodes) {
+ workers.addAll(node.getFreeSlots());
+ }
+ return workers;
+ }
+
+ private List<WorkerSlot> getAvailableWorker() {
+ List<WorkerSlot> workers = new LinkedList<WorkerSlot>();
+ for (String clusterId : _clusterInfo.keySet()) {
+ workers.addAll(this.getAvailableWorkersFromCluster(clusterId));
+ }
+ return workers;
+ }
+
+ /**
+ * Placeholder for the case where, in the future, RAS can only use a subset of nodes
+ */
+ private Map<String, RAS_Node> getAvailNodes() {
+ return _nodes;
+ }
+
+ /**
+ * Breadth first traversal of the topology DAG
+ * @param topologies
+ * @param td
+ * @param spouts
+ * @return A partial ordering of components
+ */
+ private Queue<Component> bfs(Topologies topologies, TopologyDetails td, List<Component> spouts) {
+ // Since Queue is an interface
+ Queue<Component> ordered__Component_list = new LinkedList<Component>();
+ HashMap<String, Component> visited = new HashMap<String, Component>();
+
+ /* start a breadth-first traversal from each spout that has not yet been visited */
+ for (Component spout : spouts) {
+ if (!visited.containsKey(spout.id)) {
+ Queue<Component> queue = new LinkedList<Component>();
+ queue.offer(spout);
+ while (!queue.isEmpty()) {
+ Component comp = queue.poll();
+ visited.put(comp.id, comp);
+ ordered__Component_list.add(comp);
+ List<String> neighbors = new ArrayList<String>();
+ neighbors.addAll(comp.children);
+ neighbors.addAll(comp.parents);
+ for (String nbID : neighbors) {
+ if (!visited.containsKey(nbID)) {
+ Component child = topologies.getAllComponents().get(td.getId()).get(nbID);
+ queue.offer(child);
+ }
+ }
+ }
+ }
+ }
+ return ordered__Component_list;
+ }
+
+ private List<Component> getSpouts(Topologies topologies, TopologyDetails td) {
+ List<Component> spouts = new ArrayList<Component>();
+ for (Component c : topologies.getAllComponents().get(td.getId())
+ .values()) {
+ if (c.type == Component.ComponentType.SPOUT) {
+ spouts.add(c);
+ }
+ }
+ return spouts;
+ }
+
+ private Integer getLongestPriorityListSize(Map<Integer, List<ExecutorDetails>> priorityToExecutorMap) {
+ Integer mostNum = 0;
+ for (List<ExecutorDetails> execs : priorityToExecutorMap.values()) {
+ Integer numExecs = execs.size();
+ if (mostNum < numExecs) {
+ mostNum = numExecs;
+ }
+ }
+ return mostNum;
+ }
+
+ /**
+ * Get the remaining amount of memory that can be assigned to a worker, given the configured worker max heap size
+ * @param ws
+ * @param td
+ * @param scheduleAssignmentMap
+ * @return The remaining amount of memory
+ */
+ private Double getWorkerScheduledMemoryAvailable(WorkerSlot ws, TopologyDetails td, Map<WorkerSlot, Collection<ExecutorDetails>> scheduleAssignmentMap) {
+ Double memScheduleUsed = this.getWorkerScheduledMemoryUse(ws, td, scheduleAssignmentMap);
+ return td.getTopologyWorkerMaxHeapSize() - memScheduleUsed;
+ }
+
+ /**
+ * Get the amount of memory already assigned to a worker
+ * @param ws
+ * @param td
+ * @param scheduleAssignmentMap
+ * @return the amount of memory
+ */
+ private Double getWorkerScheduledMemoryUse(WorkerSlot ws, TopologyDetails td, Map<WorkerSlot, Collection<ExecutorDetails>> scheduleAssignmentMap) {
+ Double totalMem = 0.0;
+ Collection<ExecutorDetails> execs = scheduleAssignmentMap.get(ws);
+ if(execs != null) {
+ for(ExecutorDetails exec : execs) {
+ totalMem += td.getTotalMemReqTask(exec);
+ }
+ }
+ return totalMem;
+ }
+
+ /**
+ * Checks whether we can schedule an Executor exec on the worker slot ws
+ * @param exec
+ * @param ws
+ * @param td
+ * @param scheduleAssignmentMap
+ * @return true if the exec can be scheduled on ws, false otherwise
+ */
+ private boolean checkWorkerConstraints(ExecutorDetails exec, WorkerSlot ws, TopologyDetails td, Map<WorkerSlot, Collection<ExecutorDetails>> scheduleAssignmentMap) {
--- End diff --
will add comment
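For reference, a minimal sketch of the kind of per-worker check this method could perform, assuming the only constraint is the worker max-heap budget tracked by getWorkerScheduledMemoryAvailable (the actual method body is cut off by the diff view above, so this is illustrative only):

    private boolean checkWorkerConstraints(ExecutorDetails exec, WorkerSlot ws, TopologyDetails td,
            Map<WorkerSlot, Collection<ExecutorDetails>> scheduleAssignmentMap) {
        // the executor fits only if its memory requirement stays within the
        // remaining worker max-heap budget for this slot
        return td.getTotalMemReqTask(exec)
                <= this.getWorkerScheduledMemoryAvailable(ws, td, scheduleAssignmentMap);
    }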
> Resource Aware Scheduling
> -------------------------
>
> Key: STORM-893
> URL: https://issues.apache.org/jira/browse/STORM-893
> Project: Apache Storm
> Issue Type: Umbrella
> Components: storm-core
> Reporter: Robert Joseph Evans
> Assignee: Boyang Jerry Peng
> Attachments: resource_aware_scheduler_api.pdf
>
>
> At Yahoo we have been working on resource aware scheduling in Storm, based
> on some work done in academia. This rollup ticket tracks the complete
> project, with several sub-tasks: some are already done and need to be pushed
> back, and others we have not started on yet.
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)