[ https://issues.apache.org/jira/browse/STORM-898?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15064308#comment-15064308 ]
ASF GitHub Bot commented on STORM-898:
--------------------------------------
Github user d2r commented on a diff in the pull request:
https://github.com/apache/storm/pull/921#discussion_r48050274
--- Diff: storm-core/src/jvm/backtype/storm/scheduler/resource/strategies/eviction/DefaultEvictionStrategy.java ---
@@ -0,0 +1,109 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package backtype.storm.scheduler.resource.strategies.eviction;
+
+import backtype.storm.scheduler.Cluster;
+import backtype.storm.scheduler.Topologies;
+import backtype.storm.scheduler.TopologyDetails;
+import backtype.storm.scheduler.WorkerSlot;
+import backtype.storm.scheduler.resource.RAS_Nodes;
+import backtype.storm.scheduler.resource.User;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Collection;
+import java.util.Map;
+
+public class DefaultEvictionStrategy implements IEvictionStrategy {
+    private static final Logger LOG = LoggerFactory
+            .getLogger(DefaultEvictionStrategy.class);
+
+    private Topologies topologies;
+    private Cluster cluster;
+    private Map<String, User> userMap;
+    private RAS_Nodes nodes;
+
+    @Override
+    public void prepare(Topologies topologies, Cluster cluster, Map<String, User> userMap, RAS_Nodes nodes) {
+        this.topologies = topologies;
+        this.cluster = cluster;
+        this.userMap = userMap;
+        this.nodes = nodes;
+    }
+
+    @Override
+    public boolean makeSpaceForTopo(TopologyDetails td) {
+        LOG.debug("attempting to make space for topo {} from user {}", td.getName(), td.getTopologySubmitter());
+        User submitter = this.userMap.get(td.getTopologySubmitter());
+        if (submitter.getCPUResourceGuaranteed() == null || submitter.getMemoryResourceGuaranteed() == null) {
+            return false;
+        }
+        double cpuNeeded = td.getTotalRequestedCpu() / submitter.getCPUResourceGuaranteed();
+        double memoryNeeded = (td.getTotalRequestedMemOffHeap() + td.getTotalRequestedMemOnHeap()) / submitter.getMemoryResourceGuaranteed();
+
+        User evictUser = this.findUserWithHighestAverageResourceUtilAboveGuarantee();
+        //user has enough resource under his or her resource guarantee to schedule topology
+        if ((1.0 - submitter.getCPUResourcePoolUtilization()) >= cpuNeeded && (1.0 - submitter.getMemoryResourcePoolUtilization()) >= memoryNeeded) {
+            if (evictUser != null) {
+                TopologyDetails topologyEvict = evictUser.getRunningTopologyWithLowestPriority();
+                evictTopology(topologyEvict);
+                return true;
+            }
+        } else {
+            if (evictUser != null) {
+                if ((evictUser.getResourcePoolAverageUtilization() - 1.0) > (cpuNeeded + (submitter.getResourcePoolAverageUtilization() - 1.0))) {
+                    TopologyDetails topologyEvict = evictUser.getRunningTopologyWithLowestPriority();
+                    evictTopology(topologyEvict);
+                    return true;
+                }
+            }
+        }
+        //See if there is a lower priority topology that can be evicted from the current user
+        for (TopologyDetails topo : submitter.getTopologiesRunning()) {
+            //check to see if there is a topology with a lower priority we can evict
+            if (topo.getTopologyPriority() > td.getTopologyPriority()) {
+                evictTopology(topo);
+                return true;
+            }
+        }
+        return false;
+    }
+
+    private void evictTopology(TopologyDetails topologyEvict) {
+        Collection<WorkerSlot> workersToEvict = this.cluster.getUsedSlotsByTopologyId(topologyEvict.getId());
+        User submitter = this.userMap.get(topologyEvict.getTopologySubmitter());
+
+        LOG.info("Evicting Topology {} with workers: {} from user {}", topologyEvict.getName(), workersToEvict, topologyEvict.getTopologySubmitter());
+        this.nodes.freeSlots(workersToEvict);
+        submitter.moveTopoFromRunningToPending(topologyEvict, this.cluster);
+    }
+
+    private User findUserWithHighestAverageResourceUtilAboveGuarantee() {
+        double most = 0.0;
+        User mostOverUser = null;
+        for (User user : this.userMap.values()) {
+            double over = user.getResourcePoolAverageUtilization() - 1.0;
+            if ((over > most) && (!user.getTopologiesRunning().isEmpty())) {
--- End diff --
So could we directly check the value returned from
getResourcePoolAverageUtilization for sanity?
We could still do this in addition to checking whether the user has running
topologies if that would be safer; or, if it would not really matter, we could
remove the user topo check.
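For illustration, here is a minimal sketch of the kind of direct sanity check
being suggested. This is hypothetical code, not from the PR; it assumes
getResourcePoolAverageUtilization could return NaN or a negative value for a
user with nothing running, and it fills in the loop body that the diff cuts
off:

    private User findUserWithHighestAverageResourceUtilAboveGuarantee() {
        double most = 0.0;
        User mostOverUser = null;
        for (User user : this.userMap.values()) {
            double util = user.getResourcePoolAverageUtilization();
            // Sanity-check the returned value directly instead of (or in
            // addition to) requiring the user to have running topologies.
            if (Double.isNaN(util) || util < 0.0) {
                continue;
            }
            double over = util - 1.0;
            if (over > most) {
                most = over;
                mostOverUser = user;
            }
        }
        return mostOverUser;
    }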
> Add priorities and per user resource guarantees to Resource Aware Scheduler
> ---------------------------------------------------------------------------
>
> Key: STORM-898
> URL: https://issues.apache.org/jira/browse/STORM-898
> Project: Apache Storm
> Issue Type: New Feature
> Components: storm-core
> Reporter: Robert Joseph Evans
> Assignee: Boyang Jerry Peng
> Attachments: Resource Aware Scheduler for Storm.pdf
>
>
> In a multi-tenant environment we would like to be able to give individual
> users a guarantee of how much CPU/Memory/Network they will be able to use in
> a cluster. We would also like to know which topologies a user feels are the
> most important to keep running if there are not enough resources to run all
> of their topologies.
> Each user should be able to specify whether their topology is production,
> staging, or development. Within each of those categories a user should be
> able to give a topology a priority, 0 to 10 with 10 being the highest
> priority (or something like this).
> If there are not enough resources on a cluster to run a topology, assume this
> topology is running and using resources, and find the user that is most over
> their guaranteed resources. Shoot the lowest priority topology for that user,
> and repeat until this topology is able to run or this topology would be the
> one shot. Ideally we don't actually shoot anything until we know that we
> would have made enough room.
> If the cluster is over-subscribed, everyone is under their guarantee, and
> this topology would not put the user over their guarantee, shoot the lowest
> priority topology in this user's resource pool until there is enough room to
> run the topology or this topology is the one that would be shot. We might
> also want to think about what to do if we are going to shoot a production
> topology in an oversubscribed case; perhaps we can shoot a non-production
> topology instead, even if the other user is not over their guarantee.
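For illustration, a hedged sketch (in the style of the diff above) of the
"don't shoot anything until we know we would have made enough room" idea from
the description. All names except the TopologyDetails getters are invented for
this sketch; it is not code from the PR, and it assumes it lives in an eviction
strategy with the evictTopology helper shown in the diff and java.util imports
available:

    private boolean makeRoomByDryRun(TopologyDetails incoming,
                                     List<TopologyDetails> victimsLowestPriorityFirst,
                                     double freeCpu, double freeMem) {
        double needCpu = incoming.getTotalRequestedCpu();
        double needMem = incoming.getTotalRequestedMemOnHeap()
                + incoming.getTotalRequestedMemOffHeap();
        List<TopologyDetails> toShoot = new ArrayList<>();
        for (TopologyDetails victim : victimsLowestPriorityFirst) {
            if (victim.getId().equals(incoming.getId())) {
                return false; // the incoming topology would be the one shot
            }
            // Simulate the eviction first; nothing is actually shot yet.
            toShoot.add(victim);
            freeCpu += victim.getTotalRequestedCpu();
            freeMem += victim.getTotalRequestedMemOnHeap()
                    + victim.getTotalRequestedMemOffHeap();
            if (freeCpu >= needCpu && freeMem >= needMem) {
                // Commit the evictions only once we know there is enough room.
                for (TopologyDetails t : toShoot) {
                    evictTopology(t);
                }
                return true;
            }
        }
        return false;
    }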