http://git-wip-us.apache.org/repos/asf/storm/blob/f1c0bcbe/storm-server/src/main/java/org/apache/storm/scheduler/blacklist/BlacklistScheduler.java ---------------------------------------------------------------------- diff --git a/storm-server/src/main/java/org/apache/storm/scheduler/blacklist/BlacklistScheduler.java b/storm-server/src/main/java/org/apache/storm/scheduler/blacklist/BlacklistScheduler.java index c401474..d65c125 100644 --- a/storm-server/src/main/java/org/apache/storm/scheduler/blacklist/BlacklistScheduler.java +++ b/storm-server/src/main/java/org/apache/storm/scheduler/blacklist/BlacklistScheduler.java @@ -1,33 +1,25 @@ /** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. The ASF licenses this file to you under the Apache License, Version + * 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions + * and limitations under the License. */ package org.apache.storm.scheduler.blacklist; import com.google.common.collect.EvictingQueue; import com.google.common.collect.Sets; - import java.util.ArrayList; import java.util.HashMap; import java.util.HashSet; import java.util.Map; import java.util.Set; import java.util.concurrent.Callable; - import org.apache.storm.DaemonConfig; import org.apache.storm.metric.StormMetricsRegistry; import org.apache.storm.scheduler.Cluster; @@ -46,29 +38,23 @@ import org.slf4j.LoggerFactory; public class BlacklistScheduler implements IScheduler { - private static final Logger LOG = LoggerFactory.getLogger(BlacklistScheduler.class); - public static final int DEFAULT_BLACKLIST_SCHEDULER_RESUME_TIME = 1800; public static final int DEFAULT_BLACKLIST_SCHEDULER_TOLERANCE_COUNT = 3; public static final int DEFAULT_BLACKLIST_SCHEDULER_TOLERANCE_TIME = 300; - + private static final Logger LOG = LoggerFactory.getLogger(BlacklistScheduler.class); private final IScheduler underlyingScheduler; - private Map<String, Object> conf; - protected int toleranceTime; protected int toleranceCount; protected int resumeTime; protected IReporter reporter; protected IBlacklistStrategy blacklistStrategy; - protected int nimbusMonitorFreqSecs; - protected Map<String, Set<Integer>> cachedSupervisors; - //key is supervisor key ,value is supervisor ports protected EvictingQueue<HashMap<String, Set<Integer>>> badSupervisorsToleranceSlidingWindow; protected int windowSize; protected Set<String> blacklistHost; + private Map<String, Object> conf; public BlacklistScheduler(IScheduler underlyingScheduler) { this.underlyingScheduler = underlyingScheduler; @@ -81,18 +67,18 @@ public class BlacklistScheduler implements IScheduler { this.conf = conf; toleranceTime = ObjectReader.getInt(this.conf.get(DaemonConfig.BLACKLIST_SCHEDULER_TOLERANCE_TIME), - DEFAULT_BLACKLIST_SCHEDULER_TOLERANCE_TIME); + DEFAULT_BLACKLIST_SCHEDULER_TOLERANCE_TIME); toleranceCount = ObjectReader.getInt(this.conf.get(DaemonConfig.BLACKLIST_SCHEDULER_TOLERANCE_COUNT), - DEFAULT_BLACKLIST_SCHEDULER_TOLERANCE_COUNT); + DEFAULT_BLACKLIST_SCHEDULER_TOLERANCE_COUNT); resumeTime = ObjectReader.getInt(this.conf.get(DaemonConfig.BLACKLIST_SCHEDULER_RESUME_TIME), - DEFAULT_BLACKLIST_SCHEDULER_RESUME_TIME); + DEFAULT_BLACKLIST_SCHEDULER_RESUME_TIME); String reporterClassName = ObjectReader.getString(this.conf.get(DaemonConfig.BLACKLIST_SCHEDULER_REPORTER), - LogReporter.class.getName()); + LogReporter.class.getName()); reporter = (IReporter) initializeInstance(reporterClassName, "blacklist reporter"); String strategyClassName = ObjectReader.getString(this.conf.get(DaemonConfig.BLACKLIST_SCHEDULER_STRATEGY), - DefaultBlacklistStrategy.class.getName()); + DefaultBlacklistStrategy.class.getName()); blacklistStrategy = (IBlacklistStrategy) initializeInstance(strategyClassName, "blacklist strategy"); nimbusMonitorFreqSecs = ObjectReader.getInt(this.conf.get(DaemonConfig.NIMBUS_MONITOR_FREQ_SECS)); @@ -177,7 +163,7 @@ public class BlacklistScheduler implements IScheduler { private Set<String> getBlacklistHosts(Cluster cluster, Topologies topologies) { Set<String> blacklistSet = blacklistStrategy.getBlacklist(new ArrayList<>(badSupervisorsToleranceSlidingWindow), - cluster, topologies); + cluster, topologies); Set<String> blacklistHostSet = new HashSet<>(); for (String supervisor : blacklistSet) { String host = cluster.getHost(supervisor); @@ -235,7 +221,7 @@ public class BlacklistScheduler implements IScheduler { cachedSupervisors.put(supervisorKey, slots); } LOG.info("Worker slot {} was never back to normal during tolerance period, probably dead. Will be removed from cache.", - workerSlot); + workerSlot); } } }
http://git-wip-us.apache.org/repos/asf/storm/blob/f1c0bcbe/storm-server/src/main/java/org/apache/storm/scheduler/blacklist/reporters/IReporter.java ---------------------------------------------------------------------- diff --git a/storm-server/src/main/java/org/apache/storm/scheduler/blacklist/reporters/IReporter.java b/storm-server/src/main/java/org/apache/storm/scheduler/blacklist/reporters/IReporter.java index 153829c..b1bb933 100644 --- a/storm-server/src/main/java/org/apache/storm/scheduler/blacklist/reporters/IReporter.java +++ b/storm-server/src/main/java/org/apache/storm/scheduler/blacklist/reporters/IReporter.java @@ -1,19 +1,13 @@ /** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. The ASF licenses this file to you under the Apache License, Version + * 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions + * and limitations under the License. */ package org.apache.storm.scheduler.blacklist.reporters; http://git-wip-us.apache.org/repos/asf/storm/blob/f1c0bcbe/storm-server/src/main/java/org/apache/storm/scheduler/blacklist/reporters/LogReporter.java ---------------------------------------------------------------------- diff --git a/storm-server/src/main/java/org/apache/storm/scheduler/blacklist/reporters/LogReporter.java b/storm-server/src/main/java/org/apache/storm/scheduler/blacklist/reporters/LogReporter.java index 3255c9d..4d3eded 100644 --- a/storm-server/src/main/java/org/apache/storm/scheduler/blacklist/reporters/LogReporter.java +++ b/storm-server/src/main/java/org/apache/storm/scheduler/blacklist/reporters/LogReporter.java @@ -1,19 +1,13 @@ /** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. The ASF licenses this file to you under the Apache License, Version + * 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions + * and limitations under the License. */ package org.apache.storm.scheduler.blacklist.reporters; @@ -21,7 +15,6 @@ package org.apache.storm.scheduler.blacklist.reporters; import java.util.List; import java.util.Map; import java.util.Set; - import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -36,6 +29,6 @@ public class LogReporter implements IReporter { @Override public void reportBlacklist(String supervisor, List<Map<String, Set<Integer>>> toleranceBuffer) { LOG.warn("add supervisor {} to blacklist. The bad slot history of supervisors is : {}", - supervisor, toleranceBuffer); + supervisor, toleranceBuffer); } } http://git-wip-us.apache.org/repos/asf/storm/blob/f1c0bcbe/storm-server/src/main/java/org/apache/storm/scheduler/blacklist/strategies/DefaultBlacklistStrategy.java ---------------------------------------------------------------------- diff --git a/storm-server/src/main/java/org/apache/storm/scheduler/blacklist/strategies/DefaultBlacklistStrategy.java b/storm-server/src/main/java/org/apache/storm/scheduler/blacklist/strategies/DefaultBlacklistStrategy.java index f6ca6ad..3332b2d 100644 --- a/storm-server/src/main/java/org/apache/storm/scheduler/blacklist/strategies/DefaultBlacklistStrategy.java +++ b/storm-server/src/main/java/org/apache/storm/scheduler/blacklist/strategies/DefaultBlacklistStrategy.java @@ -1,19 +1,13 @@ /** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. The ASF licenses this file to you under the Apache License, Version + * 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions + * and limitations under the License. */ package org.apache.storm.scheduler.blacklist.strategies; @@ -24,7 +18,6 @@ import java.util.List; import java.util.Map; import java.util.Set; import java.util.TreeMap; - import org.apache.storm.DaemonConfig; import org.apache.storm.scheduler.Cluster; import org.apache.storm.scheduler.SupervisorDetails; @@ -39,11 +32,9 @@ import org.slf4j.LoggerFactory; public class DefaultBlacklistStrategy implements IBlacklistStrategy { - private static Logger LOG = LoggerFactory.getLogger(DefaultBlacklistStrategy.class); - public static final int DEFAULT_BLACKLIST_SCHEDULER_RESUME_TIME = 1800; public static final int DEFAULT_BLACKLIST_SCHEDULER_TOLERANCE_COUNT = 3; - + private static Logger LOG = LoggerFactory.getLogger(DefaultBlacklistStrategy.class); private IReporter reporter; private int toleranceCount; @@ -55,11 +46,11 @@ public class DefaultBlacklistStrategy implements IBlacklistStrategy { @Override public void prepare(Map<String, Object> conf) { toleranceCount = ObjectReader.getInt(conf.get(DaemonConfig.BLACKLIST_SCHEDULER_TOLERANCE_COUNT), - DEFAULT_BLACKLIST_SCHEDULER_TOLERANCE_COUNT); + DEFAULT_BLACKLIST_SCHEDULER_TOLERANCE_COUNT); resumeTime = ObjectReader.getInt(conf.get(DaemonConfig.BLACKLIST_SCHEDULER_RESUME_TIME), DEFAULT_BLACKLIST_SCHEDULER_RESUME_TIME); String reporterClassName = ObjectReader.getString(conf.get(DaemonConfig.BLACKLIST_SCHEDULER_REPORTER), - LogReporter.class.getName()); + LogReporter.class.getName()); reporter = (IReporter) initializeInstance(reporterClassName, "blacklist reporter"); nimbusMonitorFreqSecs = ObjectReader.getInt(conf.get(DaemonConfig.NIMBUS_MONITOR_FREQ_SECS)); @@ -133,7 +124,7 @@ public class DefaultBlacklistStrategy implements IBlacklistStrategy { if (shortage > 0) { LOG.info("total needed num of workers :{}, available num of slots not in blacklist :{}, num blacklist :{}, " + - "will release some blacklist.", totalNeedNumWorkers, availableSlotsNotInBlacklistCount, blacklist.size()); + "will release some blacklist.", totalNeedNumWorkers, availableSlotsNotInBlacklistCount, blacklist.size()); //release earliest blacklist Set<String> readyToRemove = new HashSet<>(); http://git-wip-us.apache.org/repos/asf/storm/blob/f1c0bcbe/storm-server/src/main/java/org/apache/storm/scheduler/blacklist/strategies/IBlacklistStrategy.java ---------------------------------------------------------------------- diff --git a/storm-server/src/main/java/org/apache/storm/scheduler/blacklist/strategies/IBlacklistStrategy.java b/storm-server/src/main/java/org/apache/storm/scheduler/blacklist/strategies/IBlacklistStrategy.java index 8543659..295dd82 100644 --- a/storm-server/src/main/java/org/apache/storm/scheduler/blacklist/strategies/IBlacklistStrategy.java +++ b/storm-server/src/main/java/org/apache/storm/scheduler/blacklist/strategies/IBlacklistStrategy.java @@ -1,19 +1,13 @@ /** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. The ASF licenses this file to you under the Apache License, Version + * 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions + * and limitations under the License. */ package org.apache.storm.scheduler.blacklist.strategies; @@ -21,7 +15,6 @@ package org.apache.storm.scheduler.blacklist.strategies; import java.util.List; import java.util.Map; import java.util.Set; - import org.apache.storm.scheduler.Cluster; import org.apache.storm.scheduler.Topologies; http://git-wip-us.apache.org/repos/asf/storm/blob/f1c0bcbe/storm-server/src/main/java/org/apache/storm/scheduler/multitenant/DefaultPool.java ---------------------------------------------------------------------- diff --git a/storm-server/src/main/java/org/apache/storm/scheduler/multitenant/DefaultPool.java b/storm-server/src/main/java/org/apache/storm/scheduler/multitenant/DefaultPool.java index b690764..1b6ac91 100644 --- a/storm-server/src/main/java/org/apache/storm/scheduler/multitenant/DefaultPool.java +++ b/storm-server/src/main/java/org/apache/storm/scheduler/multitenant/DefaultPool.java @@ -1,19 +1,13 @@ /** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. The ASF licenses this file to you under the Apache License, Version + * 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions + * and limitations under the License. */ package org.apache.storm.scheduler.multitenant; @@ -24,196 +18,199 @@ import java.util.HashMap; import java.util.HashSet; import java.util.LinkedList; import java.util.Set; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - import org.apache.storm.scheduler.SchedulerAssignment; import org.apache.storm.scheduler.TopologyDetails; import org.apache.storm.scheduler.WorkerSlot; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; /** * A pool of machines that anyone can use, but topologies are not isolated */ public class DefaultPool extends NodePool { - private static final Logger LOG = LoggerFactory.getLogger(DefaultPool.class); - private Set<Node> _nodes = new HashSet<>(); - private HashMap<String, TopologyDetails> _tds = new HashMap<>(); - - @Override - public void addTopology(TopologyDetails td) { - String topId = td.getId(); - LOG.debug("Adding in Topology {}", topId); - _tds.put(topId, td); - SchedulerAssignment assignment = _cluster.getAssignmentById(topId); - if (assignment != null) { - for (WorkerSlot ws: assignment.getSlots()) { - Node n = _nodeIdToNode.get(ws.getNodeId()); - _nodes.add(n); - } - } - } - - @Override - public boolean canAdd(TopologyDetails td) { - return true; - } - - @Override - public Collection<Node> takeNodes(int nodesNeeded) { - HashSet<Node> ret = new HashSet<>(); - LinkedList<Node> sortedNodes = new LinkedList<>(_nodes); - Collections.sort(sortedNodes, Node.FREE_NODE_COMPARATOR_DEC); - for (Node n: sortedNodes) { - if (nodesNeeded <= ret.size()) { - break; - } - if (n.isAlive()) { - n.freeAllSlots(_cluster); - _nodes.remove(n); - ret.add(n); - } - } - return ret; - } - - @Override - public int nodesAvailable() { - int total = 0; - for (Node n: _nodes) { - if (n.isAlive()) total++; - } - return total; - } - - @Override - public int slotsAvailable() { - return Node.countTotalSlotsAlive(_nodes); - } - - @Override - public NodeAndSlotCounts getNodeAndSlotCountIfSlotsWereTaken(int slotsNeeded) { - int nodesFound = 0; - int slotsFound = 0; - LinkedList<Node> sortedNodes = new LinkedList<>(_nodes); - Collections.sort(sortedNodes, Node.FREE_NODE_COMPARATOR_DEC); - for (Node n: sortedNodes) { - if (slotsNeeded <= 0) { - break; - } - if (n.isAlive()) { - nodesFound++; - int totalSlotsFree = n.totalSlots(); - slotsFound += totalSlotsFree; - slotsNeeded -= totalSlotsFree; - } + private static final Logger LOG = LoggerFactory.getLogger(DefaultPool.class); + private Set<Node> _nodes = new HashSet<>(); + private HashMap<String, TopologyDetails> _tds = new HashMap<>(); + + @Override + public void addTopology(TopologyDetails td) { + String topId = td.getId(); + LOG.debug("Adding in Topology {}", topId); + _tds.put(topId, td); + SchedulerAssignment assignment = _cluster.getAssignmentById(topId); + if (assignment != null) { + for (WorkerSlot ws : assignment.getSlots()) { + Node n = _nodeIdToNode.get(ws.getNodeId()); + _nodes.add(n); + } + } } - return new NodeAndSlotCounts(nodesFound, slotsFound); - } - - @Override - public Collection<Node> takeNodesBySlots(int slotsNeeded) { - HashSet<Node> ret = new HashSet<>(); - LinkedList<Node> sortedNodes = new LinkedList<>(_nodes); - Collections.sort(sortedNodes, Node.FREE_NODE_COMPARATOR_DEC); - for (Node n: sortedNodes) { - if (slotsNeeded <= 0) { - break; - } - if (n.isAlive()) { - n.freeAllSlots(_cluster); - _nodes.remove(n); - ret.add(n); - slotsNeeded -= n.totalSlotsFree(); - } + + @Override + public boolean canAdd(TopologyDetails td) { + return true; } - return ret; - } - - @Override - public void scheduleAsNeeded(NodePool... lesserPools) { - for (TopologyDetails td : _tds.values()) { - String topId = td.getId(); - if (_cluster.needsScheduling(td)) { - LOG.debug("Scheduling topology {}",topId); - int totalTasks = td.getExecutors().size(); - int origRequest = td.getNumWorkers(); - int slotsRequested = Math.min(totalTasks, origRequest); - int slotsUsed = Node.countSlotsUsed(topId, _nodes); - int slotsFree = Node.countFreeSlotsAlive(_nodes); - //Check to see if we have enough slots before trying to get them - int slotsAvailable = 0; - if (slotsRequested > slotsFree) { - slotsAvailable = NodePool.slotsAvailable(lesserPools); - } - int slotsToUse = Math.min(slotsRequested - slotsUsed, slotsFree + slotsAvailable); - int executorsNotRunning = _cluster.getUnassignedExecutors(td).size(); - LOG.debug("Slots... requested {} used {} free {} available {} to be used {}, executors not running {}", - slotsRequested, slotsUsed, slotsFree, slotsAvailable, slotsToUse, executorsNotRunning); - if (slotsToUse <= 0) { - if (executorsNotRunning > 0) { - _cluster.setStatus(topId,"Not fully scheduled (No free slots in default pool) "+executorsNotRunning+" executors not scheduled"); - } else { - if (slotsUsed < slotsRequested) { - _cluster.setStatus(topId,"Running with fewer slots than requested ("+slotsUsed+"/"+origRequest+")"); - } else { //slotsUsed < origRequest - _cluster.setStatus(topId,"Fully Scheduled (requested "+origRequest+" slots, but could only use "+slotsUsed+")"); + + @Override + public Collection<Node> takeNodes(int nodesNeeded) { + HashSet<Node> ret = new HashSet<>(); + LinkedList<Node> sortedNodes = new LinkedList<>(_nodes); + Collections.sort(sortedNodes, Node.FREE_NODE_COMPARATOR_DEC); + for (Node n : sortedNodes) { + if (nodesNeeded <= ret.size()) { + break; + } + if (n.isAlive()) { + n.freeAllSlots(_cluster); + _nodes.remove(n); + ret.add(n); } - } - continue; } + return ret; + } - int slotsNeeded = slotsToUse - slotsFree; - if (slotsNeeded > 0) { - _nodes.addAll(NodePool.takeNodesBySlot(slotsNeeded, lesserPools)); + @Override + public int nodesAvailable() { + int total = 0; + for (Node n : _nodes) { + if (n.isAlive()) total++; } + return total; + } + + @Override + public int slotsAvailable() { + return Node.countTotalSlotsAlive(_nodes); + } - if (executorsNotRunning <= 0) { - //There are free slots that we can take advantage of now. - for (Node n: _nodes) { - n.freeTopology(topId, _cluster); - } - slotsFree = Node.countFreeSlotsAlive(_nodes); - slotsToUse = Math.min(slotsRequested, slotsFree); + @Override + public NodeAndSlotCounts getNodeAndSlotCountIfSlotsWereTaken(int slotsNeeded) { + int nodesFound = 0; + int slotsFound = 0; + LinkedList<Node> sortedNodes = new LinkedList<>(_nodes); + Collections.sort(sortedNodes, Node.FREE_NODE_COMPARATOR_DEC); + for (Node n : sortedNodes) { + if (slotsNeeded <= 0) { + break; + } + if (n.isAlive()) { + nodesFound++; + int totalSlotsFree = n.totalSlots(); + slotsFound += totalSlotsFree; + slotsNeeded -= totalSlotsFree; + } } - - RoundRobinSlotScheduler slotSched = - new RoundRobinSlotScheduler(td, slotsToUse, _cluster); - - LinkedList<Node> nodes = new LinkedList<>(_nodes); - while (true) { - Node n; - do { - if (nodes.isEmpty()) { - throw new IllegalStateException("This should not happen, we" + - " messed up and did not get enough slots"); + return new NodeAndSlotCounts(nodesFound, slotsFound); + } + + @Override + public Collection<Node> takeNodesBySlots(int slotsNeeded) { + HashSet<Node> ret = new HashSet<>(); + LinkedList<Node> sortedNodes = new LinkedList<>(_nodes); + Collections.sort(sortedNodes, Node.FREE_NODE_COMPARATOR_DEC); + for (Node n : sortedNodes) { + if (slotsNeeded <= 0) { + break; } - n = nodes.peekFirst(); - if (n.totalSlotsFree() == 0) { - nodes.remove(); - n = null; + if (n.isAlive()) { + n.freeAllSlots(_cluster); + _nodes.remove(n); + ret.add(n); + slotsNeeded -= n.totalSlotsFree(); } - } while (n == null); - if (!slotSched.assignSlotTo(n)) { - break; - } } - int afterSchedSlotsUsed = Node.countSlotsUsed(topId, _nodes); - if (afterSchedSlotsUsed < slotsRequested) { - _cluster.setStatus(topId,"Running with fewer slots than requested ("+afterSchedSlotsUsed+"/"+origRequest+")"); - } else if (afterSchedSlotsUsed < origRequest) { - _cluster.setStatus(topId,"Fully Scheduled (requested "+origRequest+" slots, but could only use "+afterSchedSlotsUsed+")"); - } else { - _cluster.setStatus(topId,"Fully Scheduled"); + return ret; + } + + @Override + public void scheduleAsNeeded(NodePool... lesserPools) { + for (TopologyDetails td : _tds.values()) { + String topId = td.getId(); + if (_cluster.needsScheduling(td)) { + LOG.debug("Scheduling topology {}", topId); + int totalTasks = td.getExecutors().size(); + int origRequest = td.getNumWorkers(); + int slotsRequested = Math.min(totalTasks, origRequest); + int slotsUsed = Node.countSlotsUsed(topId, _nodes); + int slotsFree = Node.countFreeSlotsAlive(_nodes); + //Check to see if we have enough slots before trying to get them + int slotsAvailable = 0; + if (slotsRequested > slotsFree) { + slotsAvailable = NodePool.slotsAvailable(lesserPools); + } + int slotsToUse = Math.min(slotsRequested - slotsUsed, slotsFree + slotsAvailable); + int executorsNotRunning = _cluster.getUnassignedExecutors(td).size(); + LOG.debug("Slots... requested {} used {} free {} available {} to be used {}, executors not running {}", + slotsRequested, slotsUsed, slotsFree, slotsAvailable, slotsToUse, executorsNotRunning); + if (slotsToUse <= 0) { + if (executorsNotRunning > 0) { + _cluster.setStatus(topId, "Not fully scheduled (No free slots in default pool) " + executorsNotRunning + + " executors not scheduled"); + } else { + if (slotsUsed < slotsRequested) { + _cluster.setStatus(topId, "Running with fewer slots than requested (" + slotsUsed + "/" + origRequest + ")"); + } else { //slotsUsed < origRequest + _cluster.setStatus(topId, + "Fully Scheduled (requested " + origRequest + " slots, but could only use " + slotsUsed + + ")"); + } + } + continue; + } + + int slotsNeeded = slotsToUse - slotsFree; + if (slotsNeeded > 0) { + _nodes.addAll(NodePool.takeNodesBySlot(slotsNeeded, lesserPools)); + } + + if (executorsNotRunning <= 0) { + //There are free slots that we can take advantage of now. + for (Node n : _nodes) { + n.freeTopology(topId, _cluster); + } + slotsFree = Node.countFreeSlotsAlive(_nodes); + slotsToUse = Math.min(slotsRequested, slotsFree); + } + + RoundRobinSlotScheduler slotSched = + new RoundRobinSlotScheduler(td, slotsToUse, _cluster); + + LinkedList<Node> nodes = new LinkedList<>(_nodes); + while (true) { + Node n; + do { + if (nodes.isEmpty()) { + throw new IllegalStateException("This should not happen, we" + + " messed up and did not get enough slots"); + } + n = nodes.peekFirst(); + if (n.totalSlotsFree() == 0) { + nodes.remove(); + n = null; + } + } while (n == null); + if (!slotSched.assignSlotTo(n)) { + break; + } + } + int afterSchedSlotsUsed = Node.countSlotsUsed(topId, _nodes); + if (afterSchedSlotsUsed < slotsRequested) { + _cluster.setStatus(topId, "Running with fewer slots than requested (" + afterSchedSlotsUsed + "/" + origRequest + ")"); + } else if (afterSchedSlotsUsed < origRequest) { + _cluster.setStatus(topId, + "Fully Scheduled (requested " + origRequest + " slots, but could only use " + afterSchedSlotsUsed + + ")"); + } else { + _cluster.setStatus(topId, "Fully Scheduled"); + } + } else { + _cluster.setStatus(topId, "Fully Scheduled"); + } } - } else { - _cluster.setStatus(topId,"Fully Scheduled"); - } } - } - - @Override - public String toString() { - return "DefaultPool " + _nodes.size() + " nodes " + _tds.size() + " topologies"; - } + + @Override + public String toString() { + return "DefaultPool " + _nodes.size() + " nodes " + _tds.size() + " topologies"; + } } http://git-wip-us.apache.org/repos/asf/storm/blob/f1c0bcbe/storm-server/src/main/java/org/apache/storm/scheduler/multitenant/FreePool.java ---------------------------------------------------------------------- diff --git a/storm-server/src/main/java/org/apache/storm/scheduler/multitenant/FreePool.java b/storm-server/src/main/java/org/apache/storm/scheduler/multitenant/FreePool.java index 88cec6b..e868223 100644 --- a/storm-server/src/main/java/org/apache/storm/scheduler/multitenant/FreePool.java +++ b/storm-server/src/main/java/org/apache/storm/scheduler/multitenant/FreePool.java @@ -1,19 +1,13 @@ /** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. The ASF licenses this file to you under the Apache License, Version + * 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions + * and limitations under the License. */ package org.apache.storm.scheduler.multitenant; @@ -23,103 +17,101 @@ import java.util.HashSet; import java.util.Iterator; import java.util.Map; import java.util.Set; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - import org.apache.storm.scheduler.Cluster; import org.apache.storm.scheduler.TopologyDetails; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; /** * All of the machines that currently have nothing assigned to them */ public class FreePool extends NodePool { - private static final Logger LOG = LoggerFactory.getLogger(FreePool.class); - private Set<Node> _nodes = new HashSet<>(); - private int _totalSlots = 0; + private static final Logger LOG = LoggerFactory.getLogger(FreePool.class); + private Set<Node> _nodes = new HashSet<>(); + private int _totalSlots = 0; - @Override - public void init(Cluster cluster, Map<String, Node> nodeIdToNode) { - super.init(cluster, nodeIdToNode); - for (Node n: nodeIdToNode.values()) { - if(n.isTotallyFree() && n.isAlive()) { - _nodes.add(n); - _totalSlots += n.totalSlotsFree(); - } + @Override + public void init(Cluster cluster, Map<String, Node> nodeIdToNode) { + super.init(cluster, nodeIdToNode); + for (Node n : nodeIdToNode.values()) { + if (n.isTotallyFree() && n.isAlive()) { + _nodes.add(n); + _totalSlots += n.totalSlotsFree(); + } + } + LOG.debug("Found {} nodes with {} slots", _nodes.size(), _totalSlots); } - LOG.debug("Found {} nodes with {} slots", _nodes.size(), _totalSlots); - } - - @Override - public void addTopology(TopologyDetails td) { - throw new IllegalArgumentException("The free pool cannot run any topologies"); - } - @Override - public boolean canAdd(TopologyDetails td) { - // The free pool never has anything running - return false; - } - - @Override - public Collection<Node> takeNodes(int nodesNeeded) { - HashSet<Node> ret = new HashSet<>(); - Iterator<Node> it = _nodes.iterator(); - while (it.hasNext() && nodesNeeded > ret.size()) { - Node n = it.next(); - ret.add(n); - _totalSlots -= n.totalSlotsFree(); - it.remove(); + @Override + public void addTopology(TopologyDetails td) { + throw new IllegalArgumentException("The free pool cannot run any topologies"); } - return ret; - } - - @Override - public int nodesAvailable() { - return _nodes.size(); - } - @Override - public int slotsAvailable() { - return _totalSlots; - } + @Override + public boolean canAdd(TopologyDetails td) { + // The free pool never has anything running + return false; + } - @Override - public Collection<Node> takeNodesBySlots(int slotsNeeded) { - HashSet<Node> ret = new HashSet<>(); - Iterator<Node> it = _nodes.iterator(); - while (it.hasNext() && slotsNeeded > 0) { - Node n = it.next(); - ret.add(n); - _totalSlots -= n.totalSlotsFree(); - slotsNeeded -= n.totalSlotsFree(); - it.remove(); + @Override + public Collection<Node> takeNodes(int nodesNeeded) { + HashSet<Node> ret = new HashSet<>(); + Iterator<Node> it = _nodes.iterator(); + while (it.hasNext() && nodesNeeded > ret.size()) { + Node n = it.next(); + ret.add(n); + _totalSlots -= n.totalSlotsFree(); + it.remove(); + } + return ret; } - return ret; - } - - @Override - public NodeAndSlotCounts getNodeAndSlotCountIfSlotsWereTaken(int slotsNeeded) { - int slotsFound = 0; - int nodesFound = 0; - Iterator<Node> it = _nodes.iterator(); - while (it.hasNext() && slotsNeeded > 0) { - Node n = it.next(); - nodesFound++; - int totalSlots = n.totalSlots(); - slotsFound += totalSlots; - slotsNeeded -= totalSlots; + + @Override + public int nodesAvailable() { + return _nodes.size(); } - return new NodeAndSlotCounts(nodesFound, slotsFound); - } - @Override - public void scheduleAsNeeded(NodePool... lesserPools) { - //No topologies running so NOOP - } - - @Override - public String toString() { - return "FreePool of "+_nodes.size()+" nodes with "+_totalSlots+" slots"; - } + @Override + public int slotsAvailable() { + return _totalSlots; + } + + @Override + public Collection<Node> takeNodesBySlots(int slotsNeeded) { + HashSet<Node> ret = new HashSet<>(); + Iterator<Node> it = _nodes.iterator(); + while (it.hasNext() && slotsNeeded > 0) { + Node n = it.next(); + ret.add(n); + _totalSlots -= n.totalSlotsFree(); + slotsNeeded -= n.totalSlotsFree(); + it.remove(); + } + return ret; + } + + @Override + public NodeAndSlotCounts getNodeAndSlotCountIfSlotsWereTaken(int slotsNeeded) { + int slotsFound = 0; + int nodesFound = 0; + Iterator<Node> it = _nodes.iterator(); + while (it.hasNext() && slotsNeeded > 0) { + Node n = it.next(); + nodesFound++; + int totalSlots = n.totalSlots(); + slotsFound += totalSlots; + slotsNeeded -= totalSlots; + } + return new NodeAndSlotCounts(nodesFound, slotsFound); + } + + @Override + public void scheduleAsNeeded(NodePool... lesserPools) { + //No topologies running so NOOP + } + + @Override + public String toString() { + return "FreePool of " + _nodes.size() + " nodes with " + _totalSlots + " slots"; + } } http://git-wip-us.apache.org/repos/asf/storm/blob/f1c0bcbe/storm-server/src/main/java/org/apache/storm/scheduler/multitenant/IsolatedPool.java ---------------------------------------------------------------------- diff --git a/storm-server/src/main/java/org/apache/storm/scheduler/multitenant/IsolatedPool.java b/storm-server/src/main/java/org/apache/storm/scheduler/multitenant/IsolatedPool.java index f16e4fa..19ce712 100644 --- a/storm-server/src/main/java/org/apache/storm/scheduler/multitenant/IsolatedPool.java +++ b/storm-server/src/main/java/org/apache/storm/scheduler/multitenant/IsolatedPool.java @@ -1,371 +1,362 @@ /** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. The ASF licenses this file to you under the Apache License, Version + * 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions + * and limitations under the License. */ package org.apache.storm.scheduler.multitenant; import java.util.Collection; -import java.util.Collections; import java.util.HashMap; import java.util.HashSet; import java.util.Iterator; -import java.util.LinkedList; import java.util.Map; import java.util.Map.Entry; import java.util.Set; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - import org.apache.storm.Config; import org.apache.storm.scheduler.SchedulerAssignment; import org.apache.storm.scheduler.TopologyDetails; import org.apache.storm.scheduler.WorkerSlot; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; /** * A pool of machines that can be used to run isolated topologies */ public class IsolatedPool extends NodePool { - private static final Logger LOG = LoggerFactory.getLogger(IsolatedPool.class); - private Map<String, Set<Node>> _topologyIdToNodes = new HashMap<>(); - private HashMap<String, TopologyDetails> _tds = new HashMap<>(); - private HashSet<String> _isolated = new HashSet<>(); - private int _maxNodes; - private int _usedNodes; + private static final Logger LOG = LoggerFactory.getLogger(IsolatedPool.class); + private Map<String, Set<Node>> _topologyIdToNodes = new HashMap<>(); + private HashMap<String, TopologyDetails> _tds = new HashMap<>(); + private HashSet<String> _isolated = new HashSet<>(); + private int _maxNodes; + private int _usedNodes; - public IsolatedPool(int maxNodes) { - _maxNodes = maxNodes; - _usedNodes = 0; - } - - @Override - public void addTopology(TopologyDetails td) { - String topId = td.getId(); - LOG.debug("Adding in Topology {}", topId); - SchedulerAssignment assignment = _cluster.getAssignmentById(topId); - Set<Node> assignedNodes = new HashSet<>(); - if (assignment != null) { - for (WorkerSlot ws: assignment.getSlots()) { - Node n = _nodeIdToNode.get(ws.getNodeId()); - assignedNodes.add(n); - } - } - _usedNodes += assignedNodes.size(); - _topologyIdToNodes.put(topId, assignedNodes); - _tds.put(topId, td); - if (td.getConf().get(Config.TOPOLOGY_ISOLATED_MACHINES) != null) { - _isolated.add(topId); + public IsolatedPool(int maxNodes) { + _maxNodes = maxNodes; + _usedNodes = 0; } - } - @Override - public boolean canAdd(TopologyDetails td) { - //Only add topologies that are not sharing nodes with other topologies - String topId = td.getId(); - SchedulerAssignment assignment = _cluster.getAssignmentById(topId); - if (assignment != null) { - for (WorkerSlot ws: assignment.getSlots()) { - Node n = _nodeIdToNode.get(ws.getNodeId()); - if (n.getRunningTopologies().size() > 1) { - return false; - } - } - } - return true; - } - - @Override - public void scheduleAsNeeded(NodePool ... lesserPools) { - for (String topId : _topologyIdToNodes.keySet()) { - TopologyDetails td = _tds.get(topId); - Set<Node> allNodes = _topologyIdToNodes.get(topId); - Number nodesRequested = (Number) td.getConf().get(Config.TOPOLOGY_ISOLATED_MACHINES); - Integer effectiveNodesRequested = null; - if (nodesRequested != null) { - effectiveNodesRequested = Math.min(td.getExecutors().size(), - +nodesRequested.intValue()); - } - if (_cluster.needsScheduling(td) || - (effectiveNodesRequested != null && - allNodes.size() != effectiveNodesRequested)) { - LOG.debug("Scheduling topology {}", topId); - int slotsToUse = 0; - if (effectiveNodesRequested == null) { - slotsToUse = getNodesForNotIsolatedTop(td, allNodes, lesserPools); - } else { - slotsToUse = getNodesForIsolatedTop(td, allNodes, lesserPools, - effectiveNodesRequested); + @Override + public void addTopology(TopologyDetails td) { + String topId = td.getId(); + LOG.debug("Adding in Topology {}", topId); + SchedulerAssignment assignment = _cluster.getAssignmentById(topId); + Set<Node> assignedNodes = new HashSet<>(); + if (assignment != null) { + for (WorkerSlot ws : assignment.getSlots()) { + Node n = _nodeIdToNode.get(ws.getNodeId()); + assignedNodes.add(n); + } } - //No slots to schedule for some reason, so skip it. - if (slotsToUse <= 0) { - continue; + _usedNodes += assignedNodes.size(); + _topologyIdToNodes.put(topId, assignedNodes); + _tds.put(topId, td); + if (td.getConf().get(Config.TOPOLOGY_ISOLATED_MACHINES) != null) { + _isolated.add(topId); } - - RoundRobinSlotScheduler slotSched = - new RoundRobinSlotScheduler(td, slotsToUse, _cluster); + } - LOG.debug("Nodes sorted by free space {}", allNodes); - while (true) { - Node n = findBestNode(allNodes); - if (n == null) { - LOG.error("No nodes to use to assign topology {}", td.getName()); - break; - } - if (!slotSched.assignSlotTo(n)) { - break; - } + @Override + public boolean canAdd(TopologyDetails td) { + //Only add topologies that are not sharing nodes with other topologies + String topId = td.getId(); + SchedulerAssignment assignment = _cluster.getAssignmentById(topId); + if (assignment != null) { + for (WorkerSlot ws : assignment.getSlots()) { + Node n = _nodeIdToNode.get(ws.getNodeId()); + if (n.getRunningTopologies().size() > 1) { + return false; + } + } } - } - Set<Node> found = _topologyIdToNodes.get(topId); - int nc = found == null ? 0 : found.size(); - _cluster.setStatus(topId,"Scheduled Isolated on "+nc+" Nodes"); + return true; } - } - private Node findBestNode(Collection<Node> nodes) { - Node ret = null; - for(Node node : nodes) { - if(ret == null ) { - if(node.totalSlotsFree() > 0) { - ret = node; - } - } else { - if (node.totalSlotsFree() > 0) { - if (node.totalSlotsUsed() < ret.totalSlotsUsed()) { - ret = node; - } else if (node.totalSlotsUsed() == ret.totalSlotsUsed()) { - if(node.totalSlotsFree() > ret.totalSlotsFree()) { - ret = node; + @Override + public void scheduleAsNeeded(NodePool... lesserPools) { + for (String topId : _topologyIdToNodes.keySet()) { + TopologyDetails td = _tds.get(topId); + Set<Node> allNodes = _topologyIdToNodes.get(topId); + Number nodesRequested = (Number) td.getConf().get(Config.TOPOLOGY_ISOLATED_MACHINES); + Integer effectiveNodesRequested = null; + if (nodesRequested != null) { + effectiveNodesRequested = Math.min(td.getExecutors().size(), + +nodesRequested.intValue()); + } + if (_cluster.needsScheduling(td) || + (effectiveNodesRequested != null && + allNodes.size() != effectiveNodesRequested)) { + LOG.debug("Scheduling topology {}", topId); + int slotsToUse = 0; + if (effectiveNodesRequested == null) { + slotsToUse = getNodesForNotIsolatedTop(td, allNodes, lesserPools); + } else { + slotsToUse = getNodesForIsolatedTop(td, allNodes, lesserPools, + effectiveNodesRequested); + } + //No slots to schedule for some reason, so skip it. + if (slotsToUse <= 0) { + continue; + } + + RoundRobinSlotScheduler slotSched = + new RoundRobinSlotScheduler(td, slotsToUse, _cluster); + + LOG.debug("Nodes sorted by free space {}", allNodes); + while (true) { + Node n = findBestNode(allNodes); + if (n == null) { + LOG.error("No nodes to use to assign topology {}", td.getName()); + break; + } + if (!slotSched.assignSlotTo(n)) { + break; + } + } } - } + Set<Node> found = _topologyIdToNodes.get(topId); + int nc = found == null ? 0 : found.size(); + _cluster.setStatus(topId, "Scheduled Isolated on " + nc + " Nodes"); } - } } - return ret; - } - - /** - * Get the nodes needed to schedule an isolated topology. - * @param td the topology to be scheduled - * @param allNodes the nodes already scheduled for this topology. - * This will be updated to include new nodes if needed. - * @param lesserPools node pools we can steal nodes from - * @return the number of additional slots that should be used for scheduling. - */ - private int getNodesForIsolatedTop(TopologyDetails td, Set<Node> allNodes, - NodePool[] lesserPools, int nodesRequested) { - String topId = td.getId(); - LOG.debug("Topology {} is isolated", topId); - int nodesFromUsAvailable = nodesAvailable(); - int nodesFromOthersAvailable = NodePool.nodesAvailable(lesserPools); - int nodesUsed = _topologyIdToNodes.get(topId).size(); - int nodesNeeded = nodesRequested - nodesUsed; - LOG.debug("Nodes... requested {} used {} available from us {} " + - "avail from other {} needed {}", nodesRequested, - nodesUsed, nodesFromUsAvailable, nodesFromOthersAvailable, - nodesNeeded); - if ((nodesNeeded - nodesFromUsAvailable) > (_maxNodes - _usedNodes)) { - _cluster.setStatus(topId,"Max Nodes("+_maxNodes+") for this user would be exceeded. " - + ((nodesNeeded - nodesFromUsAvailable) - (_maxNodes - _usedNodes)) - + " more nodes needed to run topology."); - return 0; + private Node findBestNode(Collection<Node> nodes) { + Node ret = null; + for (Node node : nodes) { + if (ret == null) { + if (node.totalSlotsFree() > 0) { + ret = node; + } + } else { + if (node.totalSlotsFree() > 0) { + if (node.totalSlotsUsed() < ret.totalSlotsUsed()) { + ret = node; + } else if (node.totalSlotsUsed() == ret.totalSlotsUsed()) { + if (node.totalSlotsFree() > ret.totalSlotsFree()) { + ret = node; + } + } + } + } + } + return ret; } - //In order to avoid going over _maxNodes I may need to steal from - // myself even though other pools have free nodes. so figure out how - // much each group should provide - int nodesNeededFromOthers = Math.min(Math.min(_maxNodes - _usedNodes, - nodesFromOthersAvailable), nodesNeeded); - int nodesNeededFromUs = nodesNeeded - nodesNeededFromOthers; - LOG.debug("Nodes... needed from us {} needed from others {}", - nodesNeededFromUs, nodesNeededFromOthers); + /** + * Get the nodes needed to schedule an isolated topology. + * @param td the topology to be scheduled + * @param allNodes the nodes already scheduled for this topology. + * This will be updated to include new nodes if needed. + * @param lesserPools node pools we can steal nodes from + * @return the number of additional slots that should be used for scheduling. + */ + private int getNodesForIsolatedTop(TopologyDetails td, Set<Node> allNodes, + NodePool[] lesserPools, int nodesRequested) { + String topId = td.getId(); + LOG.debug("Topology {} is isolated", topId); + int nodesFromUsAvailable = nodesAvailable(); + int nodesFromOthersAvailable = NodePool.nodesAvailable(lesserPools); - if (nodesNeededFromUs > nodesFromUsAvailable) { - _cluster.setStatus(topId, "Not Enough Nodes Available to Schedule Topology"); - return 0; - } + int nodesUsed = _topologyIdToNodes.get(topId).size(); + int nodesNeeded = nodesRequested - nodesUsed; + LOG.debug("Nodes... requested {} used {} available from us {} " + + "avail from other {} needed {}", nodesRequested, + nodesUsed, nodesFromUsAvailable, nodesFromOthersAvailable, + nodesNeeded); + if ((nodesNeeded - nodesFromUsAvailable) > (_maxNodes - _usedNodes)) { + _cluster.setStatus(topId, "Max Nodes(" + _maxNodes + ") for this user would be exceeded. " + + ((nodesNeeded - nodesFromUsAvailable) - (_maxNodes - _usedNodes)) + + " more nodes needed to run topology."); + return 0; + } - //Get the nodes - Collection<Node> found = NodePool.takeNodes(nodesNeededFromOthers, lesserPools); - _usedNodes += found.size(); - allNodes.addAll(found); - Collection<Node> foundMore = takeNodes(nodesNeededFromUs); - _usedNodes += foundMore.size(); - allNodes.addAll(foundMore); + //In order to avoid going over _maxNodes I may need to steal from + // myself even though other pools have free nodes. so figure out how + // much each group should provide + int nodesNeededFromOthers = Math.min(Math.min(_maxNodes - _usedNodes, + nodesFromOthersAvailable), nodesNeeded); + int nodesNeededFromUs = nodesNeeded - nodesNeededFromOthers; + LOG.debug("Nodes... needed from us {} needed from others {}", + nodesNeededFromUs, nodesNeededFromOthers); - int totalTasks = td.getExecutors().size(); - int origRequest = td.getNumWorkers(); - int slotsRequested = Math.min(totalTasks, origRequest); - int slotsUsed = Node.countSlotsUsed(allNodes); - int slotsFree = Node.countFreeSlotsAlive(allNodes); - int slotsToUse = Math.min(slotsRequested - slotsUsed, slotsFree); - if (slotsToUse <= 0) { - // if # of workers requested is more than we currently have - if (origRequest > slotsUsed) { - _cluster.setStatus(topId, "Running with fewer slots than requested " + slotsUsed + "/" + - origRequest + " on " + allNodes.size() + " node(s) with " + (slotsUsed + slotsFree) + " total slots"); - } else { - // if # of workers requested is less than we took - // then we know some workers we track died, since we have more workers than we are supposed to have - _cluster.setStatus(topId, "Node has partially crashed, if this situation persists rebalance the topology."); + if (nodesNeededFromUs > nodesFromUsAvailable) { + _cluster.setStatus(topId, "Not Enough Nodes Available to Schedule Topology"); + return 0; } + + //Get the nodes + Collection<Node> found = NodePool.takeNodes(nodesNeededFromOthers, lesserPools); + _usedNodes += found.size(); + allNodes.addAll(found); + Collection<Node> foundMore = takeNodes(nodesNeededFromUs); + _usedNodes += foundMore.size(); + allNodes.addAll(foundMore); + + int totalTasks = td.getExecutors().size(); + int origRequest = td.getNumWorkers(); + int slotsRequested = Math.min(totalTasks, origRequest); + int slotsUsed = Node.countSlotsUsed(allNodes); + int slotsFree = Node.countFreeSlotsAlive(allNodes); + int slotsToUse = Math.min(slotsRequested - slotsUsed, slotsFree); + if (slotsToUse <= 0) { + // if # of workers requested is more than we currently have + if (origRequest > slotsUsed) { + _cluster.setStatus(topId, "Running with fewer slots than requested " + slotsUsed + "/" + + origRequest + " on " + allNodes.size() + " node(s) with " + (slotsUsed + slotsFree) + + " total slots"); + } else { + // if # of workers requested is less than we took + // then we know some workers we track died, since we have more workers than we are supposed to have + _cluster.setStatus(topId, "Node has partially crashed, if this situation persists rebalance the topology."); + } + } + return slotsToUse; } - return slotsToUse; - } - - /** - * Get the nodes needed to schedule a non-isolated topology. - * @param td the topology to be scheduled - * @param allNodes the nodes already scheduled for this topology. - * This will be updated to include new nodes if needed. - * @param lesserPools node pools we can steal nodes from - * @return the number of additional slots that should be used for scheduling. - */ - private int getNodesForNotIsolatedTop(TopologyDetails td, Set<Node> allNodes, - NodePool[] lesserPools) { - String topId = td.getId(); - LOG.debug("Topology {} is not isolated",topId); - int totalTasks = td.getExecutors().size(); - int origRequest = td.getNumWorkers(); - int slotsRequested = Math.min(totalTasks, origRequest); - int slotsUsed = Node.countSlotsUsed(topId, allNodes); - int slotsFree = Node.countFreeSlotsAlive(allNodes); - //Check to see if we have enough slots before trying to get them - int slotsAvailable = 0; - if (slotsRequested > slotsFree) { - slotsAvailable = NodePool.slotsAvailable(lesserPools); - } - int slotsToUse = Math.min(slotsRequested - slotsUsed, slotsFree + slotsAvailable); - LOG.debug("Slots... requested {} used {} free {} available {} to be used {}", - slotsRequested, slotsUsed, slotsFree, slotsAvailable, slotsToUse); - if (slotsToUse <= 0) { - _cluster.setStatus(topId, "Not Enough Slots Available to Schedule Topology"); - return 0; - } - int slotsNeeded = slotsToUse - slotsFree; - int numNewNodes = NodePool.getNodeCountIfSlotsWereTaken(slotsNeeded, lesserPools); - LOG.debug("Nodes... new {} used {} max {}", - numNewNodes, _usedNodes, _maxNodes); - if ((numNewNodes + _usedNodes) > _maxNodes) { - _cluster.setStatus(topId,"Max Nodes("+_maxNodes+") for this user would be exceeded. " + - (numNewNodes - (_maxNodes - _usedNodes)) + " more nodes needed to run topology."); - return 0; + + /** + * Get the nodes needed to schedule a non-isolated topology. + * @param td the topology to be scheduled + * @param allNodes the nodes already scheduled for this topology. + * This will be updated to include new nodes if needed. + * @param lesserPools node pools we can steal nodes from + * @return the number of additional slots that should be used for scheduling. + */ + private int getNodesForNotIsolatedTop(TopologyDetails td, Set<Node> allNodes, + NodePool[] lesserPools) { + String topId = td.getId(); + LOG.debug("Topology {} is not isolated", topId); + int totalTasks = td.getExecutors().size(); + int origRequest = td.getNumWorkers(); + int slotsRequested = Math.min(totalTasks, origRequest); + int slotsUsed = Node.countSlotsUsed(topId, allNodes); + int slotsFree = Node.countFreeSlotsAlive(allNodes); + //Check to see if we have enough slots before trying to get them + int slotsAvailable = 0; + if (slotsRequested > slotsFree) { + slotsAvailable = NodePool.slotsAvailable(lesserPools); + } + int slotsToUse = Math.min(slotsRequested - slotsUsed, slotsFree + slotsAvailable); + LOG.debug("Slots... requested {} used {} free {} available {} to be used {}", + slotsRequested, slotsUsed, slotsFree, slotsAvailable, slotsToUse); + if (slotsToUse <= 0) { + _cluster.setStatus(topId, "Not Enough Slots Available to Schedule Topology"); + return 0; + } + int slotsNeeded = slotsToUse - slotsFree; + int numNewNodes = NodePool.getNodeCountIfSlotsWereTaken(slotsNeeded, lesserPools); + LOG.debug("Nodes... new {} used {} max {}", + numNewNodes, _usedNodes, _maxNodes); + if ((numNewNodes + _usedNodes) > _maxNodes) { + _cluster.setStatus(topId, "Max Nodes(" + _maxNodes + ") for this user would be exceeded. " + + (numNewNodes - (_maxNodes - _usedNodes)) + " more nodes needed to run topology."); + return 0; + } + + Collection<Node> found = NodePool.takeNodesBySlot(slotsNeeded, lesserPools); + _usedNodes += found.size(); + allNodes.addAll(found); + return slotsToUse; } - - Collection<Node> found = NodePool.takeNodesBySlot(slotsNeeded, lesserPools); - _usedNodes += found.size(); - allNodes.addAll(found); - return slotsToUse; - } - @Override - public Collection<Node> takeNodes(int nodesNeeded) { - LOG.debug("Taking {} from {}", nodesNeeded, this); - HashSet<Node> ret = new HashSet<>(); - for (Entry<String, Set<Node>> entry: _topologyIdToNodes.entrySet()) { - if (!_isolated.contains(entry.getKey())) { - Iterator<Node> it = entry.getValue().iterator(); - while (it.hasNext()) { - if (nodesNeeded <= 0) { - return ret; - } - Node n = it.next(); - it.remove(); - n.freeAllSlots(_cluster); - ret.add(n); - nodesNeeded--; - _usedNodes--; + @Override + public Collection<Node> takeNodes(int nodesNeeded) { + LOG.debug("Taking {} from {}", nodesNeeded, this); + HashSet<Node> ret = new HashSet<>(); + for (Entry<String, Set<Node>> entry : _topologyIdToNodes.entrySet()) { + if (!_isolated.contains(entry.getKey())) { + Iterator<Node> it = entry.getValue().iterator(); + while (it.hasNext()) { + if (nodesNeeded <= 0) { + return ret; + } + Node n = it.next(); + it.remove(); + n.freeAllSlots(_cluster); + ret.add(n); + nodesNeeded--; + _usedNodes--; + } + } } - } + return ret; } - return ret; - } - - @Override - public int nodesAvailable() { - int total = 0; - for (Entry<String, Set<Node>> entry: _topologyIdToNodes.entrySet()) { - if (!_isolated.contains(entry.getKey())) { - total += entry.getValue().size(); - } + + @Override + public int nodesAvailable() { + int total = 0; + for (Entry<String, Set<Node>> entry : _topologyIdToNodes.entrySet()) { + if (!_isolated.contains(entry.getKey())) { + total += entry.getValue().size(); + } + } + return total; } - return total; - } - - @Override - public int slotsAvailable() { - int total = 0; - for (Entry<String, Set<Node>> entry: _topologyIdToNodes.entrySet()) { - if (!_isolated.contains(entry.getKey())) { - total += Node.countTotalSlotsAlive(entry.getValue()); - } + + @Override + public int slotsAvailable() { + int total = 0; + for (Entry<String, Set<Node>> entry : _topologyIdToNodes.entrySet()) { + if (!_isolated.contains(entry.getKey())) { + total += Node.countTotalSlotsAlive(entry.getValue()); + } + } + return total; } - return total; - } - @Override - public Collection<Node> takeNodesBySlots(int slotsNeeded) { - HashSet<Node> ret = new HashSet<>(); - for (Entry<String, Set<Node>> entry: _topologyIdToNodes.entrySet()) { - if (!_isolated.contains(entry.getKey())) { - Iterator<Node> it = entry.getValue().iterator(); - while (it.hasNext()) { - Node n = it.next(); - if (n.isAlive()) { - it.remove(); - _usedNodes--; - n.freeAllSlots(_cluster); - ret.add(n); - slotsNeeded -= n.totalSlots(); - if (slotsNeeded <= 0) { - return ret; + @Override + public Collection<Node> takeNodesBySlots(int slotsNeeded) { + HashSet<Node> ret = new HashSet<>(); + for (Entry<String, Set<Node>> entry : _topologyIdToNodes.entrySet()) { + if (!_isolated.contains(entry.getKey())) { + Iterator<Node> it = entry.getValue().iterator(); + while (it.hasNext()) { + Node n = it.next(); + if (n.isAlive()) { + it.remove(); + _usedNodes--; + n.freeAllSlots(_cluster); + ret.add(n); + slotsNeeded -= n.totalSlots(); + if (slotsNeeded <= 0) { + return ret; + } + } + } } - } } - } + return ret; } - return ret; - } - - @Override - public NodeAndSlotCounts getNodeAndSlotCountIfSlotsWereTaken(int slotsNeeded) { - int nodesFound = 0; - int slotsFound = 0; - for (Entry<String, Set<Node>> entry: _topologyIdToNodes.entrySet()) { - if (!_isolated.contains(entry.getKey())) { - for (Node n : entry.getValue()) { - if (n.isAlive()) { - nodesFound++; - int totalSlotsFree = n.totalSlots(); - slotsFound += totalSlotsFree; - slotsNeeded -= totalSlotsFree; - if (slotsNeeded <= 0) { - return new NodeAndSlotCounts(nodesFound, slotsFound); + + @Override + public NodeAndSlotCounts getNodeAndSlotCountIfSlotsWereTaken(int slotsNeeded) { + int nodesFound = 0; + int slotsFound = 0; + for (Entry<String, Set<Node>> entry : _topologyIdToNodes.entrySet()) { + if (!_isolated.contains(entry.getKey())) { + for (Node n : entry.getValue()) { + if (n.isAlive()) { + nodesFound++; + int totalSlotsFree = n.totalSlots(); + slotsFound += totalSlotsFree; + slotsNeeded -= totalSlotsFree; + if (slotsNeeded <= 0) { + return new NodeAndSlotCounts(nodesFound, slotsFound); + } + } + } } - } } - } + return new NodeAndSlotCounts(nodesFound, slotsFound); + } + + @Override + public String toString() { + return "IsolatedPool... "; } - return new NodeAndSlotCounts(nodesFound, slotsFound); - } - - @Override - public String toString() { - return "IsolatedPool... "; - } } http://git-wip-us.apache.org/repos/asf/storm/blob/f1c0bcbe/storm-server/src/main/java/org/apache/storm/scheduler/multitenant/MultitenantScheduler.java ---------------------------------------------------------------------- diff --git a/storm-server/src/main/java/org/apache/storm/scheduler/multitenant/MultitenantScheduler.java b/storm-server/src/main/java/org/apache/storm/scheduler/multitenant/MultitenantScheduler.java index 65c657e..24c008e 100644 --- a/storm-server/src/main/java/org/apache/storm/scheduler/multitenant/MultitenantScheduler.java +++ b/storm-server/src/main/java/org/apache/storm/scheduler/multitenant/MultitenantScheduler.java @@ -1,19 +1,13 @@ /** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. The ASF licenses this file to you under the Apache License, Version + * 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions + * and limitations under the License. */ package org.apache.storm.scheduler.multitenant; @@ -32,94 +26,94 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; public class MultitenantScheduler implements IScheduler { - private static final Logger LOG = LoggerFactory.getLogger(MultitenantScheduler.class); - private Map<String, Object> conf; - protected IConfigLoader configLoader; - - @Override - public void prepare(Map<String, Object> conf) { - this.conf = conf; - configLoader = ConfigLoaderFactoryService.createConfigLoader(conf); - - } - - /** - * Load from configLoaders first; if no config available, read from multitenant-scheduler.yaml; - * if no config available from multitenant-scheduler.yaml, get configs from conf. Only one will be used. - * @return User pool configs. - */ - private Map<String, Number> getUserConf() { - Map<String, Number> ret; - - // Try the loader plugin, if configured - if (configLoader != null) { - ret = (Map<String, Number>) configLoader.load(DaemonConfig.MULTITENANT_SCHEDULER_USER_POOLS); - if (ret != null) { - return ret; - } else { - LOG.warn("Config loader returned null. Will try to read from multitenant-scheduler.yaml"); - } - } + private static final Logger LOG = LoggerFactory.getLogger(MultitenantScheduler.class); + protected IConfigLoader configLoader; + private Map<String, Object> conf; - // If that fails, fall back on the multitenant-scheduler.yaml file - Map fromFile = Utils.findAndReadConfigFile("multitenant-scheduler.yaml", false); - ret = (Map<String, Number>)fromFile.get(DaemonConfig.MULTITENANT_SCHEDULER_USER_POOLS); - if (ret != null) { - return ret; - } else { - LOG.warn("Reading from multitenant-scheduler.yaml returned null. This could because the file is not available. " - + "Will load configs from storm configuration"); - } + @Override + public void prepare(Map<String, Object> conf) { + this.conf = conf; + configLoader = ConfigLoaderFactoryService.createConfigLoader(conf); - // If that fails, use config - ret = (Map<String, Number>) conf.get(DaemonConfig.MULTITENANT_SCHEDULER_USER_POOLS); - if (ret == null) { - return new HashMap<>(); - } else { - return ret; } - } - - @Override - public Map config() { - return getUserConf(); - } - - @Override - public void schedule(Topologies topologies, Cluster cluster) { - LOG.debug("Rerunning scheduling..."); - Map<String, Node> nodeIdToNode = Node.getAllNodesFrom(cluster); - - Map<String, Number> userConf = getUserConf(); - - Map<String, IsolatedPool> userPools = new HashMap<>(); - for (Map.Entry<String, Number> entry : userConf.entrySet()) { - userPools.put(entry.getKey(), new IsolatedPool(entry.getValue().intValue())); - } - DefaultPool defaultPool = new DefaultPool(); - FreePool freePool = new FreePool(); - - freePool.init(cluster, nodeIdToNode); - for (IsolatedPool pool : userPools.values()) { - pool.init(cluster, nodeIdToNode); + + /** + * Load from configLoaders first; if no config available, read from multitenant-scheduler.yaml; + * if no config available from multitenant-scheduler.yaml, get configs from conf. Only one will be used. + * @return User pool configs. + */ + private Map<String, Number> getUserConf() { + Map<String, Number> ret; + + // Try the loader plugin, if configured + if (configLoader != null) { + ret = (Map<String, Number>) configLoader.load(DaemonConfig.MULTITENANT_SCHEDULER_USER_POOLS); + if (ret != null) { + return ret; + } else { + LOG.warn("Config loader returned null. Will try to read from multitenant-scheduler.yaml"); + } + } + + // If that fails, fall back on the multitenant-scheduler.yaml file + Map fromFile = Utils.findAndReadConfigFile("multitenant-scheduler.yaml", false); + ret = (Map<String, Number>) fromFile.get(DaemonConfig.MULTITENANT_SCHEDULER_USER_POOLS); + if (ret != null) { + return ret; + } else { + LOG.warn("Reading from multitenant-scheduler.yaml returned null. This could because the file is not available. " + + "Will load configs from storm configuration"); + } + + // If that fails, use config + ret = (Map<String, Number>) conf.get(DaemonConfig.MULTITENANT_SCHEDULER_USER_POOLS); + if (ret == null) { + return new HashMap<>(); + } else { + return ret; + } } - defaultPool.init(cluster, nodeIdToNode); - - for (TopologyDetails td: topologies.getTopologies()) { - String user = td.getTopologySubmitter(); - LOG.debug("Found top {} run by user {}",td.getId(), user); - NodePool pool = userPools.get(user); - if (pool == null || !pool.canAdd(td)) { - pool = defaultPool; - } - pool.addTopology(td); + + @Override + public Map config() { + return getUserConf(); } - - //Now schedule all of the topologies that need to be scheduled - for (IsolatedPool pool : userPools.values()) { - pool.scheduleAsNeeded(freePool, defaultPool); + + @Override + public void schedule(Topologies topologies, Cluster cluster) { + LOG.debug("Rerunning scheduling..."); + Map<String, Node> nodeIdToNode = Node.getAllNodesFrom(cluster); + + Map<String, Number> userConf = getUserConf(); + + Map<String, IsolatedPool> userPools = new HashMap<>(); + for (Map.Entry<String, Number> entry : userConf.entrySet()) { + userPools.put(entry.getKey(), new IsolatedPool(entry.getValue().intValue())); + } + DefaultPool defaultPool = new DefaultPool(); + FreePool freePool = new FreePool(); + + freePool.init(cluster, nodeIdToNode); + for (IsolatedPool pool : userPools.values()) { + pool.init(cluster, nodeIdToNode); + } + defaultPool.init(cluster, nodeIdToNode); + + for (TopologyDetails td : topologies.getTopologies()) { + String user = td.getTopologySubmitter(); + LOG.debug("Found top {} run by user {}", td.getId(), user); + NodePool pool = userPools.get(user); + if (pool == null || !pool.canAdd(td)) { + pool = defaultPool; + } + pool.addTopology(td); + } + + //Now schedule all of the topologies that need to be scheduled + for (IsolatedPool pool : userPools.values()) { + pool.scheduleAsNeeded(freePool, defaultPool); + } + defaultPool.scheduleAsNeeded(freePool); + LOG.debug("Scheduling done..."); } - defaultPool.scheduleAsNeeded(freePool); - LOG.debug("Scheduling done..."); - } }
