http://git-wip-us.apache.org/repos/asf/storm/blob/f1c0bcbe/storm-server/src/main/java/org/apache/storm/scheduler/multitenant/Node.java ---------------------------------------------------------------------- diff --git a/storm-server/src/main/java/org/apache/storm/scheduler/multitenant/Node.java b/storm-server/src/main/java/org/apache/storm/scheduler/multitenant/Node.java index 389faba..89fe462 100644 --- a/storm-server/src/main/java/org/apache/storm/scheduler/multitenant/Node.java +++ b/storm-server/src/main/java/org/apache/storm/scheduler/multitenant/Node.java @@ -1,19 +1,13 @@ /** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. The ASF licenses this file to you under the Apache License, Version + * 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions + * and limitations under the License. */ package org.apache.storm.scheduler.multitenant; @@ -23,318 +17,313 @@ import java.util.Comparator; import java.util.HashMap; import java.util.HashSet; import java.util.Map; -import java.util.Set; import java.util.Map.Entry; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - +import java.util.Set; import org.apache.storm.scheduler.Cluster; import org.apache.storm.scheduler.ExecutorDetails; import org.apache.storm.scheduler.SchedulerAssignment; import org.apache.storm.scheduler.SupervisorDetails; import org.apache.storm.scheduler.WorkerSlot; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; /** * Represents a single node in the cluster. */ public class Node { - private static final Logger LOG = LoggerFactory.getLogger(Node.class); - private Map<String, Set<WorkerSlot>> _topIdToUsedSlots = new HashMap<>(); - private Set<WorkerSlot> _freeSlots = new HashSet<>(); - private final String _nodeId; - private boolean _isAlive; - - public Node(String nodeId, Set<Integer> allPorts, boolean isAlive) { - _nodeId = nodeId; - _isAlive = isAlive; - if (_isAlive && allPorts != null) { - for (int port: allPorts) { - _freeSlots.add(new WorkerSlot(_nodeId, port)); - } - } - } + /** + * Used to sort a list of nodes so the node with the most free slots comes + * first. + */ + public static final Comparator<Node> FREE_NODE_COMPARATOR_DEC = new Comparator<Node>() { + @Override + public int compare(Node o1, Node o2) { + return o1.totalSlotsUsed() - o2.totalSlotsUsed(); + } + }; + private static final Logger LOG = LoggerFactory.getLogger(Node.class); + private final String _nodeId; + private Map<String, Set<WorkerSlot>> _topIdToUsedSlots = new HashMap<>(); + private Set<WorkerSlot> _freeSlots = new HashSet<>(); + private boolean _isAlive; - public String getId() { - return _nodeId; - } - - public boolean isAlive() { - return _isAlive; - } - - /** - * @return a collection of the topology ids currently running on this node - */ - public Collection<String> getRunningTopologies() { - return _topIdToUsedSlots.keySet(); - } - - public boolean isTotallyFree() { - return _topIdToUsedSlots.isEmpty(); - } - - public int totalSlotsFree() { - return _freeSlots.size(); - } - - public int totalSlotsUsed() { - int total = 0; - for (Set<WorkerSlot> slots: _topIdToUsedSlots.values()) { - total += slots.size(); - } - return total; - } - - public int totalSlots() { - return totalSlotsFree() + totalSlotsUsed(); - } - - public int totalSlotsUsed(String topId) { - int total = 0; - Set<WorkerSlot> slots = _topIdToUsedSlots.get(topId); - if (slots != null) { - total = slots.size(); + public Node(String nodeId, Set<Integer> allPorts, boolean isAlive) { + _nodeId = nodeId; + _isAlive = isAlive; + if (_isAlive && allPorts != null) { + for (int port : allPorts) { + _freeSlots.add(new WorkerSlot(_nodeId, port)); + } + } } - return total; - } - private void validateSlot(WorkerSlot ws) { - if (!_nodeId.equals(ws.getNodeId())) { - throw new IllegalArgumentException( - "Trying to add a slot to the wrong node " + ws + - " is not a part of " + _nodeId); + public static int countSlotsUsed(String topId, Collection<Node> nodes) { + int total = 0; + for (Node n : nodes) { + total += n.totalSlotsUsed(topId); + } + return total; } - } - - private void addOrphanedSlot(WorkerSlot ws) { - if (_isAlive) { - throw new IllegalArgumentException("Orphaned Slots " + - "only are allowed on dead nodes."); + + public static int countSlotsUsed(Collection<Node> nodes) { + int total = 0; + for (Node n : nodes) { + total += n.totalSlotsUsed(); + } + return total; } - validateSlot(ws); - if (_freeSlots.contains(ws)) { - return; + + public static int countFreeSlotsAlive(Collection<Node> nodes) { + int total = 0; + for (Node n : nodes) { + if (n.isAlive()) { + total += n.totalSlotsFree(); + } + } + return total; } - for (Set<WorkerSlot> used: _topIdToUsedSlots.values()) { - if (used.contains(ws)) { - return; - } + + public static int countTotalSlotsAlive(Collection<Node> nodes) { + int total = 0; + for (Node n : nodes) { + if (n.isAlive()) { + total += n.totalSlots(); + } + } + return total; } - _freeSlots.add(ws); - } - - boolean assignInternal(WorkerSlot ws, String topId, boolean dontThrow) { - validateSlot(ws); - if (!_freeSlots.remove(ws)) { - for (Entry<String, Set<WorkerSlot>> topologySetEntry : _topIdToUsedSlots.entrySet()) { - if (topologySetEntry.getValue().contains(ws)) { - if (dontThrow) { - LOG.warn("Worker slot [" + ws + "] can't be assigned to " + topId + - ". Its already assigned to " + topologySetEntry.getKey() + "."); - return true; - } - throw new IllegalStateException("Worker slot [" + ws + "] can't be assigned to " - + topId + ". Its already assigned to " + topologySetEntry.getKey() + "."); + + public static Map<String, Node> getAllNodesFrom(Cluster cluster) { + Map<String, Node> nodeIdToNode = new HashMap<>(); + for (SupervisorDetails sup : cluster.getSupervisors().values()) { + //Node ID and supervisor ID are the same. + String id = sup.getId(); + boolean isAlive = !cluster.isBlackListed(id); + LOG.debug("Found a {} Node {} {}", + isAlive ? "living" : "dead", id, sup.getAllPorts()); + nodeIdToNode.put(id, new Node(id, sup.getAllPorts(), isAlive)); } - } - LOG.warn("Adding Worker slot [" + ws + "] that was not reported in the supervisor heartbeats," + - " but the worker is already running for topology " + topId + "."); + + for (Entry<String, SchedulerAssignment> entry : cluster.getAssignments().entrySet()) { + String topId = entry.getValue().getTopologyId(); + for (WorkerSlot ws : entry.getValue().getSlots()) { + String id = ws.getNodeId(); + Node node = nodeIdToNode.get(id); + if (node == null) { + LOG.debug("Found an assigned slot on a dead supervisor {}", ws); + node = new Node(id, null, false); + nodeIdToNode.put(id, node); + } + if (!node.isAlive()) { + //The supervisor on the node down so add an orphaned slot to hold the unsupervised worker + node.addOrphanedSlot(ws); + } + if (node.assignInternal(ws, topId, true)) { + LOG.warn("Bad scheduling state for topology [" + topId + "], the slot " + + ws + " assigned to multiple workers, un-assigning everything..."); + node.free(ws, cluster, true); + } + } + } + + return nodeIdToNode; } - Set<WorkerSlot> usedSlots = _topIdToUsedSlots.get(topId); - if (usedSlots == null) { - usedSlots = new HashSet<>(); - _topIdToUsedSlots.put(topId, usedSlots); + + public String getId() { + return _nodeId; } - usedSlots.add(ws); - return false; - } - - /** - * Free all slots on this node. This will update the Cluster too. - * @param cluster the cluster to be updated - */ - public void freeAllSlots(Cluster cluster) { - if (!_isAlive) { - LOG.warn("Freeing all slots on a dead node {} ",_nodeId); - } - for (Entry<String, Set<WorkerSlot>> entry : _topIdToUsedSlots.entrySet()) { - cluster.freeSlots(entry.getValue()); - if (_isAlive) { - _freeSlots.addAll(entry.getValue()); - } + + public boolean isAlive() { + return _isAlive; } - _topIdToUsedSlots = new HashMap<>(); - } - - /** - * Frees a single slot in this node - * @param ws the slot to free - * @param cluster the cluster to update - */ - public void free(WorkerSlot ws, Cluster cluster, boolean forceFree) { - if (_freeSlots.contains(ws)) return; - boolean wasFound = false; - for (Entry<String, Set<WorkerSlot>> entry : _topIdToUsedSlots.entrySet()) { - Set<WorkerSlot> slots = entry.getValue(); - if (slots.remove(ws)) { - cluster.freeSlot(ws); - if (_isAlive) { - _freeSlots.add(ws); - } - wasFound = true; - } + + /** + * @return a collection of the topology ids currently running on this node + */ + public Collection<String> getRunningTopologies() { + return _topIdToUsedSlots.keySet(); } - if(!wasFound) - { - if(forceFree) - { - LOG.info("Forcefully freeing the " + ws); - cluster.freeSlot(ws); - _freeSlots.add(ws); - } else { - throw new IllegalArgumentException("Tried to free a slot that was not" + - " part of this node " + _nodeId); - } + + public boolean isTotallyFree() { + return _topIdToUsedSlots.isEmpty(); } - } - - /** - * Frees all the slots for a topology. - * @param topId the topology to free slots for - * @param cluster the cluster to update - */ - public void freeTopology(String topId, Cluster cluster) { - Set<WorkerSlot> slots = _topIdToUsedSlots.get(topId); - if (slots == null || slots.isEmpty()) return; - for (WorkerSlot ws : slots) { - cluster.freeSlot(ws); - if (_isAlive) { - _freeSlots.add(ws); - } + + public int totalSlotsFree() { + return _freeSlots.size(); } - _topIdToUsedSlots.remove(topId); - } - - /** - * Assign a free slot on the node to the following topology and executors. - * This will update the cluster too. - * @param topId the topology to assign a free slot to. - * @param executors the executors to run in that slot. - * @param cluster the cluster to be updated - */ - public void assign(String topId, Collection<ExecutorDetails> executors, - Cluster cluster) { - if (!_isAlive) { - throw new IllegalStateException("Trying to adding to a dead node " + _nodeId); + + public int totalSlotsUsed() { + int total = 0; + for (Set<WorkerSlot> slots : _topIdToUsedSlots.values()) { + total += slots.size(); + } + return total; + } + + public int totalSlots() { + return totalSlotsFree() + totalSlotsUsed(); } - if (_freeSlots.isEmpty()) { - throw new IllegalStateException("Trying to assign to a full node " + _nodeId); + + public int totalSlotsUsed(String topId) { + int total = 0; + Set<WorkerSlot> slots = _topIdToUsedSlots.get(topId); + if (slots != null) { + total = slots.size(); + } + return total; } - if (executors.size() == 0) { - LOG.warn("Trying to assign nothing from " + topId + " to " + _nodeId + " (Ignored)"); - } else { - WorkerSlot slot = _freeSlots.iterator().next(); - cluster.assign(slot, topId, executors); - assignInternal(slot, topId, false); + + private void validateSlot(WorkerSlot ws) { + if (!_nodeId.equals(ws.getNodeId())) { + throw new IllegalArgumentException( + "Trying to add a slot to the wrong node " + ws + + " is not a part of " + _nodeId); + } } - } - - @Override - public boolean equals(Object other) { - return other instanceof Node && _nodeId.equals(((Node) other)._nodeId); - } - - @Override - public int hashCode() { - return _nodeId.hashCode(); - } - - @Override - public String toString() { - return "Node: " + _nodeId; - } - public static int countSlotsUsed(String topId, Collection<Node> nodes) { - int total = 0; - for (Node n: nodes) { - total += n.totalSlotsUsed(topId); + private void addOrphanedSlot(WorkerSlot ws) { + if (_isAlive) { + throw new IllegalArgumentException("Orphaned Slots " + + "only are allowed on dead nodes."); + } + validateSlot(ws); + if (_freeSlots.contains(ws)) { + return; + } + for (Set<WorkerSlot> used : _topIdToUsedSlots.values()) { + if (used.contains(ws)) { + return; + } + } + _freeSlots.add(ws); } - return total; - } - - public static int countSlotsUsed(Collection<Node> nodes) { - int total = 0; - for (Node n: nodes) { - total += n.totalSlotsUsed(); + + boolean assignInternal(WorkerSlot ws, String topId, boolean dontThrow) { + validateSlot(ws); + if (!_freeSlots.remove(ws)) { + for (Entry<String, Set<WorkerSlot>> topologySetEntry : _topIdToUsedSlots.entrySet()) { + if (topologySetEntry.getValue().contains(ws)) { + if (dontThrow) { + LOG.warn("Worker slot [" + ws + "] can't be assigned to " + topId + + ". Its already assigned to " + topologySetEntry.getKey() + "."); + return true; + } + throw new IllegalStateException("Worker slot [" + ws + "] can't be assigned to " + + topId + ". Its already assigned to " + topologySetEntry.getKey() + "."); + } + } + LOG.warn("Adding Worker slot [" + ws + "] that was not reported in the supervisor heartbeats," + + " but the worker is already running for topology " + topId + "."); + } + Set<WorkerSlot> usedSlots = _topIdToUsedSlots.get(topId); + if (usedSlots == null) { + usedSlots = new HashSet<>(); + _topIdToUsedSlots.put(topId, usedSlots); + } + usedSlots.add(ws); + return false; } - return total; - } - - public static int countFreeSlotsAlive(Collection<Node> nodes) { - int total = 0; - for (Node n: nodes) { - if (n.isAlive()) { - total += n.totalSlotsFree(); - } + + /** + * Free all slots on this node. This will update the Cluster too. + * @param cluster the cluster to be updated + */ + public void freeAllSlots(Cluster cluster) { + if (!_isAlive) { + LOG.warn("Freeing all slots on a dead node {} ", _nodeId); + } + for (Entry<String, Set<WorkerSlot>> entry : _topIdToUsedSlots.entrySet()) { + cluster.freeSlots(entry.getValue()); + if (_isAlive) { + _freeSlots.addAll(entry.getValue()); + } + } + _topIdToUsedSlots = new HashMap<>(); } - return total; - } - - public static int countTotalSlotsAlive(Collection<Node> nodes) { - int total = 0; - for (Node n: nodes) { - if (n.isAlive()) { - total += n.totalSlots(); - } + + /** + * Frees a single slot in this node + * @param ws the slot to free + * @param cluster the cluster to update + */ + public void free(WorkerSlot ws, Cluster cluster, boolean forceFree) { + if (_freeSlots.contains(ws)) return; + boolean wasFound = false; + for (Entry<String, Set<WorkerSlot>> entry : _topIdToUsedSlots.entrySet()) { + Set<WorkerSlot> slots = entry.getValue(); + if (slots.remove(ws)) { + cluster.freeSlot(ws); + if (_isAlive) { + _freeSlots.add(ws); + } + wasFound = true; + } + } + if (!wasFound) { + if (forceFree) { + LOG.info("Forcefully freeing the " + ws); + cluster.freeSlot(ws); + _freeSlots.add(ws); + } else { + throw new IllegalArgumentException("Tried to free a slot that was not" + + " part of this node " + _nodeId); + } + } } - return total; - } - - public static Map<String, Node> getAllNodesFrom(Cluster cluster) { - Map<String, Node> nodeIdToNode = new HashMap<>(); - for (SupervisorDetails sup : cluster.getSupervisors().values()) { - //Node ID and supervisor ID are the same. - String id = sup.getId(); - boolean isAlive = !cluster.isBlackListed(id); - LOG.debug("Found a {} Node {} {}", - isAlive? "living":"dead", id, sup.getAllPorts()); - nodeIdToNode.put(id, new Node(id, sup.getAllPorts(), isAlive)); + + /** + * Frees all the slots for a topology. + * @param topId the topology to free slots for + * @param cluster the cluster to update + */ + public void freeTopology(String topId, Cluster cluster) { + Set<WorkerSlot> slots = _topIdToUsedSlots.get(topId); + if (slots == null || slots.isEmpty()) return; + for (WorkerSlot ws : slots) { + cluster.freeSlot(ws); + if (_isAlive) { + _freeSlots.add(ws); + } + } + _topIdToUsedSlots.remove(topId); } - - for (Entry<String, SchedulerAssignment> entry : cluster.getAssignments().entrySet()) { - String topId = entry.getValue().getTopologyId(); - for (WorkerSlot ws: entry.getValue().getSlots()) { - String id = ws.getNodeId(); - Node node = nodeIdToNode.get(id); - if (node == null) { - LOG.debug("Found an assigned slot on a dead supervisor {}", ws); - node = new Node(id, null, false); - nodeIdToNode.put(id, node); + + /** + * Assign a free slot on the node to the following topology and executors. + * This will update the cluster too. + * @param topId the topology to assign a free slot to. + * @param executors the executors to run in that slot. + * @param cluster the cluster to be updated + */ + public void assign(String topId, Collection<ExecutorDetails> executors, + Cluster cluster) { + if (!_isAlive) { + throw new IllegalStateException("Trying to adding to a dead node " + _nodeId); } - if (!node.isAlive()) { - //The supervisor on the node down so add an orphaned slot to hold the unsupervised worker - node.addOrphanedSlot(ws); + if (_freeSlots.isEmpty()) { + throw new IllegalStateException("Trying to assign to a full node " + _nodeId); } - if (node.assignInternal(ws, topId, true)) { - LOG.warn("Bad scheduling state for topology [" + topId+ "], the slot " + - ws + " assigned to multiple workers, un-assigning everything..."); - node.free(ws, cluster, true); + if (executors.size() == 0) { + LOG.warn("Trying to assign nothing from " + topId + " to " + _nodeId + " (Ignored)"); + } else { + WorkerSlot slot = _freeSlots.iterator().next(); + cluster.assign(slot, topId, executors); + assignInternal(slot, topId, false); } - } } - - return nodeIdToNode; - } - - /** - * Used to sort a list of nodes so the node with the most free slots comes - * first. - */ - public static final Comparator<Node> FREE_NODE_COMPARATOR_DEC = new Comparator<Node>() { + + @Override + public boolean equals(Object other) { + return other instanceof Node && _nodeId.equals(((Node) other)._nodeId); + } + + @Override + public int hashCode() { + return _nodeId.hashCode(); + } + @Override - public int compare(Node o1, Node o2) { - return o1.totalSlotsUsed() - o2.totalSlotsUsed(); + public String toString() { + return "Node: " + _nodeId; } - }; }
http://git-wip-us.apache.org/repos/asf/storm/blob/f1c0bcbe/storm-server/src/main/java/org/apache/storm/scheduler/multitenant/NodePool.java ---------------------------------------------------------------------- diff --git a/storm-server/src/main/java/org/apache/storm/scheduler/multitenant/NodePool.java b/storm-server/src/main/java/org/apache/storm/scheduler/multitenant/NodePool.java index e609028..21ffbf5 100644 --- a/storm-server/src/main/java/org/apache/storm/scheduler/multitenant/NodePool.java +++ b/storm-server/src/main/java/org/apache/storm/scheduler/multitenant/NodePool.java @@ -1,19 +1,13 @@ /** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. The ASF licenses this file to you under the Apache License, Version + * 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions + * and limitations under the License. */ package org.apache.storm.scheduler.multitenant; @@ -27,270 +21,268 @@ import java.util.List; import java.util.Map; import java.util.Map.Entry; import java.util.Set; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - import org.apache.storm.Config; import org.apache.storm.scheduler.Cluster; import org.apache.storm.scheduler.ExecutorDetails; import org.apache.storm.scheduler.SchedulerAssignment; import org.apache.storm.scheduler.TopologyDetails; import org.apache.storm.scheduler.WorkerSlot; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; /** * A pool of nodes that can be used to run topologies. */ public abstract class NodePool { - protected Cluster _cluster; - protected Map<String, Node> _nodeIdToNode; - - public static class NodeAndSlotCounts { - public final int _nodes; - public final int _slots; - - public NodeAndSlotCounts(int nodes, int slots) { - _nodes = nodes; - _slots = slots; + private static final Logger LOG = LoggerFactory.getLogger(NodePool.class); + protected Cluster _cluster; + protected Map<String, Node> _nodeIdToNode; + + public static int slotsAvailable(NodePool[] pools) { + int slotsAvailable = 0; + for (NodePool pool : pools) { + slotsAvailable += pool.slotsAvailable(); + } + return slotsAvailable; } - } - /** - * Place executors into slots in a round robin way, taking into account - * component spreading among different hosts. - */ - public static class RoundRobinSlotScheduler { - private Map<String,Set<String>> _nodeToComps; - private HashMap<String, List<ExecutorDetails>> _spreadToSchedule; - private LinkedList<Set<ExecutorDetails>> _slots; - private Set<ExecutorDetails> _lastSlot; - private Cluster _cluster; - private String _topId; - - /** - * Create a new scheduler for a given topology - * @param td the topology to schedule - * @param slotsToUse the number of slots to use for the executors left to - * schedule. - * @param cluster the cluster to schedule this on. - */ - public RoundRobinSlotScheduler(TopologyDetails td, int slotsToUse, - Cluster cluster) { - _topId = td.getId(); - _cluster = cluster; - - Map<ExecutorDetails, String> execToComp = td.getExecutorToComponent(); - SchedulerAssignment assignment = _cluster.getAssignmentById(_topId); - _nodeToComps = new HashMap<>(); + public static int nodesAvailable(NodePool[] pools) { + int nodesAvailable = 0; + for (NodePool pool : pools) { + nodesAvailable += pool.nodesAvailable(); + } + return nodesAvailable; + } - if (assignment != null) { - Map<ExecutorDetails, WorkerSlot> execToSlot = assignment.getExecutorToSlot(); - - for (Entry<ExecutorDetails, WorkerSlot> entry: execToSlot.entrySet()) { - String nodeId = entry.getValue().getNodeId(); - Set<String> comps = _nodeToComps.get(nodeId); - if (comps == null) { - comps = new HashSet<>(); - _nodeToComps.put(nodeId, comps); - } - comps.add(execToComp.get(entry.getKey())); + public static Collection<Node> takeNodesBySlot(int slotsNeeded, NodePool[] pools) { + LOG.debug("Trying to grab {} free slots from {}", slotsNeeded, pools); + HashSet<Node> ret = new HashSet<>(); + for (NodePool pool : pools) { + Collection<Node> got = pool.takeNodesBySlots(slotsNeeded); + ret.addAll(got); + slotsNeeded -= Node.countFreeSlotsAlive(got); + LOG.debug("Got {} nodes so far need {} more slots", ret.size(), slotsNeeded); + if (slotsNeeded <= 0) { + break; + } } - } - - _spreadToSchedule = new HashMap<>(); - List<String> spreadComps = (List<String>)td.getConf().get(Config.TOPOLOGY_SPREAD_COMPONENTS); - if (spreadComps != null) { - for (String comp: spreadComps) { - _spreadToSchedule.put(comp, new ArrayList<ExecutorDetails>()); + return ret; + } + + public static Collection<Node> takeNodes(int nodesNeeded, NodePool[] pools) { + LOG.debug("Trying to grab {} free nodes from {}", nodesNeeded, pools); + HashSet<Node> ret = new HashSet<>(); + for (NodePool pool : pools) { + Collection<Node> got = pool.takeNodes(nodesNeeded); + ret.addAll(got); + nodesNeeded -= got.size(); + LOG.debug("Got {} nodes so far need {} more nodes", ret.size(), nodesNeeded); + if (nodesNeeded <= 0) { + break; + } } - } - - _slots = new LinkedList<>(); - for (int i = 0; i < slotsToUse; i++) { - _slots.add(new HashSet<ExecutorDetails>()); - } + return ret; + } - int at = 0; - for (Entry<String, List<ExecutorDetails>> entry: _cluster.getNeedsSchedulingComponentToExecutors(td).entrySet()) { - LOG.debug("Scheduling for {}", entry.getKey()); - if (_spreadToSchedule.containsKey(entry.getKey())) { - LOG.debug("Saving {} for spread...",entry.getKey()); - _spreadToSchedule.get(entry.getKey()).addAll(entry.getValue()); - } else { - for (ExecutorDetails ed: entry.getValue()) { - LOG.debug("Assigning {} {} to slot {}", entry.getKey(), ed, at); - _slots.get(at).add(ed); - at++; - if (at >= _slots.size()) { - at = 0; + public static int getNodeCountIfSlotsWereTaken(int slots, NodePool[] pools) { + LOG.debug("How many nodes to get {} slots from {}", slots, pools); + int total = 0; + for (NodePool pool : pools) { + NodeAndSlotCounts ns = pool.getNodeAndSlotCountIfSlotsWereTaken(slots); + total += ns._nodes; + slots -= ns._slots; + LOG.debug("Found {} nodes so far {} more slots needed", total, slots); + if (slots <= 0) { + break; } - } } - } - _lastSlot = _slots.get(_slots.size() - 1); + return total; } - + /** - * Assign a slot to the given node. - * @param n the node to assign a slot to. - * @return true if there are more slots to assign else false. + * Initialize the pool. + * @param cluster the cluster + * @param nodeIdToNode the mapping of node id to nodes */ - public boolean assignSlotTo(Node n) { - if (_slots.isEmpty()) { - return false; - } - Set<ExecutorDetails> slot = _slots.pop(); - if (slot == _lastSlot) { - //The last slot fill it up - for (Entry<String, List<ExecutorDetails>> entry: _spreadToSchedule.entrySet()) { - if (entry.getValue().size() > 0) { - slot.addAll(entry.getValue()); - } - } - } else { - String nodeId = n.getId(); - Set<String> nodeComps = _nodeToComps.get(nodeId); - if (nodeComps == null) { - nodeComps = new HashSet<>(); - _nodeToComps.put(nodeId, nodeComps); + public void init(Cluster cluster, Map<String, Node> nodeIdToNode) { + _cluster = cluster; + _nodeIdToNode = nodeIdToNode; + } + + /** + * Add a topology to the pool + * @param td the topology to add. + */ + public abstract void addTopology(TopologyDetails td); + + /** + * Check if this topology can be added to this pool + * @param td the topology + * @return true if it can else false + */ + public abstract boolean canAdd(TopologyDetails td); + + /** + * @return the number of nodes that are available to be taken + */ + public abstract int slotsAvailable(); + + /** + * Take nodes from this pool that can fulfill possibly up to the + * slotsNeeded + * @param slotsNeeded the number of slots that are needed. + * @return a Collection of nodes with the removed nodes in it. + * This may be empty, but should not be null. + */ + public abstract Collection<Node> takeNodesBySlots(int slotsNeeded); + + /** + * Get the number of nodes and slots this would provide to get the slots needed + * @param slots the number of slots needed + * @return the number of nodes and slots that would be returned. + */ + public abstract NodeAndSlotCounts getNodeAndSlotCountIfSlotsWereTaken(int slots); + + /** + * @return the number of nodes that are available to be taken + */ + public abstract int nodesAvailable(); + + /** + * Take up to nodesNeeded from this pool + * @param nodesNeeded the number of nodes that are needed. + * @return a Collection of nodes with the removed nodes in it. + * This may be empty, but should not be null. + */ + public abstract Collection<Node> takeNodes(int nodesNeeded); + + /** + * Reschedule any topologies as needed. + * @param lesserPools pools that may be used to steal nodes from. + */ + public abstract void scheduleAsNeeded(NodePool... lesserPools); + + public static class NodeAndSlotCounts { + public final int _nodes; + public final int _slots; + + public NodeAndSlotCounts(int nodes, int slots) { + _nodes = nodes; + _slots = slots; } - for (Entry<String, List<ExecutorDetails>> entry: _spreadToSchedule.entrySet()) { - if (entry.getValue().size() > 0) { - String comp = entry.getKey(); - if (!nodeComps.contains(comp)) { - nodeComps.add(comp); - slot.add(entry.getValue().remove(0)); + } + + /** + * Place executors into slots in a round robin way, taking into account + * component spreading among different hosts. + */ + public static class RoundRobinSlotScheduler { + private Map<String, Set<String>> _nodeToComps; + private HashMap<String, List<ExecutorDetails>> _spreadToSchedule; + private LinkedList<Set<ExecutorDetails>> _slots; + private Set<ExecutorDetails> _lastSlot; + private Cluster _cluster; + private String _topId; + + /** + * Create a new scheduler for a given topology + * @param td the topology to schedule + * @param slotsToUse the number of slots to use for the executors left to + * schedule. + * @param cluster the cluster to schedule this on. + */ + public RoundRobinSlotScheduler(TopologyDetails td, int slotsToUse, + Cluster cluster) { + _topId = td.getId(); + _cluster = cluster; + + Map<ExecutorDetails, String> execToComp = td.getExecutorToComponent(); + SchedulerAssignment assignment = _cluster.getAssignmentById(_topId); + _nodeToComps = new HashMap<>(); + + if (assignment != null) { + Map<ExecutorDetails, WorkerSlot> execToSlot = assignment.getExecutorToSlot(); + + for (Entry<ExecutorDetails, WorkerSlot> entry : execToSlot.entrySet()) { + String nodeId = entry.getValue().getNodeId(); + Set<String> comps = _nodeToComps.get(nodeId); + if (comps == null) { + comps = new HashSet<>(); + _nodeToComps.put(nodeId, comps); + } + comps.add(execToComp.get(entry.getKey())); + } + } + + _spreadToSchedule = new HashMap<>(); + List<String> spreadComps = (List<String>) td.getConf().get(Config.TOPOLOGY_SPREAD_COMPONENTS); + if (spreadComps != null) { + for (String comp : spreadComps) { + _spreadToSchedule.put(comp, new ArrayList<ExecutorDetails>()); + } + } + + _slots = new LinkedList<>(); + for (int i = 0; i < slotsToUse; i++) { + _slots.add(new HashSet<ExecutorDetails>()); + } + + int at = 0; + for (Entry<String, List<ExecutorDetails>> entry : _cluster.getNeedsSchedulingComponentToExecutors(td).entrySet()) { + LOG.debug("Scheduling for {}", entry.getKey()); + if (_spreadToSchedule.containsKey(entry.getKey())) { + LOG.debug("Saving {} for spread...", entry.getKey()); + _spreadToSchedule.get(entry.getKey()).addAll(entry.getValue()); + } else { + for (ExecutorDetails ed : entry.getValue()) { + LOG.debug("Assigning {} {} to slot {}", entry.getKey(), ed, at); + _slots.get(at).add(ed); + at++; + if (at >= _slots.size()) { + at = 0; + } + } + } } - } + _lastSlot = _slots.get(_slots.size() - 1); } - } - n.assign(_topId, slot, _cluster); - return !_slots.isEmpty(); - } - } - - private static final Logger LOG = LoggerFactory.getLogger(NodePool.class); - /** - * Initialize the pool. - * @param cluster the cluster - * @param nodeIdToNode the mapping of node id to nodes - */ - public void init(Cluster cluster, Map<String, Node> nodeIdToNode) { - _cluster = cluster; - _nodeIdToNode = nodeIdToNode; - } - - /** - * Add a topology to the pool - * @param td the topology to add. - */ - public abstract void addTopology(TopologyDetails td); - - /** - * Check if this topology can be added to this pool - * @param td the topology - * @return true if it can else false - */ - public abstract boolean canAdd(TopologyDetails td); - - /** - * @return the number of nodes that are available to be taken - */ - public abstract int slotsAvailable(); - - /** - * Take nodes from this pool that can fulfill possibly up to the - * slotsNeeded - * @param slotsNeeded the number of slots that are needed. - * @return a Collection of nodes with the removed nodes in it. - * This may be empty, but should not be null. - */ - public abstract Collection<Node> takeNodesBySlots(int slotsNeeded); - /** - * Get the number of nodes and slots this would provide to get the slots needed - * @param slots the number of slots needed - * @return the number of nodes and slots that would be returned. - */ - public abstract NodeAndSlotCounts getNodeAndSlotCountIfSlotsWereTaken(int slots); - - /** - * @return the number of nodes that are available to be taken - */ - public abstract int nodesAvailable(); - - /** - * Take up to nodesNeeded from this pool - * @param nodesNeeded the number of nodes that are needed. - * @return a Collection of nodes with the removed nodes in it. - * This may be empty, but should not be null. - */ - public abstract Collection<Node> takeNodes(int nodesNeeded); - - /** - * Reschedule any topologies as needed. - * @param lesserPools pools that may be used to steal nodes from. - */ - public abstract void scheduleAsNeeded(NodePool ... lesserPools); - - public static int slotsAvailable(NodePool[] pools) { - int slotsAvailable = 0; - for (NodePool pool: pools) { - slotsAvailable += pool.slotsAvailable(); - } - return slotsAvailable; - } - - public static int nodesAvailable(NodePool[] pools) { - int nodesAvailable = 0; - for (NodePool pool: pools) { - nodesAvailable += pool.nodesAvailable(); - } - return nodesAvailable; - } - - public static Collection<Node> takeNodesBySlot(int slotsNeeded,NodePool[] pools) { - LOG.debug("Trying to grab {} free slots from {}",slotsNeeded, pools); - HashSet<Node> ret = new HashSet<>(); - for (NodePool pool: pools) { - Collection<Node> got = pool.takeNodesBySlots(slotsNeeded); - ret.addAll(got); - slotsNeeded -= Node.countFreeSlotsAlive(got); - LOG.debug("Got {} nodes so far need {} more slots",ret.size(),slotsNeeded); - if (slotsNeeded <= 0) { - break; - } - } - return ret; - } - - public static Collection<Node> takeNodes(int nodesNeeded,NodePool[] pools) { - LOG.debug("Trying to grab {} free nodes from {}",nodesNeeded, pools); - HashSet<Node> ret = new HashSet<>(); - for (NodePool pool: pools) { - Collection<Node> got = pool.takeNodes(nodesNeeded); - ret.addAll(got); - nodesNeeded -= got.size(); - LOG.debug("Got {} nodes so far need {} more nodes", ret.size(), nodesNeeded); - if (nodesNeeded <= 0) { - break; - } + /** + * Assign a slot to the given node. + * @param n the node to assign a slot to. + * @return true if there are more slots to assign else false. + */ + public boolean assignSlotTo(Node n) { + if (_slots.isEmpty()) { + return false; + } + Set<ExecutorDetails> slot = _slots.pop(); + if (slot == _lastSlot) { + //The last slot fill it up + for (Entry<String, List<ExecutorDetails>> entry : _spreadToSchedule.entrySet()) { + if (entry.getValue().size() > 0) { + slot.addAll(entry.getValue()); + } + } + } else { + String nodeId = n.getId(); + Set<String> nodeComps = _nodeToComps.get(nodeId); + if (nodeComps == null) { + nodeComps = new HashSet<>(); + _nodeToComps.put(nodeId, nodeComps); + } + for (Entry<String, List<ExecutorDetails>> entry : _spreadToSchedule.entrySet()) { + if (entry.getValue().size() > 0) { + String comp = entry.getKey(); + if (!nodeComps.contains(comp)) { + nodeComps.add(comp); + slot.add(entry.getValue().remove(0)); + } + } + } + } + n.assign(_topId, slot, _cluster); + return !_slots.isEmpty(); + } } - return ret; - } - - public static int getNodeCountIfSlotsWereTaken(int slots,NodePool[] pools) { - LOG.debug("How many nodes to get {} slots from {}",slots, pools); - int total = 0; - for (NodePool pool: pools) { - NodeAndSlotCounts ns = pool.getNodeAndSlotCountIfSlotsWereTaken(slots); - total += ns._nodes; - slots -= ns._slots; - LOG.debug("Found {} nodes so far {} more slots needed", total, slots); - if (slots <= 0) { - break; - } - } - return total; - } } http://git-wip-us.apache.org/repos/asf/storm/blob/f1c0bcbe/storm-server/src/main/java/org/apache/storm/scheduler/resource/RAS_Node.java ---------------------------------------------------------------------- diff --git a/storm-server/src/main/java/org/apache/storm/scheduler/resource/RAS_Node.java b/storm-server/src/main/java/org/apache/storm/scheduler/resource/RAS_Node.java index 84a95a7..4f43c0a 100644 --- a/storm-server/src/main/java/org/apache/storm/scheduler/resource/RAS_Node.java +++ b/storm-server/src/main/java/org/apache/storm/scheduler/resource/RAS_Node.java @@ -34,24 +34,23 @@ import org.apache.storm.scheduler.resource.normalization.NormalizedResourceOffer import org.slf4j.Logger; import org.slf4j.LoggerFactory; -/** Represents a single node in the cluster. */ +/** + * Represents a single node in the cluster. + */ public class RAS_Node { private static final Logger LOG = LoggerFactory.getLogger(RAS_Node.class); - + private final String nodeId; + private final Cluster cluster; + private final Set<WorkerSlot> originallyFreeSlots; //A map consisting of all workers on the node. //The key of the map is the worker id and the value is the corresponding workerslot object private Map<String, WorkerSlot> slots = new HashMap<>(); - // A map describing which topologies are using which slots on this node. The format of the map is the following: // {TopologyId -> {WorkerId -> {Executors}}} private Map<String, Map<String, Collection<ExecutorDetails>>> topIdToUsedSlots = new HashMap<>(); - - private final String nodeId; private String hostname; private boolean isAlive; private SupervisorDetails sup; - private final Cluster cluster; - private final Set<WorkerSlot> originallyFreeSlots; public RAS_Node( String nodeId, @@ -99,6 +98,26 @@ public class RAS_Node { } } + public static int countFreeSlotsAlive(Collection<RAS_Node> nodes) { + int total = 0; + for (RAS_Node n : nodes) { + if (n.isAlive()) { + total += n.totalSlotsFree(); + } + } + return total; + } + + public static int countTotalSlotsAlive(Collection<RAS_Node> nodes) { + int total = 0; + for (RAS_Node n : nodes) { + if (n.isAlive()) { + total += n.totalSlots(); + } + } + return total; + } + public String getId() { return nodeId; } @@ -167,7 +186,9 @@ public class RAS_Node { return isAlive; } - /** Get a collection of the topology ids currently running on this node. */ + /** + * Get a collection of the topology ids currently running on this node. + */ public Collection<String> getRunningTopologies() { return topIdToUsedSlots.keySet(); } @@ -192,7 +213,9 @@ public class RAS_Node { return slots.size(); } - /** Free all slots on this node. This will update the Cluster too. */ + /** + * Free all slots on this node. This will update the Cluster too. + */ public void freeAllSlots() { if (!isAlive) { LOG.warn("Freeing all slots on a dead node {} ", nodeId); @@ -279,8 +302,8 @@ public class RAS_Node { /** * Assigns a worker to a node. * - * @param target the worker slot to assign the executors - * @param td the topology the executors are from + * @param target the worker slot to assign the executors + * @param td the topology the executors are from * @param executors executors to assign to the specified worker slot */ public void assign(WorkerSlot target, TopologyDetails td, Collection<ExecutorDetails> executors) { @@ -343,9 +366,9 @@ public class RAS_Node { /** * Would scheduling exec in ws fit with the current resource constraints. * - * @param ws the slot to possibly put exec in + * @param ws the slot to possibly put exec in * @param exec the executor to possibly place in ws - * @param td the topology exec is a part of + * @param td the topology exec is a part of * @return true if it would fit else false */ public boolean wouldFit(WorkerSlot ws, ExecutorDetails exec, TopologyDetails td) { @@ -353,13 +376,13 @@ public class RAS_Node { throw new IllegalStateException("Slot " + ws + " is not a part of this node " + nodeId); } return isAlive - && cluster.wouldFit( + && cluster.wouldFit( ws, exec, td, getTotalAvailableResources(), td.getTopologyWorkerMaxHeapSize() - ); + ); } @Override @@ -378,40 +401,20 @@ public class RAS_Node { @Override public String toString() { return "{Node: " - + ((sup == null) ? "null (possibly down)" : sup.getHost()) - + ", Avail [ Mem: " - + getAvailableMemoryResources() - + ", CPU: " - + getAvailableCpuResources() - + ", Slots: " - + this.getFreeSlots() - + "] Total [ Mem: " - + ((sup == null) ? "N/A" : this.getTotalMemoryResources()) - + ", CPU: " - + ((sup == null) ? "N/A" : this.getTotalCpuResources()) - + ", Slots: " - + this.slots.values() - + " ]}"; - } - - public static int countFreeSlotsAlive(Collection<RAS_Node> nodes) { - int total = 0; - for (RAS_Node n : nodes) { - if (n.isAlive()) { - total += n.totalSlotsFree(); - } - } - return total; - } - - public static int countTotalSlotsAlive(Collection<RAS_Node> nodes) { - int total = 0; - for (RAS_Node n : nodes) { - if (n.isAlive()) { - total += n.totalSlots(); - } - } - return total; + + ((sup == null) ? "null (possibly down)" : sup.getHost()) + + ", Avail [ Mem: " + + getAvailableMemoryResources() + + ", CPU: " + + getAvailableCpuResources() + + ", Slots: " + + this.getFreeSlots() + + "] Total [ Mem: " + + ((sup == null) ? "N/A" : this.getTotalMemoryResources()) + + ", CPU: " + + ((sup == null) ? "N/A" : this.getTotalCpuResources()) + + ", Slots: " + + this.slots.values() + + " ]}"; } /** http://git-wip-us.apache.org/repos/asf/storm/blob/f1c0bcbe/storm-server/src/main/java/org/apache/storm/scheduler/resource/RAS_Nodes.java ---------------------------------------------------------------------- diff --git a/storm-server/src/main/java/org/apache/storm/scheduler/resource/RAS_Nodes.java b/storm-server/src/main/java/org/apache/storm/scheduler/resource/RAS_Nodes.java index 2bd2b86..584934b 100644 --- a/storm-server/src/main/java/org/apache/storm/scheduler/resource/RAS_Nodes.java +++ b/storm-server/src/main/java/org/apache/storm/scheduler/resource/RAS_Nodes.java @@ -32,9 +32,8 @@ import org.slf4j.LoggerFactory; public class RAS_Nodes { - private Map<String, RAS_Node> nodeMap; - private static final Logger LOG = LoggerFactory.getLogger(RAS_Nodes.class); + private Map<String, RAS_Node> nodeMap; public RAS_Nodes(Cluster cluster) { this.nodeMap = getAllNodesFrom(cluster); @@ -116,13 +115,16 @@ public class RAS_Nodes { return nodeIdToNode; } - /** get node object from nodeId. */ + /** + * get node object from nodeId. + */ public RAS_Node getNodeById(String nodeId) { return this.nodeMap.get(nodeId); } /** * Free everything on the given slots. + * * @param workerSlots the slots to free */ public void freeSlots(Collection<WorkerSlot> workerSlots) { http://git-wip-us.apache.org/repos/asf/storm/blob/f1c0bcbe/storm-server/src/main/java/org/apache/storm/scheduler/resource/ResourceAwareScheduler.java ---------------------------------------------------------------------- diff --git a/storm-server/src/main/java/org/apache/storm/scheduler/resource/ResourceAwareScheduler.java b/storm-server/src/main/java/org/apache/storm/scheduler/resource/ResourceAwareScheduler.java index 541e1a5..60f6f6c 100644 --- a/storm-server/src/main/java/org/apache/storm/scheduler/resource/ResourceAwareScheduler.java +++ b/storm-server/src/main/java/org/apache/storm/scheduler/resource/ResourceAwareScheduler.java @@ -1,19 +1,13 @@ /** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. The ASF licenses this file to you under the Apache License, Version + * 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at * <p> * http://www.apache.org/licenses/LICENSE-2.0 * <p> - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions + * and limitations under the License. */ package org.apache.storm.scheduler.resource; @@ -52,11 +46,35 @@ public class ResourceAwareScheduler implements IScheduler { private IConfigLoader configLoader; private int maxSchedulingAttempts; + private static void markFailedTopology(User u, Cluster c, TopologyDetails td, String message) { + markFailedTopology(u, c, td, message, null); + } + + private static void markFailedTopology(User u, Cluster c, TopologyDetails td, String message, Throwable t) { + c.setStatus(td, message); + String realMessage = td.getId() + " " + message; + if (t != null) { + LOG.error(realMessage, t); + } else { + LOG.error(realMessage); + } + u.markTopoUnsuccess(td); + } + + private static double getCpuUsed(SchedulerAssignment assignment) { + return assignment.getScheduledResources().values().stream().mapToDouble((wr) -> wr.get_cpu()).sum(); + } + + private static double getMemoryUsed(SchedulerAssignment assignment) { + return assignment.getScheduledResources().values().stream() + .mapToDouble((wr) -> wr.get_mem_on_heap() + wr.get_mem_off_heap()).sum(); + } + @Override public void prepare(Map<String, Object> conf) { this.conf = conf; schedulingPriorityStrategy = ReflectionUtils.newInstance( - (String) conf.get(DaemonConfig.RESOURCE_AWARE_SCHEDULER_PRIORITY_STRATEGY)); + (String) conf.get(DaemonConfig.RESOURCE_AWARE_SCHEDULER_PRIORITY_STRATEGY)); configLoader = ConfigLoaderFactoryService.createConfigLoader(conf); maxSchedulingAttempts = ObjectReader.getInt( conf.get(DaemonConfig.RESOURCE_AWARE_SCHEDULER_MAX_TOPOLOGY_SCHEDULING_ATTEMPTS), 5); @@ -85,21 +103,6 @@ public class ResourceAwareScheduler implements IScheduler { } } - private static void markFailedTopology(User u, Cluster c, TopologyDetails td, String message) { - markFailedTopology(u, c, td, message, null); - } - - private static void markFailedTopology(User u, Cluster c, TopologyDetails td, String message, Throwable t) { - c.setStatus(td, message); - String realMessage = td.getId() + " " + message; - if (t != null) { - LOG.error(realMessage, t); - } else { - LOG.error(realMessage); - } - u.markTopoUnsuccess(td); - } - private void scheduleTopology(TopologyDetails td, Cluster cluster, final User topologySubmitter, List<TopologyDetails> orderedTopologies) { //A copy of cluster that we can modify, but does not get committed back to cluster unless scheduling succeeds @@ -119,17 +122,17 @@ public class ResourceAwareScheduler implements IScheduler { rasStrategy.prepare(conf); } catch (DisallowedStrategyException e) { markFailedTopology(topologySubmitter, cluster, td, - "Unsuccessful in scheduling - " + e.getAttemptedClass() - + " is not an allowed strategy. Please make sure your " - + Config.TOPOLOGY_SCHEDULER_STRATEGY - + " config is one of the allowed strategies: " - + e.getAllowedStrategies(), e); + "Unsuccessful in scheduling - " + e.getAttemptedClass() + + " is not an allowed strategy. Please make sure your " + + Config.TOPOLOGY_SCHEDULER_STRATEGY + + " config is one of the allowed strategies: " + + e.getAllowedStrategies(), e); return; } catch (RuntimeException e) { markFailedTopology(topologySubmitter, cluster, td, - "Unsuccessful in scheduling - failed to create instance of topology strategy " - + strategyConf - + ". Please check logs for details", e); + "Unsuccessful in scheduling - failed to create instance of topology strategy " + + strategyConf + + ". Please check logs for details", e); return; } @@ -167,7 +170,7 @@ public class ResourceAwareScheduler implements IScheduler { Collection<WorkerSlot> workersToEvict = workingState.getUsedSlotsByTopologyId(topologyEvict.getId()); LOG.debug("Evicting Topology {} with workers: {} from user {}", topologyEvict.getName(), workersToEvict, - topologyEvict.getTopologySubmitter()); + topologyEvict.getTopologySubmitter()); cpuNeeded -= getCpuUsed(evictAssignemnt); memoryNeeded -= getMemoryUsed(evictAssignemnt); evictedSomething = true; @@ -205,22 +208,13 @@ public class ResourceAwareScheduler implements IScheduler { } } catch (Exception ex) { markFailedTopology(topologySubmitter, cluster, td, - "Internal Error - Exception thrown when scheduling. Please check logs for details", ex); + "Internal Error - Exception thrown when scheduling. Please check logs for details", ex); return; } } markFailedTopology(topologySubmitter, cluster, td, "Failed to schedule within " + maxSchedulingAttempts + " attempts"); } - private static double getCpuUsed(SchedulerAssignment assignment) { - return assignment.getScheduledResources().values().stream().mapToDouble((wr) -> wr.get_cpu()).sum(); - } - - private static double getMemoryUsed(SchedulerAssignment assignment) { - return assignment.getScheduledResources().values().stream() - .mapToDouble((wr) -> wr.get_mem_on_heap() + wr.get_mem_off_heap()).sum(); - } - /** * Get User wrappers around cluster. * @@ -288,7 +282,7 @@ public class ResourceAwareScheduler implements IScheduler { return convertToDouble(raw); } else { LOG.warn("Reading from user-resource-pools.yaml returned null. This could because the file is not available. " - + "Will load configs from storm configuration"); + + "Will load configs from storm configuration"); } // if no configs from user-resource-pools.yaml, get configs from conf http://git-wip-us.apache.org/repos/asf/storm/blob/f1c0bcbe/storm-server/src/main/java/org/apache/storm/scheduler/resource/ResourceUtils.java ---------------------------------------------------------------------- diff --git a/storm-server/src/main/java/org/apache/storm/scheduler/resource/ResourceUtils.java b/storm-server/src/main/java/org/apache/storm/scheduler/resource/ResourceUtils.java index 2f9ab4c..e0a8114 100644 --- a/storm-server/src/main/java/org/apache/storm/scheduler/resource/ResourceUtils.java +++ b/storm-server/src/main/java/org/apache/storm/scheduler/resource/ResourceUtils.java @@ -1,19 +1,13 @@ /** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. The ASF licenses this file to you under the Apache License, Version + * 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at * <p> * http://www.apache.org/licenses/LICENSE-2.0 * <p> - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions + * and limitations under the License. */ package org.apache.storm.scheduler.resource; @@ -37,7 +31,7 @@ public class ResourceUtils { private static final Logger LOG = LoggerFactory.getLogger(ResourceUtils.class); public static NormalizedResourceRequest getBoltResources(StormTopology topology, Map<String, Object> topologyConf, - String componentId) { + String componentId) { if (topology.get_bolts() != null) { Bolt bolt = topology.get_bolts().get(componentId); return new NormalizedResourceRequest(bolt.get_common(), topologyConf); @@ -46,7 +40,7 @@ public class ResourceUtils { } public static Map<String, NormalizedResourceRequest> getBoltsResources(StormTopology topology, - Map<String, Object> topologyConf) { + Map<String, Object> topologyConf) { Map<String, NormalizedResourceRequest> boltResources = new HashMap<>(); if (topology.get_bolts() != null) { for (Map.Entry<String, Bolt> bolt : topology.get_bolts().entrySet()) { @@ -70,7 +64,7 @@ public class ResourceUtils { } public static Map<String, NormalizedResourceRequest> getSpoutsResources(StormTopology topology, - Map<String, Object> topologyConf) { + Map<String, Object> topologyConf) { Map<String, NormalizedResourceRequest> spoutResources = new HashMap<>(); if (topology.get_spouts() != null) { for (Map.Entry<String, SpoutSpec> spout : topology.get_spouts().entrySet()) { @@ -128,7 +122,7 @@ public class ResourceUtils { } public static String getCorrespondingLegacyResourceName(String normalizedResourceName) { - for(Map.Entry<String, String> entry : NormalizedResources.RESOURCE_NAME_NORMALIZER.getResourceNameMapping().entrySet()) { + for (Map.Entry<String, String> entry : NormalizedResources.RESOURCE_NAME_NORMALIZER.getResourceNameMapping().entrySet()) { if (entry.getValue().equals(normalizedResourceName)) { return entry.getKey(); } @@ -143,9 +137,9 @@ public class ResourceUtils { JSONObject jsonObject = (JSONObject) obj; Map<String, Double> componentResourceMap = - (Map<String, Double>) jsonObject.getOrDefault( - Config.TOPOLOGY_COMPONENT_RESOURCES_MAP, new HashMap<String, Double>() - ); + (Map<String, Double>) jsonObject.getOrDefault( + Config.TOPOLOGY_COMPONENT_RESOURCES_MAP, new HashMap<String, Double>() + ); for (Map.Entry<String, Double> resourceUpdateEntry : resourceUpdates.entrySet()) { if (NormalizedResources.RESOURCE_NAME_NORMALIZER.getResourceNameMapping().containsValue(resourceUpdateEntry.getKey())) { @@ -159,7 +153,7 @@ public class ResourceUtils { return jsonObject.toJSONString(); } catch (ParseException ex) { - throw new RuntimeException("Failed to parse component resources with json: " + jsonConf); + throw new RuntimeException("Failed to parse component resources with json: " + jsonConf); } } http://git-wip-us.apache.org/repos/asf/storm/blob/f1c0bcbe/storm-server/src/main/java/org/apache/storm/scheduler/resource/SchedulingResult.java ---------------------------------------------------------------------- diff --git a/storm-server/src/main/java/org/apache/storm/scheduler/resource/SchedulingResult.java b/storm-server/src/main/java/org/apache/storm/scheduler/resource/SchedulingResult.java index 72f083d..882c6be 100644 --- a/storm-server/src/main/java/org/apache/storm/scheduler/resource/SchedulingResult.java +++ b/storm-server/src/main/java/org/apache/storm/scheduler/resource/SchedulingResult.java @@ -1,19 +1,13 @@ /** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. The ASF licenses this file to you under the Apache License, Version + * 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at * <p> * http://www.apache.org/licenses/LICENSE-2.0 * <p> - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions + * and limitations under the License. */ package org.apache.storm.scheduler.resource; @@ -66,7 +60,7 @@ public class SchedulingResult { public String getErrorMessage() { return this.errorMessage; } - + public boolean isSuccess() { return SchedulingStatus.isStatusSuccess(this.status); } http://git-wip-us.apache.org/repos/asf/storm/blob/f1c0bcbe/storm-server/src/main/java/org/apache/storm/scheduler/resource/SchedulingStatus.java ---------------------------------------------------------------------- diff --git a/storm-server/src/main/java/org/apache/storm/scheduler/resource/SchedulingStatus.java b/storm-server/src/main/java/org/apache/storm/scheduler/resource/SchedulingStatus.java index 703f4b8..47c0541 100644 --- a/storm-server/src/main/java/org/apache/storm/scheduler/resource/SchedulingStatus.java +++ b/storm-server/src/main/java/org/apache/storm/scheduler/resource/SchedulingStatus.java @@ -1,19 +1,13 @@ /** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. The ASF licenses this file to you under the Apache License, Version + * 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at * <p> * http://www.apache.org/licenses/LICENSE-2.0 * <p> - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions + * and limitations under the License. */ package org.apache.storm.scheduler.resource; @@ -28,7 +22,7 @@ public enum SchedulingStatus { public static EnumSet<SchedulingStatus> success = EnumSet.of(SUCCESS); public static EnumSet<SchedulingStatus> failure = EnumSet.of(FAIL_INVALID_TOPOLOGY, FAIL_NOT_ENOUGH_RESOURCES, - FAIL_OTHER); + FAIL_OTHER); public static boolean isStatusSuccess(SchedulingStatus status) { return success.contains(status); http://git-wip-us.apache.org/repos/asf/storm/blob/f1c0bcbe/storm-server/src/main/java/org/apache/storm/scheduler/resource/User.java ---------------------------------------------------------------------- diff --git a/storm-server/src/main/java/org/apache/storm/scheduler/resource/User.java b/storm-server/src/main/java/org/apache/storm/scheduler/resource/User.java index 9083510..ee79d59 100644 --- a/storm-server/src/main/java/org/apache/storm/scheduler/resource/User.java +++ b/storm-server/src/main/java/org/apache/storm/scheduler/resource/User.java @@ -18,13 +18,11 @@ package org.apache.storm.scheduler.resource; -import java.util.Collection; import java.util.Comparator; import java.util.HashSet; import java.util.Map; import java.util.Set; import java.util.TreeSet; - import org.apache.storm.daemon.nimbus.TopologyResources; import org.apache.storm.scheduler.Cluster; import org.apache.storm.scheduler.ISchedulingState; @@ -32,13 +30,11 @@ import org.apache.storm.scheduler.SchedulerAssignment; import org.apache.storm.scheduler.TopologyDetails; public class User { - private String userId; - //Topologies that were deemed to be invalid private final Set<TopologyDetails> unsuccess = new HashSet<>(); - private final double cpuGuarantee; private final double memoryGuarantee; + private String userId; public User(String userId) { this(userId, 0, 0); @@ -206,8 +202,8 @@ public class User { } /** - * Comparator that sorts topologies by priority and then by submission time First sort by Topology - * Priority, if there is a tie for topology priority, topology uptime is used to sort. + * Comparator that sorts topologies by priority and then by submission time First sort by Topology Priority, if there is a tie for + * topology priority, topology uptime is used to sort. */ static class PQsortByPriorityAndSubmittionTime implements Comparator<TopologyDetails> { http://git-wip-us.apache.org/repos/asf/storm/blob/f1c0bcbe/storm-server/src/main/java/org/apache/storm/scheduler/resource/normalization/NormalizedResourceOffer.java ---------------------------------------------------------------------- diff --git a/storm-server/src/main/java/org/apache/storm/scheduler/resource/normalization/NormalizedResourceOffer.java b/storm-server/src/main/java/org/apache/storm/scheduler/resource/normalization/NormalizedResourceOffer.java index 417dca9..0c073a8 100644 --- a/storm-server/src/main/java/org/apache/storm/scheduler/resource/normalization/NormalizedResourceOffer.java +++ b/storm-server/src/main/java/org/apache/storm/scheduler/resource/normalization/NormalizedResourceOffer.java @@ -74,12 +74,13 @@ public class NormalizedResourceOffer implements NormalizedResourcesWithMemory { if (totalMemoryMb < 0.0) { normalizedResources.throwBecauseResourceBecameNegative( Constants.COMMON_TOTAL_MEMORY_RESOURCE_NAME, totalMemoryMb, other.getTotalMemoryMb()); - }; + } + ; } /** * @see NormalizedResources#calculateAveragePercentageUsedBy(org.apache.storm.scheduler.resource.normalization.NormalizedResources, - * double, double). + * double, double). */ public double calculateAveragePercentageUsedBy(NormalizedResourceOffer used) { return normalizedResources.calculateAveragePercentageUsedBy( @@ -88,7 +89,7 @@ public class NormalizedResourceOffer implements NormalizedResourcesWithMemory { /** * @see NormalizedResources#calculateMinPercentageUsedBy(org.apache.storm.scheduler.resource.normalization.NormalizedResources, double, - * double) + * double) */ public double calculateMinPercentageUsedBy(NormalizedResourceOffer used) { return normalizedResources.calculateMinPercentageUsedBy(used.getNormalizedResources(), getTotalMemoryMb(), used.getTotalMemoryMb()); @@ -96,7 +97,7 @@ public class NormalizedResourceOffer implements NormalizedResourcesWithMemory { /** * @see NormalizedResources#couldHoldIgnoringSharedMemory(org.apache.storm.scheduler.resource.normalization.NormalizedResources, double, - * double). + * double). */ public boolean couldHoldIgnoringSharedMemory(NormalizedResourcesWithMemory other) { return normalizedResources.couldHoldIgnoringSharedMemory( http://git-wip-us.apache.org/repos/asf/storm/blob/f1c0bcbe/storm-server/src/main/java/org/apache/storm/scheduler/resource/normalization/NormalizedResourceRequest.java ---------------------------------------------------------------------- diff --git a/storm-server/src/main/java/org/apache/storm/scheduler/resource/normalization/NormalizedResourceRequest.java b/storm-server/src/main/java/org/apache/storm/scheduler/resource/normalization/NormalizedResourceRequest.java index 6627bb5..dc380e6 100644 --- a/storm-server/src/main/java/org/apache/storm/scheduler/resource/normalization/NormalizedResourceRequest.java +++ b/storm-server/src/main/java/org/apache/storm/scheduler/resource/normalization/NormalizedResourceRequest.java @@ -37,6 +37,28 @@ import org.slf4j.LoggerFactory; public class NormalizedResourceRequest implements NormalizedResourcesWithMemory { private static final Logger LOG = LoggerFactory.getLogger(NormalizedResourceRequest.class); + private final NormalizedResources normalizedResources; + private double onHeap; + private double offHeap; + + private NormalizedResourceRequest(Map<String, ? extends Number> resources, + Map<String, Double> defaultResources) { + Map<String, Double> normalizedResourceMap = NormalizedResources.RESOURCE_NAME_NORMALIZER.normalizedResourceMap(defaultResources); + normalizedResourceMap.putAll(NormalizedResources.RESOURCE_NAME_NORMALIZER.normalizedResourceMap(resources)); + onHeap = normalizedResourceMap.getOrDefault(Constants.COMMON_ONHEAP_MEMORY_RESOURCE_NAME, 0.0); + offHeap = normalizedResourceMap.getOrDefault(Constants.COMMON_OFFHEAP_MEMORY_RESOURCE_NAME, 0.0); + normalizedResources = new NormalizedResources(normalizedResourceMap); + } + public NormalizedResourceRequest(ComponentCommon component, Map<String, Object> topoConf) { + this(parseResources(component.get_json_conf()), getDefaultResources(topoConf)); + } + public NormalizedResourceRequest(Map<String, Object> topoConf) { + this((Map<String, ? extends Number>) null, getDefaultResources(topoConf)); + } + + public NormalizedResourceRequest() { + this((Map<String, ? extends Number>) null, null); + } private static void putIfMissing(Map<String, Double> dest, String destKey, Map<String, Object> src, String srcKey) { if (!dest.containsKey(destKey)) { @@ -48,8 +70,9 @@ public class NormalizedResourceRequest implements NormalizedResourcesWithMemory } private static Map<String, Double> getDefaultResources(Map<String, Object> topoConf) { - Map<String, Double> ret = NormalizedResources.RESOURCE_NAME_NORMALIZER.normalizedResourceMap((Map<String, Number>) topoConf.getOrDefault( - Config.TOPOLOGY_COMPONENT_RESOURCES_MAP, new HashMap<>())); + Map<String, Double> ret = + NormalizedResources.RESOURCE_NAME_NORMALIZER.normalizedResourceMap((Map<String, Number>) topoConf.getOrDefault( + Config.TOPOLOGY_COMPONENT_RESOURCES_MAP, new HashMap<>())); putIfMissing(ret, Constants.COMMON_CPU_RESOURCE_NAME, topoConf, Config.TOPOLOGY_COMPONENT_CPU_PCORE_PERCENT); putIfMissing(ret, Constants.COMMON_OFFHEAP_MEMORY_RESOURCE_NAME, topoConf, Config.TOPOLOGY_COMPONENT_RESOURCES_OFFHEAP_MEMORY_MB); putIfMissing(ret, Constants.COMMON_ONHEAP_MEMORY_RESOURCE_NAME, topoConf, Config.TOPOLOGY_COMPONENT_RESOURCES_ONHEAP_MEMORY_MB); @@ -78,7 +101,7 @@ public class NormalizedResourceRequest implements NormalizedResourcesWithMemory } if (jsonObject.containsKey(Config.TOPOLOGY_COMPONENT_CPU_PCORE_PERCENT)) { Double topoCpu = ObjectReader.getDouble(jsonObject.get(Config.TOPOLOGY_COMPONENT_CPU_PCORE_PERCENT), - null); + null); topologyResources.put(Config.TOPOLOGY_COMPONENT_CPU_PCORE_PERCENT, topoCpu); } @@ -102,31 +125,6 @@ public class NormalizedResourceRequest implements NormalizedResourcesWithMemory return topologyResources; } - private final NormalizedResources normalizedResources; - private double onHeap; - private double offHeap; - - private NormalizedResourceRequest(Map<String, ? extends Number> resources, - Map<String, Double> defaultResources) { - Map<String, Double> normalizedResourceMap = NormalizedResources.RESOURCE_NAME_NORMALIZER.normalizedResourceMap(defaultResources); - normalizedResourceMap.putAll(NormalizedResources.RESOURCE_NAME_NORMALIZER.normalizedResourceMap(resources)); - onHeap = normalizedResourceMap.getOrDefault(Constants.COMMON_ONHEAP_MEMORY_RESOURCE_NAME, 0.0); - offHeap = normalizedResourceMap.getOrDefault(Constants.COMMON_OFFHEAP_MEMORY_RESOURCE_NAME, 0.0); - normalizedResources = new NormalizedResources(normalizedResourceMap); - } - - public NormalizedResourceRequest(ComponentCommon component, Map<String, Object> topoConf) { - this(parseResources(component.get_json_conf()), getDefaultResources(topoConf)); - } - - public NormalizedResourceRequest(Map<String, Object> topoConf) { - this((Map<String, ? extends Number>) null, getDefaultResources(topoConf)); - } - - public NormalizedResourceRequest() { - this((Map<String, ? extends Number>) null, null); - } - public Map<String, Double> toNormalizedMap() { Map<String, Double> ret = this.normalizedResources.toNormalizedMap(); ret.put(Constants.COMMON_OFFHEAP_MEMORY_RESOURCE_NAME, offHeap); http://git-wip-us.apache.org/repos/asf/storm/blob/f1c0bcbe/storm-server/src/main/java/org/apache/storm/scheduler/resource/normalization/NormalizedResources.java ---------------------------------------------------------------------- diff --git a/storm-server/src/main/java/org/apache/storm/scheduler/resource/normalization/NormalizedResources.java b/storm-server/src/main/java/org/apache/storm/scheduler/resource/normalization/NormalizedResources.java index 76d5ce2..c41cd95 100644 --- a/storm-server/src/main/java/org/apache/storm/scheduler/resource/normalization/NormalizedResources.java +++ b/storm-server/src/main/java/org/apache/storm/scheduler/resource/normalization/NormalizedResources.java @@ -45,16 +45,6 @@ public class NormalizedResources { private double[] otherResources; /** - * This is for testing only. It allows a test to reset the static state relating to resource names. We reset the mapping because some - * algorithms sadly have different behavior if a resource exists or not. - */ - @VisibleForTesting - public static void resetResourceNames() { - RESOURCE_NAME_NORMALIZER = new ResourceNameNormalizer(); - RESOURCE_MAP_ARRAY_BRIDGE = new ResourceMapArrayBridge(); - } - - /** * Copy constructor. */ public NormalizedResources(NormalizedResources other) { @@ -74,6 +64,16 @@ public class NormalizedResources { } /** + * This is for testing only. It allows a test to reset the static state relating to resource names. We reset the mapping because some + * algorithms sadly have different behavior if a resource exists or not. + */ + @VisibleForTesting + public static void resetResourceNames() { + RESOURCE_NAME_NORMALIZER = new ResourceNameNormalizer(); + RESOURCE_MAP_ARRAY_BRIDGE = new ResourceMapArrayBridge(); + } + + /** * Get the total amount of cpu. * * @return the amount of cpu. @@ -81,7 +81,7 @@ public class NormalizedResources { public double getTotalCpu() { return cpu; } - + private void zeroPadOtherResourcesIfNecessary(int requiredLength) { if (requiredLength > otherResources.length) { double[] newResources = new double[requiredLength]; @@ -116,16 +116,18 @@ public class NormalizedResources { /** * Throw an IllegalArgumentException because a resource became negative during remove. - * @param resourceName The name of the resource that became negative - * @param currentValue The current value of the resource + * + * @param resourceName The name of the resource that became negative + * @param currentValue The current value of the resource * @param subtractedValue The value that was subtracted to make the resource negative */ public void throwBecauseResourceBecameNegative(String resourceName, double currentValue, double subtractedValue) { throw new IllegalArgumentException(String.format("Resource amounts should never be negative." - + " Resource '%s' with current value '%f' became negative because '%f' was removed.", - resourceName, currentValue, subtractedValue)); + + + " Resource '%s' with current value '%f' became negative because '%f' was removed.", + resourceName, currentValue, subtractedValue)); } - + /** * Remove the other resources from this. This is the same as subtracting the resources in other from this. * @@ -173,8 +175,8 @@ public class NormalizedResources { * A simple sanity check to see if all of the resources in this would be large enough to hold the resources in other ignoring memory. It * does not check memory because with shared memory it is beyond the scope of this. * - * @param other the resources that we want to check if they would fit in this. - * @param thisTotalMemoryMb The total memory in MB of this + * @param other the resources that we want to check if they would fit in this. + * @param thisTotalMemoryMb The total memory in MB of this * @param otherTotalMemoryMb The total memory in MB of other * @return true if it might fit, else false if it could not possibly fit. */ @@ -204,8 +206,8 @@ public class NormalizedResources { private void throwBecauseUsedIsNotSubsetOfTotal(NormalizedResources used, double totalMemoryMb, double usedMemoryMb) { throw new IllegalArgumentException(String.format("The used resources must be a subset of the total resources." - + " Used: '%s', Total: '%s', Used Mem: '%f', Total Mem: '%f'", - used.toNormalizedMap(), this.toNormalizedMap(), usedMemoryMb, totalMemoryMb)); + + " Used: '%s', Total: '%s', Used Mem: '%f', Total Mem: '%f'", + used.toNormalizedMap(), this.toNormalizedMap(), usedMemoryMb, totalMemoryMb)); } /** @@ -213,18 +215,19 @@ public class NormalizedResources { * subset of the total resources. If a resource in the total has a value of zero, it will be skipped in the calculation to avoid * division by 0. If all resources are skipped the result is defined to be 100.0. * - * @param used the amount of resources used. + * @param used the amount of resources used. * @param totalMemoryMb The total memory in MB - * @param usedMemoryMb The used memory in MB + * @param usedMemoryMb The used memory in MB * @return the average percentage used 0.0 to 100.0. + * * @throws IllegalArgumentException if any resource in used has a greater value than the same resource in the total, or used has generic - * resources that are not present in the total. + * resources that are not present in the total. */ public double calculateAveragePercentageUsedBy(NormalizedResources used, double totalMemoryMb, double usedMemoryMb) { if (LOG.isTraceEnabled()) { LOG.trace("Calculating avg percentage used by. Used Mem: {} Total Mem: {}" - + " Used Normalized Resources: {} Total Normalized Resources: {}", totalMemoryMb, usedMemoryMb, - toNormalizedMap(), used.toNormalizedMap()); + + " Used Normalized Resources: {} Total Normalized Resources: {}", totalMemoryMb, usedMemoryMb, + toNormalizedMap(), used.toNormalizedMap()); } int skippedResourceTypes = 0; @@ -290,18 +293,19 @@ public class NormalizedResources { * subset of the total resources. If a resource in the total has a value of zero, it will be skipped in the calculation to avoid * division by 0. If all resources are skipped the result is defined to be 100.0. * - * @param used the amount of resources used. + * @param used the amount of resources used. * @param totalMemoryMb The total memory in MB - * @param usedMemoryMb The used memory in MB + * @param usedMemoryMb The used memory in MB * @return the minimum percentage used 0.0 to 100.0. + * * @throws IllegalArgumentException if any resource in used has a greater value than the same resource in the total, or used has generic - * resources that are not present in the total. + * resources that are not present in the total. */ public double calculateMinPercentageUsedBy(NormalizedResources used, double totalMemoryMb, double usedMemoryMb) { if (LOG.isTraceEnabled()) { LOG.trace("Calculating min percentage used by. Used Mem: {} Total Mem: {}" - + " Used Normalized Resources: {} Total Normalized Resources: {}", totalMemoryMb, usedMemoryMb, - toNormalizedMap(), used.toNormalizedMap()); + + " Used Normalized Resources: {} Total Normalized Resources: {}", totalMemoryMb, usedMemoryMb, + toNormalizedMap(), used.toNormalizedMap()); } double min = 1.0; @@ -322,7 +326,7 @@ public class NormalizedResources { if (used.otherResources.length > otherResources.length) { throwBecauseUsedIsNotSubsetOfTotal(used, totalMemoryMb, usedMemoryMb); } - + for (int i = 0; i < otherResources.length; i++) { if (otherResources[i] == 0.0) { //Skip any resources where the total is 0, the percent used for this resource isn't meaningful.
