http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/backtype/storm/scheduler/multitenant/IsolatedPool.java ---------------------------------------------------------------------- diff --git a/jstorm-core/src/main/java/backtype/storm/scheduler/multitenant/IsolatedPool.java b/jstorm-core/src/main/java/backtype/storm/scheduler/multitenant/IsolatedPool.java index dc7eded..3a50a3f 100755 --- a/jstorm-core/src/main/java/backtype/storm/scheduler/multitenant/IsolatedPool.java +++ b/jstorm-core/src/main/java/backtype/storm/scheduler/multitenant/IsolatedPool.java @@ -40,307 +40,297 @@ import backtype.storm.scheduler.WorkerSlot; * A pool of machines that can be used to run isolated topologies */ public class IsolatedPool extends NodePool { - private static final Logger LOG = LoggerFactory.getLogger(IsolatedPool.class); - private Map<String, Set<Node>> _topologyIdToNodes = new HashMap<String, Set<Node>>(); - private HashMap<String, TopologyDetails> _tds = new HashMap<String, TopologyDetails>(); - private HashSet<String> _isolated = new HashSet<String>(); - private int _maxNodes; - private int _usedNodes; + private static final Logger LOG = LoggerFactory.getLogger(IsolatedPool.class); + private Map<String, Set<Node>> _topologyIdToNodes = new HashMap<String, Set<Node>>(); + private HashMap<String, TopologyDetails> _tds = new HashMap<String, TopologyDetails>(); + private HashSet<String> _isolated = new HashSet<String>(); + private int _maxNodes; + private int _usedNodes; - public IsolatedPool(int maxNodes) { - _maxNodes = maxNodes; - _usedNodes = 0; - } - - @Override - public void addTopology(TopologyDetails td) { - String topId = td.getId(); - LOG.debug("Adding in Topology {}", topId); - SchedulerAssignment assignment = _cluster.getAssignmentById(topId); - Set<Node> assignedNodes = new HashSet<Node>(); - if (assignment != null) { - for (WorkerSlot ws: assignment.getSlots()) { - Node n = _nodeIdToNode.get(ws.getNodeId()); - assignedNodes.add(n); - } - } - _usedNodes += assignedNodes.size(); - _topologyIdToNodes.put(topId, assignedNodes); - _tds.put(topId, td); - if (td.getConf().get(Config.TOPOLOGY_ISOLATED_MACHINES) != null) { - _isolated.add(topId); + public IsolatedPool(int maxNodes) { + _maxNodes = maxNodes; + _usedNodes = 0; } - } - @Override - public boolean canAdd(TopologyDetails td) { - //Only add topologies that are not sharing nodes with other topologies - String topId = td.getId(); - SchedulerAssignment assignment = _cluster.getAssignmentById(topId); - if (assignment != null) { - for (WorkerSlot ws: assignment.getSlots()) { - Node n = _nodeIdToNode.get(ws.getNodeId()); - if (n.getRunningTopologies().size() > 1) { - return false; - } - } - } - return true; - } - - @Override - public void scheduleAsNeeded(NodePool ... lesserPools) { - for (String topId : _topologyIdToNodes.keySet()) { - TopologyDetails td = _tds.get(topId); - if (_cluster.needsScheduling(td)) { - LOG.debug("Scheduling topology {}",topId); - Set<Node> allNodes = _topologyIdToNodes.get(topId); - Number nodesRequested = (Number) td.getConf().get(Config.TOPOLOGY_ISOLATED_MACHINES); - int slotsToUse = 0; - if (nodesRequested == null) { - slotsToUse = getNodesForNotIsolatedTop(td, allNodes, lesserPools); - } else { - slotsToUse = getNodesForIsolatedTop(td, allNodes, lesserPools, - nodesRequested.intValue()); + @Override + public void addTopology(TopologyDetails td) { + String topId = td.getId(); + LOG.debug("Adding in Topology {}", topId); + SchedulerAssignment assignment = _cluster.getAssignmentById(topId); + Set<Node> assignedNodes = new HashSet<Node>(); + if (assignment != null) { + for (WorkerSlot ws : assignment.getSlots()) { + Node n = _nodeIdToNode.get(ws.getNodeId()); + assignedNodes.add(n); + } } - //No slots to schedule for some reason, so skip it. - if (slotsToUse <= 0) { - continue; + _usedNodes += assignedNodes.size(); + _topologyIdToNodes.put(topId, assignedNodes); + _tds.put(topId, td); + if (td.getConf().get(Config.TOPOLOGY_ISOLATED_MACHINES) != null) { + _isolated.add(topId); } - - RoundRobinSlotScheduler slotSched = - new RoundRobinSlotScheduler(td, slotsToUse, _cluster); - - LinkedList<Node> sortedNodes = new LinkedList<Node>(allNodes); - Collections.sort(sortedNodes, Node.FREE_NODE_COMPARATOR_DEC); + } - LOG.debug("Nodes sorted by free space {}", sortedNodes); - while (true) { - Node n = sortedNodes.remove(); - if (!slotSched.assignSlotTo(n)) { - break; - } - int freeSlots = n.totalSlotsFree(); - for (int i = 0; i < sortedNodes.size(); i++) { - if (freeSlots >= sortedNodes.get(i).totalSlotsFree()) { - sortedNodes.add(i, n); - n = null; - break; + @Override + public boolean canAdd(TopologyDetails td) { + // Only add topologies that are not sharing nodes with other topologies + String topId = td.getId(); + SchedulerAssignment assignment = _cluster.getAssignmentById(topId); + if (assignment != null) { + for (WorkerSlot ws : assignment.getSlots()) { + Node n = _nodeIdToNode.get(ws.getNodeId()); + if (n.getRunningTopologies().size() > 1) { + return false; + } } - } - if (n != null) { - sortedNodes.add(n); - } } - } - Set<Node> found = _topologyIdToNodes.get(topId); - int nc = found == null ? 0 : found.size(); - _cluster.setStatus(topId,"Scheduled Isolated on "+nc+" Nodes"); + return true; } - } - - /** - * Get the nodes needed to schedule an isolated topology. - * @param td the topology to be scheduled - * @param allNodes the nodes already scheduled for this topology. - * This will be updated to include new nodes if needed. - * @param lesserPools node pools we can steal nodes from - * @return the number of additional slots that should be used for scheduling. - */ - private int getNodesForIsolatedTop(TopologyDetails td, Set<Node> allNodes, - NodePool[] lesserPools, int nodesRequested) { - String topId = td.getId(); - LOG.debug("Topology {} is isolated", topId); - int nodesFromUsAvailable = nodesAvailable(); - int nodesFromOthersAvailable = NodePool.nodesAvailable(lesserPools); - int nodesUsed = _topologyIdToNodes.get(topId).size(); - int nodesNeeded = nodesRequested - nodesUsed; - LOG.debug("Nodes... requested {} used {} available from us {} " + - "avail from other {} needed {}", new Object[] {nodesRequested, - nodesUsed, nodesFromUsAvailable, nodesFromOthersAvailable, - nodesNeeded}); - if ((nodesNeeded - nodesFromUsAvailable) > (_maxNodes - _usedNodes)) { - _cluster.setStatus(topId,"Max Nodes("+_maxNodes+") for this user would be exceeded. " - + ((nodesNeeded - nodesFromUsAvailable) - (_maxNodes - _usedNodes)) - + " more nodes needed to run topology."); - return 0; - } + @Override + public void scheduleAsNeeded(NodePool... lesserPools) { + for (String topId : _topologyIdToNodes.keySet()) { + TopologyDetails td = _tds.get(topId); + if (_cluster.needsScheduling(td)) { + LOG.debug("Scheduling topology {}", topId); + Set<Node> allNodes = _topologyIdToNodes.get(topId); + Number nodesRequested = (Number) td.getConf().get(Config.TOPOLOGY_ISOLATED_MACHINES); + int slotsToUse = 0; + if (nodesRequested == null) { + slotsToUse = getNodesForNotIsolatedTop(td, allNodes, lesserPools); + } else { + slotsToUse = getNodesForIsolatedTop(td, allNodes, lesserPools, nodesRequested.intValue()); + } + // No slots to schedule for some reason, so skip it. + if (slotsToUse <= 0) { + continue; + } - //In order to avoid going over _maxNodes I may need to steal from - // myself even though other pools have free nodes. so figure out how - // much each group should provide - int nodesNeededFromOthers = Math.min(Math.min(_maxNodes - _usedNodes, - nodesFromOthersAvailable), nodesNeeded); - int nodesNeededFromUs = nodesNeeded - nodesNeededFromOthers; - LOG.debug("Nodes... needed from us {} needed from others {}", - nodesNeededFromUs, nodesNeededFromOthers); + RoundRobinSlotScheduler slotSched = new RoundRobinSlotScheduler(td, slotsToUse, _cluster); - if (nodesNeededFromUs > nodesFromUsAvailable) { - _cluster.setStatus(topId, "Not Enough Nodes Available to Schedule Topology"); - return 0; + LinkedList<Node> sortedNodes = new LinkedList<Node>(allNodes); + Collections.sort(sortedNodes, Node.FREE_NODE_COMPARATOR_DEC); + + LOG.debug("Nodes sorted by free space {}", sortedNodes); + while (true) { + Node n = sortedNodes.remove(); + if (!slotSched.assignSlotTo(n)) { + break; + } + int freeSlots = n.totalSlotsFree(); + for (int i = 0; i < sortedNodes.size(); i++) { + if (freeSlots >= sortedNodes.get(i).totalSlotsFree()) { + sortedNodes.add(i, n); + n = null; + break; + } + } + if (n != null) { + sortedNodes.add(n); + } + } + } + Set<Node> found = _topologyIdToNodes.get(topId); + int nc = found == null ? 0 : found.size(); + _cluster.setStatus(topId, "Scheduled Isolated on " + nc + " Nodes"); + } } - //Get the nodes - Collection<Node> found = NodePool.takeNodes(nodesNeededFromOthers, lesserPools); - _usedNodes += found.size(); - allNodes.addAll(found); - Collection<Node> foundMore = takeNodes(nodesNeededFromUs); - _usedNodes += foundMore.size(); - allNodes.addAll(foundMore); + /** + * Get the nodes needed to schedule an isolated topology. + * + * @param td the topology to be scheduled + * @param allNodes the nodes already scheduled for this topology. This will be updated to include new nodes if needed. + * @param lesserPools node pools we can steal nodes from + * @return the number of additional slots that should be used for scheduling. + */ + private int getNodesForIsolatedTop(TopologyDetails td, Set<Node> allNodes, NodePool[] lesserPools, int nodesRequested) { + String topId = td.getId(); + LOG.debug("Topology {} is isolated", topId); + int nodesFromUsAvailable = nodesAvailable(); + int nodesFromOthersAvailable = NodePool.nodesAvailable(lesserPools); - int totalTasks = td.getExecutors().size(); - int origRequest = td.getNumWorkers(); - int slotsRequested = Math.min(totalTasks, origRequest); - int slotsUsed = Node.countSlotsUsed(allNodes); - int slotsFree = Node.countFreeSlotsAlive(allNodes); - int slotsToUse = Math.min(slotsRequested - slotsUsed, slotsFree); - if (slotsToUse <= 0) { - _cluster.setStatus(topId, "Node has partially crashed, if this situation persists rebalance the topology."); - } - return slotsToUse; - } - - /** - * Get the nodes needed to schedule a non-isolated topology. - * @param td the topology to be scheduled - * @param allNodes the nodes already scheduled for this topology. - * This will be updated to include new nodes if needed. - * @param lesserPools node pools we can steal nodes from - * @return the number of additional slots that should be used for scheduling. - */ - private int getNodesForNotIsolatedTop(TopologyDetails td, Set<Node> allNodes, - NodePool[] lesserPools) { - String topId = td.getId(); - LOG.debug("Topology {} is not isolated",topId); - int totalTasks = td.getExecutors().size(); - int origRequest = td.getNumWorkers(); - int slotsRequested = Math.min(totalTasks, origRequest); - int slotsUsed = Node.countSlotsUsed(topId, allNodes); - int slotsFree = Node.countFreeSlotsAlive(allNodes); - //Check to see if we have enough slots before trying to get them - int slotsAvailable = 0; - if (slotsRequested > slotsFree) { - slotsAvailable = NodePool.slotsAvailable(lesserPools); - } - int slotsToUse = Math.min(slotsRequested - slotsUsed, slotsFree + slotsAvailable); - LOG.debug("Slots... requested {} used {} free {} available {} to be used {}", - new Object[] {slotsRequested, slotsUsed, slotsFree, slotsAvailable, slotsToUse}); - if (slotsToUse <= 0) { - _cluster.setStatus(topId, "Not Enough Slots Available to Schedule Topology"); - return 0; + int nodesUsed = _topologyIdToNodes.get(topId).size(); + int nodesNeeded = nodesRequested - nodesUsed; + LOG.debug("Nodes... requested {} used {} available from us {} " + "avail from other {} needed {}", new Object[] { nodesRequested, nodesUsed, + nodesFromUsAvailable, nodesFromOthersAvailable, nodesNeeded }); + if ((nodesNeeded - nodesFromUsAvailable) > (_maxNodes - _usedNodes)) { + _cluster.setStatus(topId, "Max Nodes(" + _maxNodes + ") for this user would be exceeded. " + + ((nodesNeeded - nodesFromUsAvailable) - (_maxNodes - _usedNodes)) + " more nodes needed to run topology."); + return 0; + } + + // In order to avoid going over _maxNodes I may need to steal from + // myself even though other pools have free nodes. so figure out how + // much each group should provide + int nodesNeededFromOthers = Math.min(Math.min(_maxNodes - _usedNodes, nodesFromOthersAvailable), nodesNeeded); + int nodesNeededFromUs = nodesNeeded - nodesNeededFromOthers; + LOG.debug("Nodes... needed from us {} needed from others {}", nodesNeededFromUs, nodesNeededFromOthers); + + if (nodesNeededFromUs > nodesFromUsAvailable) { + _cluster.setStatus(topId, "Not Enough Nodes Available to Schedule Topology"); + return 0; + } + + // Get the nodes + Collection<Node> found = NodePool.takeNodes(nodesNeededFromOthers, lesserPools); + _usedNodes += found.size(); + allNodes.addAll(found); + Collection<Node> foundMore = takeNodes(nodesNeededFromUs); + _usedNodes += foundMore.size(); + allNodes.addAll(foundMore); + + int totalTasks = td.getExecutors().size(); + int origRequest = td.getNumWorkers(); + int slotsRequested = Math.min(totalTasks, origRequest); + int slotsUsed = Node.countSlotsUsed(allNodes); + int slotsFree = Node.countFreeSlotsAlive(allNodes); + int slotsToUse = Math.min(slotsRequested - slotsUsed, slotsFree); + if (slotsToUse <= 0) { + _cluster.setStatus(topId, "Node has partially crashed, if this situation persists rebalance the topology."); + } + return slotsToUse; } - int slotsNeeded = slotsToUse - slotsFree; - int numNewNodes = NodePool.getNodeCountIfSlotsWereTaken(slotsNeeded, lesserPools); - LOG.debug("Nodes... new {} used {} max {}", - new Object[]{numNewNodes, _usedNodes, _maxNodes}); - if ((numNewNodes + _usedNodes) > _maxNodes) { - _cluster.setStatus(topId,"Max Nodes("+_maxNodes+") for this user would be exceeded. " + - (numNewNodes - (_maxNodes - _usedNodes)) + " more nodes needed to run topology."); - return 0; + + /** + * Get the nodes needed to schedule a non-isolated topology. + * + * @param td the topology to be scheduled + * @param allNodes the nodes already scheduled for this topology. This will be updated to include new nodes if needed. + * @param lesserPools node pools we can steal nodes from + * @return the number of additional slots that should be used for scheduling. + */ + private int getNodesForNotIsolatedTop(TopologyDetails td, Set<Node> allNodes, NodePool[] lesserPools) { + String topId = td.getId(); + LOG.debug("Topology {} is not isolated", topId); + int totalTasks = td.getExecutors().size(); + int origRequest = td.getNumWorkers(); + int slotsRequested = Math.min(totalTasks, origRequest); + int slotsUsed = Node.countSlotsUsed(topId, allNodes); + int slotsFree = Node.countFreeSlotsAlive(allNodes); + // Check to see if we have enough slots before trying to get them + int slotsAvailable = 0; + if (slotsRequested > slotsFree) { + slotsAvailable = NodePool.slotsAvailable(lesserPools); + } + int slotsToUse = Math.min(slotsRequested - slotsUsed, slotsFree + slotsAvailable); + LOG.debug("Slots... requested {} used {} free {} available {} to be used {}", new Object[] { slotsRequested, slotsUsed, slotsFree, slotsAvailable, + slotsToUse }); + if (slotsToUse <= 0) { + _cluster.setStatus(topId, "Not Enough Slots Available to Schedule Topology"); + return 0; + } + int slotsNeeded = slotsToUse - slotsFree; + int numNewNodes = NodePool.getNodeCountIfSlotsWereTaken(slotsNeeded, lesserPools); + LOG.debug("Nodes... new {} used {} max {}", new Object[] { numNewNodes, _usedNodes, _maxNodes }); + if ((numNewNodes + _usedNodes) > _maxNodes) { + _cluster.setStatus(topId, "Max Nodes(" + _maxNodes + ") for this user would be exceeded. " + (numNewNodes - (_maxNodes - _usedNodes)) + + " more nodes needed to run topology."); + return 0; + } + + Collection<Node> found = NodePool.takeNodesBySlot(slotsNeeded, lesserPools); + _usedNodes += found.size(); + allNodes.addAll(found); + return slotsToUse; } - - Collection<Node> found = NodePool.takeNodesBySlot(slotsNeeded, lesserPools); - _usedNodes += found.size(); - allNodes.addAll(found); - return slotsToUse; - } - @Override - public Collection<Node> takeNodes(int nodesNeeded) { - LOG.debug("Taking {} from {}", nodesNeeded, this); - HashSet<Node> ret = new HashSet<Node>(); - for (Entry<String, Set<Node>> entry: _topologyIdToNodes.entrySet()) { - if (!_isolated.contains(entry.getKey())) { - Iterator<Node> it = entry.getValue().iterator(); - while (it.hasNext()) { - if (nodesNeeded <= 0) { - return ret; - } - Node n = it.next(); - it.remove(); - n.freeAllSlots(_cluster); - ret.add(n); - nodesNeeded--; - _usedNodes--; + @Override + public Collection<Node> takeNodes(int nodesNeeded) { + LOG.debug("Taking {} from {}", nodesNeeded, this); + HashSet<Node> ret = new HashSet<Node>(); + for (Entry<String, Set<Node>> entry : _topologyIdToNodes.entrySet()) { + if (!_isolated.contains(entry.getKey())) { + Iterator<Node> it = entry.getValue().iterator(); + while (it.hasNext()) { + if (nodesNeeded <= 0) { + return ret; + } + Node n = it.next(); + it.remove(); + n.freeAllSlots(_cluster); + ret.add(n); + nodesNeeded--; + _usedNodes--; + } + } } - } + return ret; } - return ret; - } - - @Override - public int nodesAvailable() { - int total = 0; - for (Entry<String, Set<Node>> entry: _topologyIdToNodes.entrySet()) { - if (!_isolated.contains(entry.getKey())) { - total += entry.getValue().size(); - } + + @Override + public int nodesAvailable() { + int total = 0; + for (Entry<String, Set<Node>> entry : _topologyIdToNodes.entrySet()) { + if (!_isolated.contains(entry.getKey())) { + total += entry.getValue().size(); + } + } + return total; } - return total; - } - - @Override - public int slotsAvailable() { - int total = 0; - for (Entry<String, Set<Node>> entry: _topologyIdToNodes.entrySet()) { - if (!_isolated.contains(entry.getKey())) { - total += Node.countTotalSlotsAlive(entry.getValue()); - } + + @Override + public int slotsAvailable() { + int total = 0; + for (Entry<String, Set<Node>> entry : _topologyIdToNodes.entrySet()) { + if (!_isolated.contains(entry.getKey())) { + total += Node.countTotalSlotsAlive(entry.getValue()); + } + } + return total; } - return total; - } - @Override - public Collection<Node> takeNodesBySlots(int slotsNeeded) { - HashSet<Node> ret = new HashSet<Node>(); - for (Entry<String, Set<Node>> entry: _topologyIdToNodes.entrySet()) { - if (!_isolated.contains(entry.getKey())) { - Iterator<Node> it = entry.getValue().iterator(); - while (it.hasNext()) { - Node n = it.next(); - if (n.isAlive()) { - it.remove(); - _usedNodes--; - n.freeAllSlots(_cluster); - ret.add(n); - slotsNeeded -= n.totalSlots(); - if (slotsNeeded <= 0) { - return ret; + @Override + public Collection<Node> takeNodesBySlots(int slotsNeeded) { + HashSet<Node> ret = new HashSet<Node>(); + for (Entry<String, Set<Node>> entry : _topologyIdToNodes.entrySet()) { + if (!_isolated.contains(entry.getKey())) { + Iterator<Node> it = entry.getValue().iterator(); + while (it.hasNext()) { + Node n = it.next(); + if (n.isAlive()) { + it.remove(); + _usedNodes--; + n.freeAllSlots(_cluster); + ret.add(n); + slotsNeeded -= n.totalSlots(); + if (slotsNeeded <= 0) { + return ret; + } + } + } } - } } - } + return ret; } - return ret; - } - - @Override - public NodeAndSlotCounts getNodeAndSlotCountIfSlotsWereTaken(int slotsNeeded) { - int nodesFound = 0; - int slotsFound = 0; - for (Entry<String, Set<Node>> entry: _topologyIdToNodes.entrySet()) { - if (!_isolated.contains(entry.getKey())) { - Iterator<Node> it = entry.getValue().iterator(); - while (it.hasNext()) { - Node n = it.next(); - if (n.isAlive()) { - nodesFound++; - int totalSlotsFree = n.totalSlots(); - slotsFound += totalSlotsFree; - slotsNeeded -= totalSlotsFree; - if (slotsNeeded <= 0) { - return new NodeAndSlotCounts(nodesFound, slotsFound); + + @Override + public NodeAndSlotCounts getNodeAndSlotCountIfSlotsWereTaken(int slotsNeeded) { + int nodesFound = 0; + int slotsFound = 0; + for (Entry<String, Set<Node>> entry : _topologyIdToNodes.entrySet()) { + if (!_isolated.contains(entry.getKey())) { + Iterator<Node> it = entry.getValue().iterator(); + while (it.hasNext()) { + Node n = it.next(); + if (n.isAlive()) { + nodesFound++; + int totalSlotsFree = n.totalSlots(); + slotsFound += totalSlotsFree; + slotsNeeded -= totalSlotsFree; + if (slotsNeeded <= 0) { + return new NodeAndSlotCounts(nodesFound, slotsFound); + } + } + } } - } } - } + return new NodeAndSlotCounts(nodesFound, slotsFound); + } + + @Override + public String toString() { + return "IsolatedPool... "; } - return new NodeAndSlotCounts(nodesFound, slotsFound); - } - - @Override - public String toString() { - return "IsolatedPool... "; - } }
http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/backtype/storm/scheduler/multitenant/MultitenantScheduler.java ---------------------------------------------------------------------- diff --git a/jstorm-core/src/main/java/backtype/storm/scheduler/multitenant/MultitenantScheduler.java b/jstorm-core/src/main/java/backtype/storm/scheduler/multitenant/MultitenantScheduler.java index 320b388..27475d9 100755 --- a/jstorm-core/src/main/java/backtype/storm/scheduler/multitenant/MultitenantScheduler.java +++ b/jstorm-core/src/main/java/backtype/storm/scheduler/multitenant/MultitenantScheduler.java @@ -32,67 +32,66 @@ import backtype.storm.scheduler.TopologyDetails; import backtype.storm.utils.Utils; public class MultitenantScheduler implements IScheduler { - private static final Logger LOG = LoggerFactory.getLogger(MultitenantScheduler.class); - @SuppressWarnings("rawtypes") - private Map _conf; - - @Override - public void prepare(@SuppressWarnings("rawtypes") Map conf) { - _conf = conf; - } - - private Map<String, Number> getUserConf() { - Map<String, Number> ret = (Map<String, Number>)_conf.get(Config.MULTITENANT_SCHEDULER_USER_POOLS); - if (ret == null) { - ret = new HashMap<String, Number>(); - } else { - ret = new HashMap<String, Number>(ret); - } + private static final Logger LOG = LoggerFactory.getLogger(MultitenantScheduler.class); + @SuppressWarnings("rawtypes") + private Map _conf; - Map fromFile = Utils.findAndReadConfigFile("multitenant-scheduler.yaml", false); - Map<String, Number> tmp = (Map<String, Number>)fromFile.get(Config.MULTITENANT_SCHEDULER_USER_POOLS); - if (tmp != null) { - ret.putAll(tmp); + @Override + public void prepare(@SuppressWarnings("rawtypes") Map conf) { + _conf = conf; } - return ret; - } - - @Override - public void schedule(Topologies topologies, Cluster cluster) { - LOG.debug("Rerunning scheduling..."); - Map<String, Node> nodeIdToNode = Node.getAllNodesFrom(cluster); - - Map<String, Number> userConf = getUserConf(); - - Map<String, IsolatedPool> userPools = new HashMap<String, IsolatedPool>(); - for (Map.Entry<String, Number> entry : userConf.entrySet()) { - userPools.put(entry.getKey(), new IsolatedPool(entry.getValue().intValue())); - } - DefaultPool defaultPool = new DefaultPool(); - FreePool freePool = new FreePool(); - - freePool.init(cluster, nodeIdToNode); - for (IsolatedPool pool : userPools.values()) { - pool.init(cluster, nodeIdToNode); - } - defaultPool.init(cluster, nodeIdToNode); - - for (TopologyDetails td: topologies.getTopologies()) { - String user = (String)td.getConf().get(Config.TOPOLOGY_SUBMITTER_USER); - LOG.debug("Found top {} run by user {}",td.getId(), user); - NodePool pool = userPools.get(user); - if (pool == null || !pool.canAdd(td)) { - pool = defaultPool; - } - pool.addTopology(td); + private Map<String, Number> getUserConf() { + Map<String, Number> ret = (Map<String, Number>) _conf.get(Config.MULTITENANT_SCHEDULER_USER_POOLS); + if (ret == null) { + ret = new HashMap<String, Number>(); + } else { + ret = new HashMap<String, Number>(ret); + } + + Map fromFile = Utils.findAndReadConfigFile("multitenant-scheduler.yaml", false); + Map<String, Number> tmp = (Map<String, Number>) fromFile.get(Config.MULTITENANT_SCHEDULER_USER_POOLS); + if (tmp != null) { + ret.putAll(tmp); + } + return ret; } - - //Now schedule all of the topologies that need to be scheduled - for (IsolatedPool pool : userPools.values()) { - pool.scheduleAsNeeded(freePool, defaultPool); + + @Override + public void schedule(Topologies topologies, Cluster cluster) { + LOG.debug("Rerunning scheduling..."); + Map<String, Node> nodeIdToNode = Node.getAllNodesFrom(cluster); + + Map<String, Number> userConf = getUserConf(); + + Map<String, IsolatedPool> userPools = new HashMap<String, IsolatedPool>(); + for (Map.Entry<String, Number> entry : userConf.entrySet()) { + userPools.put(entry.getKey(), new IsolatedPool(entry.getValue().intValue())); + } + DefaultPool defaultPool = new DefaultPool(); + FreePool freePool = new FreePool(); + + freePool.init(cluster, nodeIdToNode); + for (IsolatedPool pool : userPools.values()) { + pool.init(cluster, nodeIdToNode); + } + defaultPool.init(cluster, nodeIdToNode); + + for (TopologyDetails td : topologies.getTopologies()) { + String user = (String) td.getConf().get(Config.TOPOLOGY_SUBMITTER_USER); + LOG.debug("Found top {} run by user {}", td.getId(), user); + NodePool pool = userPools.get(user); + if (pool == null || !pool.canAdd(td)) { + pool = defaultPool; + } + pool.addTopology(td); + } + + // Now schedule all of the topologies that need to be scheduled + for (IsolatedPool pool : userPools.values()) { + pool.scheduleAsNeeded(freePool, defaultPool); + } + defaultPool.scheduleAsNeeded(freePool); + LOG.debug("Scheduling done..."); } - defaultPool.scheduleAsNeeded(freePool); - LOG.debug("Scheduling done..."); - } } http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/backtype/storm/scheduler/multitenant/Node.java ---------------------------------------------------------------------- diff --git a/jstorm-core/src/main/java/backtype/storm/scheduler/multitenant/Node.java b/jstorm-core/src/main/java/backtype/storm/scheduler/multitenant/Node.java index 883c65f..2cc49a8 100755 --- a/jstorm-core/src/main/java/backtype/storm/scheduler/multitenant/Node.java +++ b/jstorm-core/src/main/java/backtype/storm/scheduler/multitenant/Node.java @@ -39,305 +39,299 @@ import backtype.storm.scheduler.WorkerSlot; * Represents a single node in the cluster. */ public class Node { - private static final Logger LOG = LoggerFactory.getLogger(Node.class); - private Map<String, Set<WorkerSlot>> _topIdToUsedSlots = new HashMap<String,Set<WorkerSlot>>(); - private Set<WorkerSlot> _freeSlots = new HashSet<WorkerSlot>(); - private final String _nodeId; - private boolean _isAlive; - - public Node(String nodeId, Set<Integer> allPorts, boolean isAlive) { - _nodeId = nodeId; - _isAlive = isAlive; - if (_isAlive && allPorts != null) { - for (int port: allPorts) { - _freeSlots.add(new WorkerSlot(_nodeId, port)); - } - } - } - - public String getId() { - return _nodeId; - } - - public boolean isAlive() { - return _isAlive; - } - - /** - * @return a collection of the topology ids currently running on this node - */ - public Collection<String> getRunningTopologies() { - return _topIdToUsedSlots.keySet(); - } - - public boolean isTotallyFree() { - return _topIdToUsedSlots.isEmpty(); - } - - public int totalSlotsFree() { - return _freeSlots.size(); - } - - public int totalSlotsUsed() { - int total = 0; - for (Set<WorkerSlot> slots: _topIdToUsedSlots.values()) { - total += slots.size(); + private static final Logger LOG = LoggerFactory.getLogger(Node.class); + private Map<String, Set<WorkerSlot>> _topIdToUsedSlots = new HashMap<String, Set<WorkerSlot>>(); + private Set<WorkerSlot> _freeSlots = new HashSet<WorkerSlot>(); + private final String _nodeId; + private boolean _isAlive; + + public Node(String nodeId, Set<Integer> allPorts, boolean isAlive) { + _nodeId = nodeId; + _isAlive = isAlive; + if (_isAlive && allPorts != null) { + for (int port : allPorts) { + _freeSlots.add(new WorkerSlot(_nodeId, port)); + } + } } - return total; - } - - public int totalSlots() { - return totalSlotsFree() + totalSlotsUsed(); - } - - public int totalSlotsUsed(String topId) { - int total = 0; - Set<WorkerSlot> slots = _topIdToUsedSlots.get(topId); - if (slots != null) { - total = slots.size(); + + public String getId() { + return _nodeId; } - return total; - } - - private void validateSlot(WorkerSlot ws) { - if (!_nodeId.equals(ws.getNodeId())) { - throw new IllegalArgumentException( - "Trying to add a slot to the wrong node " + ws + - " is not a part of " + _nodeId); + + public boolean isAlive() { + return _isAlive; } - } - - private void addOrphanedSlot(WorkerSlot ws) { - if (_isAlive) { - throw new IllegalArgumentException("Orphaned Slots " + - "only are allowed on dead nodes."); + + /** + * @return a collection of the topology ids currently running on this node + */ + public Collection<String> getRunningTopologies() { + return _topIdToUsedSlots.keySet(); } - validateSlot(ws); - if (_freeSlots.contains(ws)) { - return; + + public boolean isTotallyFree() { + return _topIdToUsedSlots.isEmpty(); } - for (Set<WorkerSlot> used: _topIdToUsedSlots.values()) { - if (used.contains(ws)) { - return; - } + + public int totalSlotsFree() { + return _freeSlots.size(); } - _freeSlots.add(ws); - } - - boolean assignInternal(WorkerSlot ws, String topId, boolean dontThrow) { - validateSlot(ws); - if (!_freeSlots.remove(ws)) { - for (Entry<String, Set<WorkerSlot>> topologySetEntry : _topIdToUsedSlots.entrySet()) { - if (topologySetEntry.getValue().contains(ws)) { - if (dontThrow) { - LOG.warn("Worker slot [" + ws + "] can't be assigned to " + topId + - ". Its already assigned to " + topologySetEntry.getKey() + "."); - return true; - } - throw new IllegalStateException("Worker slot [" + ws + "] can't be assigned to " - + topId + ". Its already assigned to " + topologySetEntry.getKey() + "."); + + public int totalSlotsUsed() { + int total = 0; + for (Set<WorkerSlot> slots : _topIdToUsedSlots.values()) { + total += slots.size(); } - } - LOG.warn("Adding Worker slot [" + ws + "] that was not reported in the supervisor heartbeats," + - " but the worker is already running for topology " + topId + "."); - } - Set<WorkerSlot> usedSlots = _topIdToUsedSlots.get(topId); - if (usedSlots == null) { - usedSlots = new HashSet<WorkerSlot>(); - _topIdToUsedSlots.put(topId, usedSlots); + return total; } - usedSlots.add(ws); - return false; - } - - /** - * Free all slots on this node. This will update the Cluster too. - * @param cluster the cluster to be updated - */ - public void freeAllSlots(Cluster cluster) { - if (!_isAlive) { - LOG.warn("Freeing all slots on a dead node {} ",_nodeId); - } - for (Entry<String, Set<WorkerSlot>> entry : _topIdToUsedSlots.entrySet()) { - cluster.freeSlots(entry.getValue()); - if (_isAlive) { - _freeSlots.addAll(entry.getValue()); - } + + public int totalSlots() { + return totalSlotsFree() + totalSlotsUsed(); } - _topIdToUsedSlots = new HashMap<String,Set<WorkerSlot>>(); - } - - /** - * Frees a single slot in this node - * @param ws the slot to free - * @param cluster the cluster to update - */ - public void free(WorkerSlot ws, Cluster cluster, boolean forceFree) { - if (_freeSlots.contains(ws)) return; - boolean wasFound = false; - for (Entry<String, Set<WorkerSlot>> entry : _topIdToUsedSlots.entrySet()) { - Set<WorkerSlot> slots = entry.getValue(); - if (slots.remove(ws)) { - cluster.freeSlot(ws); - if (_isAlive) { - _freeSlots.add(ws); + + public int totalSlotsUsed(String topId) { + int total = 0; + Set<WorkerSlot> slots = _topIdToUsedSlots.get(topId); + if (slots != null) { + total = slots.size(); } - wasFound = true; - } + return total; } - if(!wasFound) - { - if(forceFree) - { - LOG.info("Forcefully freeing the " + ws); - cluster.freeSlot(ws); - _freeSlots.add(ws); - } else { - throw new IllegalArgumentException("Tried to free a slot that was not" + - " part of this node " + _nodeId); - } + + private void validateSlot(WorkerSlot ws) { + if (!_nodeId.equals(ws.getNodeId())) { + throw new IllegalArgumentException("Trying to add a slot to the wrong node " + ws + " is not a part of " + _nodeId); + } } - } - - /** - * Frees all the slots for a topology. - * @param topId the topology to free slots for - * @param cluster the cluster to update - */ - public void freeTopology(String topId, Cluster cluster) { - Set<WorkerSlot> slots = _topIdToUsedSlots.get(topId); - if (slots == null || slots.isEmpty()) return; - for (WorkerSlot ws : slots) { - cluster.freeSlot(ws); - if (_isAlive) { + + private void addOrphanedSlot(WorkerSlot ws) { + if (_isAlive) { + throw new IllegalArgumentException("Orphaned Slots " + "only are allowed on dead nodes."); + } + validateSlot(ws); + if (_freeSlots.contains(ws)) { + return; + } + for (Set<WorkerSlot> used : _topIdToUsedSlots.values()) { + if (used.contains(ws)) { + return; + } + } _freeSlots.add(ws); - } } - _topIdToUsedSlots.remove(topId); - } - - /** - * Assign a free slot on the node to the following topology and executors. - * This will update the cluster too. - * @param topId the topology to assign a free slot to. - * @param executors the executors to run in that slot. - * @param cluster the cluster to be updated - */ - public void assign(String topId, Collection<ExecutorDetails> executors, - Cluster cluster) { - if (!_isAlive) { - throw new IllegalStateException("Trying to adding to a dead node " + _nodeId); + + boolean assignInternal(WorkerSlot ws, String topId, boolean dontThrow) { + validateSlot(ws); + if (!_freeSlots.remove(ws)) { + for (Entry<String, Set<WorkerSlot>> topologySetEntry : _topIdToUsedSlots.entrySet()) { + if (topologySetEntry.getValue().contains(ws)) { + if (dontThrow) { + LOG.warn("Worker slot [" + ws + "] can't be assigned to " + topId + ". Its already assigned to " + topologySetEntry.getKey() + "."); + return true; + } + throw new IllegalStateException("Worker slot [" + ws + "] can't be assigned to " + topId + ". Its already assigned to " + + topologySetEntry.getKey() + "."); + } + } + LOG.warn("Adding Worker slot [" + ws + "] that was not reported in the supervisor heartbeats," + " but the worker is already running for topology " + + topId + "."); + } + Set<WorkerSlot> usedSlots = _topIdToUsedSlots.get(topId); + if (usedSlots == null) { + usedSlots = new HashSet<WorkerSlot>(); + _topIdToUsedSlots.put(topId, usedSlots); + } + usedSlots.add(ws); + return false; } - if (_freeSlots.isEmpty()) { - throw new IllegalStateException("Trying to assign to a full node " + _nodeId); + + /** + * Free all slots on this node. This will update the Cluster too. + * + * @param cluster the cluster to be updated + */ + public void freeAllSlots(Cluster cluster) { + if (!_isAlive) { + LOG.warn("Freeing all slots on a dead node {} ", _nodeId); + } + for (Entry<String, Set<WorkerSlot>> entry : _topIdToUsedSlots.entrySet()) { + cluster.freeSlots(entry.getValue()); + if (_isAlive) { + _freeSlots.addAll(entry.getValue()); + } + } + _topIdToUsedSlots = new HashMap<String, Set<WorkerSlot>>(); } - if (executors.size() == 0) { - LOG.warn("Trying to assign nothing from " + topId + " to " + _nodeId + " (Ignored)"); - } else { - WorkerSlot slot = _freeSlots.iterator().next(); - cluster.assign(slot, topId, executors); - assignInternal(slot, topId, false); + + /** + * Frees a single slot in this node + * + * @param ws the slot to free + * @param cluster the cluster to update + */ + public void free(WorkerSlot ws, Cluster cluster, boolean forceFree) { + if (_freeSlots.contains(ws)) + return; + boolean wasFound = false; + for (Entry<String, Set<WorkerSlot>> entry : _topIdToUsedSlots.entrySet()) { + Set<WorkerSlot> slots = entry.getValue(); + if (slots.remove(ws)) { + cluster.freeSlot(ws); + if (_isAlive) { + _freeSlots.add(ws); + } + wasFound = true; + } + } + if (!wasFound) { + if (forceFree) { + LOG.info("Forcefully freeing the " + ws); + cluster.freeSlot(ws); + _freeSlots.add(ws); + } else { + throw new IllegalArgumentException("Tried to free a slot that was not" + " part of this node " + _nodeId); + } + } } - } - - @Override - public boolean equals(Object other) { - if (other instanceof Node) { - return _nodeId.equals(((Node)other)._nodeId); + + /** + * Frees all the slots for a topology. + * + * @param topId the topology to free slots for + * @param cluster the cluster to update + */ + public void freeTopology(String topId, Cluster cluster) { + Set<WorkerSlot> slots = _topIdToUsedSlots.get(topId); + if (slots == null || slots.isEmpty()) + return; + for (WorkerSlot ws : slots) { + cluster.freeSlot(ws); + if (_isAlive) { + _freeSlots.add(ws); + } + } + _topIdToUsedSlots.remove(topId); } - return false; - } - - @Override - public int hashCode() { - return _nodeId.hashCode(); - } - - @Override - public String toString() { - return "Node: " + _nodeId; - } - - public static int countSlotsUsed(String topId, Collection<Node> nodes) { - int total = 0; - for (Node n: nodes) { - total += n.totalSlotsUsed(topId); + + /** + * Assign a free slot on the node to the following topology and executors. This will update the cluster too. + * + * @param topId the topology to assign a free slot to. + * @param executors the executors to run in that slot. + * @param cluster the cluster to be updated + */ + public void assign(String topId, Collection<ExecutorDetails> executors, Cluster cluster) { + if (!_isAlive) { + throw new IllegalStateException("Trying to adding to a dead node " + _nodeId); + } + if (_freeSlots.isEmpty()) { + throw new IllegalStateException("Trying to assign to a full node " + _nodeId); + } + if (executors.size() == 0) { + LOG.warn("Trying to assign nothing from " + topId + " to " + _nodeId + " (Ignored)"); + } else { + WorkerSlot slot = _freeSlots.iterator().next(); + cluster.assign(slot, topId, executors); + assignInternal(slot, topId, false); + } } - return total; - } - - public static int countSlotsUsed(Collection<Node> nodes) { - int total = 0; - for (Node n: nodes) { - total += n.totalSlotsUsed(); + + @Override + public boolean equals(Object other) { + if (other instanceof Node) { + return _nodeId.equals(((Node) other)._nodeId); + } + return false; } - return total; - } - - public static int countFreeSlotsAlive(Collection<Node> nodes) { - int total = 0; - for (Node n: nodes) { - if (n.isAlive()) { - total += n.totalSlotsFree(); - } + + @Override + public int hashCode() { + return _nodeId.hashCode(); } - return total; - } - - public static int countTotalSlotsAlive(Collection<Node> nodes) { - int total = 0; - for (Node n: nodes) { - if (n.isAlive()) { - total += n.totalSlots(); - } + + @Override + public String toString() { + return "Node: " + _nodeId; } - return total; - } - - public static Map<String, Node> getAllNodesFrom(Cluster cluster) { - Map<String, Node> nodeIdToNode = new HashMap<String, Node>(); - for (SupervisorDetails sup : cluster.getSupervisors().values()) { - //Node ID and supervisor ID are the same. - String id = sup.getId(); - boolean isAlive = !cluster.isBlackListed(id); - LOG.debug("Found a {} Node {} {}", - new Object[] {isAlive? "living":"dead", id, sup.getAllPorts()}); - nodeIdToNode.put(id, new Node(id, sup.getAllPorts(), isAlive)); + + public static int countSlotsUsed(String topId, Collection<Node> nodes) { + int total = 0; + for (Node n : nodes) { + total += n.totalSlotsUsed(topId); + } + return total; } - - for (Entry<String, SchedulerAssignment> entry : cluster.getAssignments().entrySet()) { - String topId = entry.getValue().getTopologyId(); - for (WorkerSlot ws: entry.getValue().getSlots()) { - String id = ws.getNodeId(); - Node node = nodeIdToNode.get(id); - if (node == null) { - LOG.debug("Found an assigned slot on a dead supervisor {}", ws); - node = new Node(id, null, false); - nodeIdToNode.put(id, node); + + public static int countSlotsUsed(Collection<Node> nodes) { + int total = 0; + for (Node n : nodes) { + total += n.totalSlotsUsed(); } - if (!node.isAlive()) { - //The supervisor on the node down so add an orphaned slot to hold the unsupervised worker - node.addOrphanedSlot(ws); + return total; + } + + public static int countFreeSlotsAlive(Collection<Node> nodes) { + int total = 0; + for (Node n : nodes) { + if (n.isAlive()) { + total += n.totalSlotsFree(); + } } - if (node.assignInternal(ws, topId, true)) { - LOG.warn("Bad scheduling state for topology [" + topId+ "], the slot " + - ws + " assigned to multiple workers, un-assigning everything..."); - node.free(ws, cluster, true); + return total; + } + + public static int countTotalSlotsAlive(Collection<Node> nodes) { + int total = 0; + for (Node n : nodes) { + if (n.isAlive()) { + total += n.totalSlots(); + } } - } + return total; } - - return nodeIdToNode; - } - - /** - * Used to sort a list of nodes so the node with the most free slots comes - * first. - */ - public static final Comparator<Node> FREE_NODE_COMPARATOR_DEC = new Comparator<Node>() { - @Override - public int compare(Node o1, Node o2) { - return o2.totalSlotsFree() - o1.totalSlotsFree(); + + public static Map<String, Node> getAllNodesFrom(Cluster cluster) { + Map<String, Node> nodeIdToNode = new HashMap<String, Node>(); + for (SupervisorDetails sup : cluster.getSupervisors().values()) { + // Node ID and supervisor ID are the same. + String id = sup.getId(); + boolean isAlive = !cluster.isBlackListed(id); + LOG.debug("Found a {} Node {} {}", new Object[] { isAlive ? "living" : "dead", id, sup.getAllPorts() }); + nodeIdToNode.put(id, new Node(id, sup.getAllPorts(), isAlive)); + } + + for (Entry<String, SchedulerAssignment> entry : cluster.getAssignments().entrySet()) { + String topId = entry.getValue().getTopologyId(); + for (WorkerSlot ws : entry.getValue().getSlots()) { + String id = ws.getNodeId(); + Node node = nodeIdToNode.get(id); + if (node == null) { + LOG.debug("Found an assigned slot on a dead supervisor {}", ws); + node = new Node(id, null, false); + nodeIdToNode.put(id, node); + } + if (!node.isAlive()) { + // The supervisor on the node down so add an orphaned slot to hold the unsupervised worker + node.addOrphanedSlot(ws); + } + if (node.assignInternal(ws, topId, true)) { + LOG.warn("Bad scheduling state for topology [" + topId + "], the slot " + ws + " assigned to multiple workers, un-assigning everything..."); + node.free(ws, cluster, true); + } + } + } + + return nodeIdToNode; } - }; + + /** + * Used to sort a list of nodes so the node with the most free slots comes first. + */ + public static final Comparator<Node> FREE_NODE_COMPARATOR_DEC = new Comparator<Node>() { + @Override + public int compare(Node o1, Node o2) { + return o2.totalSlotsFree() - o1.totalSlotsFree(); + } + }; } http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/backtype/storm/scheduler/multitenant/NodePool.java ---------------------------------------------------------------------- diff --git a/jstorm-core/src/main/java/backtype/storm/scheduler/multitenant/NodePool.java b/jstorm-core/src/main/java/backtype/storm/scheduler/multitenant/NodePool.java index 21d1577..9537fa8 100755 --- a/jstorm-core/src/main/java/backtype/storm/scheduler/multitenant/NodePool.java +++ b/jstorm-core/src/main/java/backtype/storm/scheduler/multitenant/NodePool.java @@ -42,255 +42,259 @@ import backtype.storm.scheduler.WorkerSlot; * A pool of nodes that can be used to run topologies. */ public abstract class NodePool { - protected Cluster _cluster; - protected Map<String, Node> _nodeIdToNode; - - public static class NodeAndSlotCounts { - public final int _nodes; - public final int _slots; - - public NodeAndSlotCounts(int nodes, int slots) { - _nodes = nodes; - _slots = slots; + protected Cluster _cluster; + protected Map<String, Node> _nodeIdToNode; + + public static class NodeAndSlotCounts { + public final int _nodes; + public final int _slots; + + public NodeAndSlotCounts(int nodes, int slots) { + _nodes = nodes; + _slots = slots; + } } - } - /** - * Place executors into slots in a round robin way, taking into account - * component spreading among different hosts. - */ - public static class RoundRobinSlotScheduler { - private Map<String,Set<String>> _nodeToComps; - private HashMap<String, List<ExecutorDetails>> _spreadToSchedule; - private LinkedList<Set<ExecutorDetails>> _slots; - private Set<ExecutorDetails> _lastSlot; - private Cluster _cluster; - private String _topId; - /** - * Create a new scheduler for a given topology - * @param td the topology to schedule - * @param slotsToUse the number of slots to use for the executors left to - * schedule. - * @param cluster the cluster to schedule this on. + * Place executors into slots in a round robin way, taking into account component spreading among different hosts. */ - public RoundRobinSlotScheduler(TopologyDetails td, int slotsToUse, - Cluster cluster) { - _topId = td.getId(); - _cluster = cluster; - - Map<ExecutorDetails, String> execToComp = td.getExecutorToComponent(); - SchedulerAssignment assignment = _cluster.getAssignmentById(_topId); - _nodeToComps = new HashMap<String, Set<String>>(); + public static class RoundRobinSlotScheduler { + private Map<String, Set<String>> _nodeToComps; + private HashMap<String, List<ExecutorDetails>> _spreadToSchedule; + private LinkedList<Set<ExecutorDetails>> _slots; + private Set<ExecutorDetails> _lastSlot; + private Cluster _cluster; + private String _topId; - if (assignment != null) { - Map<ExecutorDetails, WorkerSlot> execToSlot = assignment.getExecutorToSlot(); - - for (Entry<ExecutorDetails, WorkerSlot> entry: execToSlot.entrySet()) { - String nodeId = entry.getValue().getNodeId(); - Set<String> comps = _nodeToComps.get(nodeId); - if (comps == null) { - comps = new HashSet<String>(); - _nodeToComps.put(nodeId, comps); - } - comps.add(execToComp.get(entry.getKey())); - } - } - - _spreadToSchedule = new HashMap<String, List<ExecutorDetails>>(); - List<String> spreadComps = (List<String>)td.getConf().get(Config.TOPOLOGY_SPREAD_COMPONENTS); - if (spreadComps != null) { - for (String comp: spreadComps) { - _spreadToSchedule.put(comp, new ArrayList<ExecutorDetails>()); + /** + * Create a new scheduler for a given topology + * + * @param td the topology to schedule + * @param slotsToUse the number of slots to use for the executors left to schedule. + * @param cluster the cluster to schedule this on. + */ + public RoundRobinSlotScheduler(TopologyDetails td, int slotsToUse, Cluster cluster) { + _topId = td.getId(); + _cluster = cluster; + + Map<ExecutorDetails, String> execToComp = td.getExecutorToComponent(); + SchedulerAssignment assignment = _cluster.getAssignmentById(_topId); + _nodeToComps = new HashMap<String, Set<String>>(); + + if (assignment != null) { + Map<ExecutorDetails, WorkerSlot> execToSlot = assignment.getExecutorToSlot(); + + for (Entry<ExecutorDetails, WorkerSlot> entry : execToSlot.entrySet()) { + String nodeId = entry.getValue().getNodeId(); + Set<String> comps = _nodeToComps.get(nodeId); + if (comps == null) { + comps = new HashSet<String>(); + _nodeToComps.put(nodeId, comps); + } + comps.add(execToComp.get(entry.getKey())); + } + } + + _spreadToSchedule = new HashMap<String, List<ExecutorDetails>>(); + List<String> spreadComps = (List<String>) td.getConf().get(Config.TOPOLOGY_SPREAD_COMPONENTS); + if (spreadComps != null) { + for (String comp : spreadComps) { + _spreadToSchedule.put(comp, new ArrayList<ExecutorDetails>()); + } + } + + _slots = new LinkedList<Set<ExecutorDetails>>(); + for (int i = 0; i < slotsToUse; i++) { + _slots.add(new HashSet<ExecutorDetails>()); + } + + int at = 0; + for (Entry<String, List<ExecutorDetails>> entry : _cluster.getNeedsSchedulingComponentToExecutors(td).entrySet()) { + LOG.debug("Scheduling for {}", entry.getKey()); + if (_spreadToSchedule.containsKey(entry.getKey())) { + LOG.debug("Saving {} for spread...", entry.getKey()); + _spreadToSchedule.get(entry.getKey()).addAll(entry.getValue()); + } else { + for (ExecutorDetails ed : entry.getValue()) { + LOG.debug("Assigning {} {} to slot {}", new Object[] { entry.getKey(), ed, at }); + _slots.get(at).add(ed); + at++; + if (at >= _slots.size()) { + at = 0; + } + } + } + } + _lastSlot = _slots.get(_slots.size() - 1); } - } - - _slots = new LinkedList<Set<ExecutorDetails>>(); - for (int i = 0; i < slotsToUse; i++) { - _slots.add(new HashSet<ExecutorDetails>()); - } - int at = 0; - for (Entry<String, List<ExecutorDetails>> entry: _cluster.getNeedsSchedulingComponentToExecutors(td).entrySet()) { - LOG.debug("Scheduling for {}", entry.getKey()); - if (_spreadToSchedule.containsKey(entry.getKey())) { - LOG.debug("Saving {} for spread...",entry.getKey()); - _spreadToSchedule.get(entry.getKey()).addAll(entry.getValue()); - } else { - for (ExecutorDetails ed: entry.getValue()) { - LOG.debug("Assigning {} {} to slot {}", new Object[]{entry.getKey(), ed, at}); - _slots.get(at).add(ed); - at++; - if (at >= _slots.size()) { - at = 0; + /** + * Assign a slot to the given node. + * + * @param n the node to assign a slot to. + * @return true if there are more slots to assign else false. + */ + public boolean assignSlotTo(Node n) { + if (_slots.isEmpty()) { + return false; } - } + Set<ExecutorDetails> slot = _slots.pop(); + if (slot == _lastSlot) { + // The last slot fill it up + for (Entry<String, List<ExecutorDetails>> entry : _spreadToSchedule.entrySet()) { + if (entry.getValue().size() > 0) { + slot.addAll(entry.getValue()); + } + } + } else { + String nodeId = n.getId(); + Set<String> nodeComps = _nodeToComps.get(nodeId); + if (nodeComps == null) { + nodeComps = new HashSet<String>(); + _nodeToComps.put(nodeId, nodeComps); + } + for (Entry<String, List<ExecutorDetails>> entry : _spreadToSchedule.entrySet()) { + if (entry.getValue().size() > 0) { + String comp = entry.getKey(); + if (!nodeComps.contains(comp)) { + nodeComps.add(comp); + slot.add(entry.getValue().remove(0)); + } + } + } + } + n.assign(_topId, slot, _cluster); + return !_slots.isEmpty(); } - } - _lastSlot = _slots.get(_slots.size() - 1); } - + + private static final Logger LOG = LoggerFactory.getLogger(NodePool.class); + + /** + * Initialize the pool. + * + * @param cluster the cluster + * @param nodeIdToNode the mapping of node id to nodes + */ + public void init(Cluster cluster, Map<String, Node> nodeIdToNode) { + _cluster = cluster; + _nodeIdToNode = nodeIdToNode; + } + /** - * Assign a slot to the given node. - * @param n the node to assign a slot to. - * @return true if there are more slots to assign else false. + * Add a topology to the pool + * + * @param td the topology to add. */ - public boolean assignSlotTo(Node n) { - if (_slots.isEmpty()) { - return false; - } - Set<ExecutorDetails> slot = _slots.pop(); - if (slot == _lastSlot) { - //The last slot fill it up - for (Entry<String, List<ExecutorDetails>> entry: _spreadToSchedule.entrySet()) { - if (entry.getValue().size() > 0) { - slot.addAll(entry.getValue()); - } + public abstract void addTopology(TopologyDetails td); + + /** + * Check if this topology can be added to this pool + * + * @param td the topology + * @return true if it can else false + */ + public abstract boolean canAdd(TopologyDetails td); + + /** + * @return the number of nodes that are available to be taken + */ + public abstract int slotsAvailable(); + + /** + * Take nodes from this pool that can fulfill possibly up to the slotsNeeded + * + * @param slotsNeeded the number of slots that are needed. + * @return a Collection of nodes with the removed nodes in it. This may be empty, but should not be null. + */ + public abstract Collection<Node> takeNodesBySlots(int slotsNeeded); + + /** + * Get the number of nodes and slots this would provide to get the slots needed + * + * @param slots the number of slots needed + * @return the number of nodes and slots that would be returned. + */ + public abstract NodeAndSlotCounts getNodeAndSlotCountIfSlotsWereTaken(int slots); + + /** + * @return the number of nodes that are available to be taken + */ + public abstract int nodesAvailable(); + + /** + * Take up to nodesNeeded from this pool + * + * @param nodesNeeded the number of nodes that are needed. + * @return a Collection of nodes with the removed nodes in it. This may be empty, but should not be null. + */ + public abstract Collection<Node> takeNodes(int nodesNeeded); + + /** + * Reschedule any topologies as needed. + * + * @param lesserPools pools that may be used to steal nodes from. + */ + public abstract void scheduleAsNeeded(NodePool... lesserPools); + + public static int slotsAvailable(NodePool[] pools) { + int slotsAvailable = 0; + for (NodePool pool : pools) { + slotsAvailable += pool.slotsAvailable(); } - } else { - String nodeId = n.getId(); - Set<String> nodeComps = _nodeToComps.get(nodeId); - if (nodeComps == null) { - nodeComps = new HashSet<String>(); - _nodeToComps.put(nodeId, nodeComps); + return slotsAvailable; + } + + public static int nodesAvailable(NodePool[] pools) { + int nodesAvailable = 0; + for (NodePool pool : pools) { + nodesAvailable += pool.nodesAvailable(); } - for (Entry<String, List<ExecutorDetails>> entry: _spreadToSchedule.entrySet()) { - if (entry.getValue().size() > 0) { - String comp = entry.getKey(); - if (!nodeComps.contains(comp)) { - nodeComps.add(comp); - slot.add(entry.getValue().remove(0)); + return nodesAvailable; + } + + public static Collection<Node> takeNodesBySlot(int slotsNeeded, NodePool[] pools) { + LOG.debug("Trying to grab {} free slots from {}", slotsNeeded, pools); + HashSet<Node> ret = new HashSet<Node>(); + for (NodePool pool : pools) { + Collection<Node> got = pool.takeNodesBySlots(slotsNeeded); + ret.addAll(got); + slotsNeeded -= Node.countFreeSlotsAlive(got); + LOG.debug("Got {} nodes so far need {} more slots", ret.size(), slotsNeeded); + if (slotsNeeded <= 0) { + break; } - } } - } - n.assign(_topId, slot, _cluster); - return !_slots.isEmpty(); + return ret; } - } - - private static final Logger LOG = LoggerFactory.getLogger(NodePool.class); - /** - * Initialize the pool. - * @param cluster the cluster - * @param nodeIdToNode the mapping of node id to nodes - */ - public void init(Cluster cluster, Map<String, Node> nodeIdToNode) { - _cluster = cluster; - _nodeIdToNode = nodeIdToNode; - } - - /** - * Add a topology to the pool - * @param td the topology to add. - */ - public abstract void addTopology(TopologyDetails td); - - /** - * Check if this topology can be added to this pool - * @param td the topology - * @return true if it can else false - */ - public abstract boolean canAdd(TopologyDetails td); - - /** - * @return the number of nodes that are available to be taken - */ - public abstract int slotsAvailable(); - - /** - * Take nodes from this pool that can fulfill possibly up to the - * slotsNeeded - * @param slotsNeeded the number of slots that are needed. - * @return a Collection of nodes with the removed nodes in it. - * This may be empty, but should not be null. - */ - public abstract Collection<Node> takeNodesBySlots(int slotsNeeded); - /** - * Get the number of nodes and slots this would provide to get the slots needed - * @param slots the number of slots needed - * @return the number of nodes and slots that would be returned. - */ - public abstract NodeAndSlotCounts getNodeAndSlotCountIfSlotsWereTaken(int slots); - - /** - * @return the number of nodes that are available to be taken - */ - public abstract int nodesAvailable(); - - /** - * Take up to nodesNeeded from this pool - * @param nodesNeeded the number of nodes that are needed. - * @return a Collection of nodes with the removed nodes in it. - * This may be empty, but should not be null. - */ - public abstract Collection<Node> takeNodes(int nodesNeeded); - - /** - * Reschedule any topologies as needed. - * @param lesserPools pools that may be used to steal nodes from. - */ - public abstract void scheduleAsNeeded(NodePool ... lesserPools); - - public static int slotsAvailable(NodePool[] pools) { - int slotsAvailable = 0; - for (NodePool pool: pools) { - slotsAvailable += pool.slotsAvailable(); - } - return slotsAvailable; - } - - public static int nodesAvailable(NodePool[] pools) { - int nodesAvailable = 0; - for (NodePool pool: pools) { - nodesAvailable += pool.nodesAvailable(); - } - return nodesAvailable; - } - - public static Collection<Node> takeNodesBySlot(int slotsNeeded,NodePool[] pools) { - LOG.debug("Trying to grab {} free slots from {}",slotsNeeded, pools); - HashSet<Node> ret = new HashSet<Node>(); - for (NodePool pool: pools) { - Collection<Node> got = pool.takeNodesBySlots(slotsNeeded); - ret.addAll(got); - slotsNeeded -= Node.countFreeSlotsAlive(got); - LOG.debug("Got {} nodes so far need {} more slots",ret.size(),slotsNeeded); - if (slotsNeeded <= 0) { - break; - } - } - return ret; - } - - public static Collection<Node> takeNodes(int nodesNeeded,NodePool[] pools) { - LOG.debug("Trying to grab {} free nodes from {}",nodesNeeded, pools); - HashSet<Node> ret = new HashSet<Node>(); - for (NodePool pool: pools) { - Collection<Node> got = pool.takeNodes(nodesNeeded); - ret.addAll(got); - nodesNeeded -= got.size(); - LOG.debug("Got {} nodes so far need {} more nodes", ret.size(), nodesNeeded); - if (nodesNeeded <= 0) { - break; - } + public static Collection<Node> takeNodes(int nodesNeeded, NodePool[] pools) { + LOG.debug("Trying to grab {} free nodes from {}", nodesNeeded, pools); + HashSet<Node> ret = new HashSet<Node>(); + for (NodePool pool : pools) { + Collection<Node> got = pool.takeNodes(nodesNeeded); + ret.addAll(got); + nodesNeeded -= got.size(); + LOG.debug("Got {} nodes so far need {} more nodes", ret.size(), nodesNeeded); + if (nodesNeeded <= 0) { + break; + } + } + return ret; } - return ret; - } - public static int getNodeCountIfSlotsWereTaken(int slots,NodePool[] pools) { - LOG.debug("How many nodes to get {} slots from {}",slots, pools); - int total = 0; - for (NodePool pool: pools) { - NodeAndSlotCounts ns = pool.getNodeAndSlotCountIfSlotsWereTaken(slots); - total += ns._nodes; - slots -= ns._slots; - LOG.debug("Found {} nodes so far {} more slots needed", total, slots); - if (slots <= 0) { - break; - } - } - return total; - } + public static int getNodeCountIfSlotsWereTaken(int slots, NodePool[] pools) { + LOG.debug("How many nodes to get {} slots from {}", slots, pools); + int total = 0; + for (NodePool pool : pools) { + NodeAndSlotCounts ns = pool.getNodeAndSlotCountIfSlotsWereTaken(slots); + total += ns._nodes; + slots -= ns._slots; + LOG.debug("Found {} nodes so far {} more slots needed", total, slots); + if (slots <= 0) { + break; + } + } + return total; + } } http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/backtype/storm/security/INimbusCredentialPlugin.java ---------------------------------------------------------------------- diff --git a/jstorm-core/src/main/java/backtype/storm/security/INimbusCredentialPlugin.java b/jstorm-core/src/main/java/backtype/storm/security/INimbusCredentialPlugin.java index 9670045..761eac0 100755 --- a/jstorm-core/src/main/java/backtype/storm/security/INimbusCredentialPlugin.java +++ b/jstorm-core/src/main/java/backtype/storm/security/INimbusCredentialPlugin.java @@ -22,23 +22,23 @@ import backtype.storm.daemon.Shutdownable; import java.util.Map; /** - * Nimbus auto credential plugin that will be called on nimbus host - * during submit topology option. User can specify a list of implementation using config key + * Nimbus auto credential plugin that will be called on nimbus host during submit topology option. User can specify a list of implementation using config key * nimbus.autocredential.plugins.classes. */ public interface INimbusCredentialPlugin extends Shutdownable { /** * this method will be called when nimbus initializes. + * * @param conf */ void prepare(Map conf); /** - * Method that will be called on nimbus as part of submit topology. This plugin will be called - * at least once during the submit Topology action. It will be not be called during activate instead - * the credentials return by this method will be merged with the other credentials in the topology - * and stored in zookeeper. + * Method that will be called on nimbus as part of submit topology. This plugin will be called at least once during the submit Topology action. It will be + * not be called during activate instead the credentials return by this method will be merged with the other credentials in the topology and stored in + * zookeeper. + * * @param credentials credentials map where more credentials will be added. * @param conf topology configuration * @return http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/backtype/storm/security/auth/AuthUtils.java ---------------------------------------------------------------------- diff --git a/jstorm-core/src/main/java/backtype/storm/security/auth/AuthUtils.java b/jstorm-core/src/main/java/backtype/storm/security/auth/AuthUtils.java index ac3fb53..60653a1 100755 --- a/jstorm-core/src/main/java/backtype/storm/security/auth/AuthUtils.java +++ b/jstorm-core/src/main/java/backtype/storm/security/auth/AuthUtils.java @@ -45,19 +45,19 @@ public class AuthUtils { /** * Construct a JAAS configuration object per storm configuration file + * * @param storm_conf Storm configuration * @return JAAS configuration object */ public static Configuration GetConfiguration(Map storm_conf) { Configuration login_conf = null; - //find login file configuration from Storm configuration - String loginConfigurationFile = (String)storm_conf.get("java.security.auth.login.config"); - if ((loginConfigurationFile != null) && (loginConfigurationFile.length()>0)) { + // find login file configuration from Storm configuration + String loginConfigurationFile = (String) storm_conf.get("java.security.auth.login.config"); + if ((loginConfigurationFile != null) && (loginConfigurationFile.length() > 0)) { File config_file = new File(loginConfigurationFile); - if (! config_file.canRead()) { - throw new RuntimeException("File " + loginConfigurationFile + - " cannot be read."); + if (!config_file.canRead()) { + throw new RuntimeException("File " + loginConfigurationFile + " cannot be read."); } try { URI config_uri = config_file.toURI(); @@ -72,24 +72,26 @@ public class AuthUtils { /** * Construct a principal to local plugin + * * @param conf storm configuration * @return the plugin */ public static IPrincipalToLocal GetPrincipalToLocalPlugin(Map storm_conf) { IPrincipalToLocal ptol = null; try { - String ptol_klassName = (String) storm_conf.get(Config.STORM_PRINCIPAL_TO_LOCAL_PLUGIN); - Class klass = Class.forName(ptol_klassName); - ptol = (IPrincipalToLocal)klass.newInstance(); - ptol.prepare(storm_conf); + String ptol_klassName = (String) storm_conf.get(Config.STORM_PRINCIPAL_TO_LOCAL_PLUGIN); + Class klass = Class.forName(ptol_klassName); + ptol = (IPrincipalToLocal) klass.newInstance(); + ptol.prepare(storm_conf); } catch (Exception e) { - throw new RuntimeException(e); + throw new RuntimeException(e); } return ptol; } /** * Construct a group mapping service provider plugin + * * @param conf storm configuration * @return the plugin */ @@ -98,26 +100,27 @@ public class AuthUtils { try { String gmsp_klassName = (String) storm_conf.get(Config.STORM_GROUP_MAPPING_SERVICE_PROVIDER_PLUGIN); Class klass = Class.forName(gmsp_klassName); - gmsp = (IGroupMappingServiceProvider)klass.newInstance(); + gmsp = (IGroupMappingServiceProvider) klass.newInstance(); gmsp.prepare(storm_conf); } catch (Exception e) { - throw new RuntimeException(e); + throw new RuntimeException(e); } return gmsp; } /** * Get all of the configured Credential Renwer Plugins. + * * @param storm_conf the storm configuration to use. * @return the configured credential renewers. */ public static Collection<ICredentialsRenewer> GetCredentialRenewers(Map conf) { try { Set<ICredentialsRenewer> ret = new HashSet<ICredentialsRenewer>(); - Collection<String> clazzes = (Collection<String>)conf.get(Config.NIMBUS_CREDENTIAL_RENEWERS); + Collection<String> clazzes = (Collection<String>) conf.get(Config.NIMBUS_CREDENTIAL_RENEWERS); if (clazzes != null) { for (String clazz : clazzes) { - ICredentialsRenewer inst = (ICredentialsRenewer)Class.forName(clazz).newInstance(); + ICredentialsRenewer inst = (ICredentialsRenewer) Class.forName(clazz).newInstance(); inst.prepare(conf); ret.add(inst); } @@ -130,16 +133,17 @@ public class AuthUtils { /** * Get all the Nimbus Auto cred plugins. + * * @param conf nimbus configuration to use. * @return nimbus auto credential plugins. */ public static Collection<INimbusCredentialPlugin> getNimbusAutoCredPlugins(Map conf) { try { Set<INimbusCredentialPlugin> ret = new HashSet<INimbusCredentialPlugin>(); - Collection<String> clazzes = (Collection<String>)conf.get(Config.NIMBUS_AUTO_CRED_PLUGINS); + Collection<String> clazzes = (Collection<String>) conf.get(Config.NIMBUS_AUTO_CRED_PLUGINS); if (clazzes != null) { for (String clazz : clazzes) { - INimbusCredentialPlugin inst = (INimbusCredentialPlugin)Class.forName(clazz).newInstance(); + INimbusCredentialPlugin inst = (INimbusCredentialPlugin) Class.forName(clazz).newInstance(); inst.prepare(conf); ret.add(inst); } @@ -152,21 +156,22 @@ public class AuthUtils { /** * Get all of the configured AutoCredential Plugins. + * * @param storm_conf the storm configuration to use. * @return the configured auto credentials. */ public static Collection<IAutoCredentials> GetAutoCredentials(Map storm_conf) { try { Set<IAutoCredentials> autos = new HashSet<IAutoCredentials>(); - Collection<String> clazzes = (Collection<String>)storm_conf.get(Config.TOPOLOGY_AUTO_CREDENTIALS); + Collection<String> clazzes = (Collection<String>) storm_conf.get(Config.TOPOLOGY_AUTO_CREDENTIALS); if (clazzes != null) { for (String clazz : clazzes) { - IAutoCredentials a = (IAutoCredentials)Class.forName(clazz).newInstance(); + IAutoCredentials a = (IAutoCredentials) Class.forName(clazz).newInstance(); a.prepare(storm_conf); autos.add(a); } } - LOG.info("Got AutoCreds "+autos); + LOG.info("Got AutoCreds " + autos); return autos; } catch (Exception e) { throw new RuntimeException(e); @@ -175,12 +180,13 @@ public class AuthUtils { /** * Populate a subject from credentials using the IAutoCredentials. + * * @param subject the subject to populate or null if a new Subject should be created. * @param autos the IAutoCredentials to call to populate the subject. * @param credentials the credentials to pull from * @return the populated subject. */ - public static Subject populateSubject(Subject subject, Collection<IAutoCredentials> autos, Map<String,String> credentials) { + public static Subject populateSubject(Subject subject, Collection<IAutoCredentials> autos, Map<String, String> credentials) { try { if (subject == null) { subject = new Subject(); @@ -196,11 +202,12 @@ public class AuthUtils { /** * Update a subject from credentials using the IAutoCredentials. + * * @param subject the subject to update * @param autos the IAutoCredentials to call to update the subject. * @param credentials the credentials to pull from */ - public static void updateSubject(Subject subject, Collection<IAutoCredentials> autos, Map<String,String> credentials) { + public static void updateSubject(Subject subject, Collection<IAutoCredentials> autos, Map<String, String> credentials) { if (subject == null) { throw new RuntimeException("The subject cannot be null when updating a subject with credentials"); } @@ -216,68 +223,68 @@ public class AuthUtils { /** * Construct a transport plugin per storm configuration + * * @param conf storm configuration * @return */ public static ITransportPlugin GetTransportPlugin(ThriftConnectionType type, Map storm_conf, Configuration login_conf) { - ITransportPlugin transportPlugin = null; + ITransportPlugin transportPlugin = null; try { String transport_plugin_klassName = type.getTransportPlugin(storm_conf); Class klass = Class.forName(transport_plugin_klassName); - transportPlugin = (ITransportPlugin)klass.newInstance(); + transportPlugin = (ITransportPlugin) klass.newInstance(); transportPlugin.prepare(type, storm_conf, login_conf); - } catch(Exception e) { + } catch (Exception e) { throw new RuntimeException(e); } return transportPlugin; } - private static IHttpCredentialsPlugin GetHttpCredentialsPlugin(Map conf, - String klassName) { + private static IHttpCredentialsPlugin GetHttpCredentialsPlugin(Map conf, String klassName) { IHttpCredentialsPlugin plugin = null; try { Class klass = Class.forName(klassName); - plugin = (IHttpCredentialsPlugin)klass.newInstance(); + plugin = (IHttpCredentialsPlugin) klass.newInstance(); plugin.prepare(conf); - } catch(Exception e) { + } catch (Exception e) { throw new RuntimeException(e); } return plugin; } /** - * Construct an HttpServletRequest credential plugin specified by the UI - * storm configuration + * Construct an HttpServletRequest credential plugin specified by the UI storm configuration + * * @param conf storm configuration * @return the plugin */ public static IHttpCredentialsPlugin GetUiHttpCredentialsPlugin(Map conf) { - String klassName = (String)conf.get(Config.UI_HTTP_CREDS_PLUGIN); + String klassName = (String) conf.get(Config.UI_HTTP_CREDS_PLUGIN); return AuthUtils.GetHttpCredentialsPlugin(conf, klassName); } /** - * Construct an HttpServletRequest credential plugin specified by the DRPC - * storm configuration + * Construct an HttpServletRequest credential plugin specified by the DRPC storm configuration + * * @param conf storm configuration * @return the plugin */ public static IHttpCredentialsPlugin GetDrpcHttpCredentialsPlugin(Map conf) { - String klassName = (String)conf.get(Config.DRPC_HTTP_CREDS_PLUGIN); + String klassName = (String) conf.get(Config.DRPC_HTTP_CREDS_PLUGIN); return AuthUtils.GetHttpCredentialsPlugin(conf, klassName); } public static String get(Configuration configuration, String section, String key) throws IOException { AppConfigurationEntry configurationEntries[] = configuration.getAppConfigurationEntry(section); if (configurationEntries == null) { - String errorMessage = "Could not find a '"+ section + "' entry in this configuration."; + String errorMessage = "Could not find a '" + section + "' entry in this configuration."; throw new IOException(errorMessage); } - for(AppConfigurationEntry entry: configurationEntries) { + for (AppConfigurationEntry entry : configurationEntries) { Object val = entry.getOptions().get(key); if (val != null) - return (String)val; + return (String) val; } return null; } http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/backtype/storm/security/auth/DefaultHttpCredentialsPlugin.java ---------------------------------------------------------------------- diff --git a/jstorm-core/src/main/java/backtype/storm/security/auth/DefaultHttpCredentialsPlugin.java b/jstorm-core/src/main/java/backtype/storm/security/auth/DefaultHttpCredentialsPlugin.java index e2469e5..6386992 100755 --- a/jstorm-core/src/main/java/backtype/storm/security/auth/DefaultHttpCredentialsPlugin.java +++ b/jstorm-core/src/main/java/backtype/storm/security/auth/DefaultHttpCredentialsPlugin.java @@ -31,11 +31,11 @@ import org.slf4j.LoggerFactory; import backtype.storm.security.auth.ReqContext; public class DefaultHttpCredentialsPlugin implements IHttpCredentialsPlugin { - private static final Logger LOG = - LoggerFactory.getLogger(DefaultHttpCredentialsPlugin.class); + private static final Logger LOG = LoggerFactory.getLogger(DefaultHttpCredentialsPlugin.class); /** * No-op + * * @param storm_conf Storm configuration */ @Override @@ -45,6 +45,7 @@ public class DefaultHttpCredentialsPlugin implements IHttpCredentialsPlugin { /** * Gets the user name from the request principal. + * * @param req the servlet request * @return the authenticated user, or null if none is authenticated */ @@ -54,7 +55,7 @@ public class DefaultHttpCredentialsPlugin implements IHttpCredentialsPlugin { if (req != null && (princ = req.getUserPrincipal()) != null) { String userName = princ.getName(); if (userName != null && !userName.isEmpty()) { - LOG.debug("HTTP request had user ("+userName+")"); + LOG.debug("HTTP request had user (" + userName + ")"); return userName; } } @@ -62,29 +63,28 @@ public class DefaultHttpCredentialsPlugin implements IHttpCredentialsPlugin { } /** - * Populates a given context with a new Subject derived from the - * credentials in a servlet request. + * Populates a given context with a new Subject derived from the credentials in a servlet request. + * * @param context the context to be populated * @param req the servlet request * @return the context */ @Override - public ReqContext populateContext(ReqContext context, - HttpServletRequest req) { + public ReqContext populateContext(ReqContext context, HttpServletRequest req) { String userName = getUserName(req); String doAsUser = req.getHeader("doAsUser"); - if(doAsUser == null) { + if (doAsUser == null) { doAsUser = req.getParameter("doAsUser"); } - if(doAsUser != null) { + if (doAsUser != null) { context.setRealPrincipal(new SingleUserPrincipal(userName)); userName = doAsUser; } Set<Principal> principals = new HashSet<Principal>(); - if(userName != null) { + if (userName != null) { Principal p = new SingleUserPrincipal(userName); principals.add(p); } http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/backtype/storm/security/auth/DefaultPrincipalToLocal.java ---------------------------------------------------------------------- diff --git a/jstorm-core/src/main/java/backtype/storm/security/auth/DefaultPrincipalToLocal.java b/jstorm-core/src/main/java/backtype/storm/security/auth/DefaultPrincipalToLocal.java index 729d744..47e23b0 100755 --- a/jstorm-core/src/main/java/backtype/storm/security/auth/DefaultPrincipalToLocal.java +++ b/jstorm-core/src/main/java/backtype/storm/security/auth/DefaultPrincipalToLocal.java @@ -22,22 +22,24 @@ import java.util.Map; import java.security.Principal; /** - * Storm can be configured to launch worker processed as a given user. - * Some transports need to map the Principal to a local user name. + * Storm can be configured to launch worker processed as a given user. Some transports need to map the Principal to a local user name. */ public class DefaultPrincipalToLocal implements IPrincipalToLocal { /** * Invoked once immediately after construction - * @param conf Storm configuration + * + * @param conf Storm configuration */ - public void prepare(Map storm_conf) {} - + public void prepare(Map storm_conf) { + } + /** * Convert a Principal to a local user name. + * * @param principal the principal to convert * @return The local user name. */ public String toLocal(Principal principal) { - return principal == null ? null : principal.getName(); + return principal == null ? null : principal.getName(); } }
