[ 
https://issues.apache.org/jira/browse/STORM-898?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15042104#comment-15042104
 ] 

ASF GitHub Bot commented on STORM-898:
--------------------------------------

Github user jerrypeng commented on a diff in the pull request:

    https://github.com/apache/storm/pull/921#discussion_r46724385
  
    --- Diff: storm-core/src/jvm/backtype/storm/scheduler/resource/User.java ---
    @@ -0,0 +1,344 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + * <p>
    + * http://www.apache.org/licenses/LICENSE-2.0
    + * <p>
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package backtype.storm.scheduler.resource;
    +
    +import backtype.storm.scheduler.Cluster;
    +import backtype.storm.scheduler.TopologyDetails;
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
    +import java.util.Collection;
    +import java.util.Comparator;
    +import java.util.HashMap;
    +import java.util.LinkedList;
    +import java.util.List;
    +import java.util.Map;
    +import java.util.Set;
    +import java.util.TreeSet;
    +
    +public class User {
    +    private String userId;
    +    //Topologies yet to be scheduled sorted by priority for each user
    +    private TreeSet<TopologyDetails> pendingQueue = new 
TreeSet<TopologyDetails>(new PQsortByPriorityAndSubmittionTime());
    +
    +    //Topologies yet to be scheduled sorted by priority for each user
    +    private TreeSet<TopologyDetails> runningQueue = new 
TreeSet<TopologyDetails>(new PQsortByPriorityAndSubmittionTime());
    +
    +    //Topologies that was attempted to be scheduled but wasn't successull
    +    private TreeSet<TopologyDetails> attemptedQueue = new 
TreeSet<TopologyDetails>(new PQsortByPriorityAndSubmittionTime());
    +
    +    private TreeSet<TopologyDetails> invalidQueue = new 
TreeSet<TopologyDetails>(new PQsortByPriorityAndSubmittionTime());
    +
    +    private Map<String, Double> resourcePool = new HashMap<String, 
Double>();
    +
    +    private static final Logger LOG = LoggerFactory.getLogger(User.class);
    +
    +    public User(String userId) {
    +        this.userId = userId;
    +    }
    +
    +    public User(String userId, Map<String, Double> resourcePool) {
    +        this(userId);
    +        if (resourcePool != null) {
    +            this.resourcePool.putAll(resourcePool);
    +        }
    +        if (this.resourcePool.get("cpu") == null) {
    +            this.resourcePool.put("cpu", 0.0);
    +        }
    +        if (this.resourcePool.get("memory") == null) {
    +            this.resourcePool.put("memory", 0.0);
    +        }
    +    }
    +
    +    public User getCopy() {
    +        User newUser = new User(this.userId, this.resourcePool);
    +        for (TopologyDetails topo : this.pendingQueue) {
    +            newUser.addTopologyToPendingQueue(topo);
    +        }
    +        for (TopologyDetails topo : this.runningQueue) {
    +            newUser.addTopologyToRunningQueue(topo);
    +        }
    +        for (TopologyDetails topo : this.attemptedQueue) {
    +            newUser.addTopologyToAttemptedQueue(topo);
    +        }
    +        for (TopologyDetails topo : this.invalidQueue) {
    +            newUser.addTopologyToInvalidQueue(topo);
    +        }
    +        return newUser;
    +    }
    +
    +    public String getId() {
    +        return this.userId;
    +    }
    +
    +    public void addTopologyToPendingQueue(TopologyDetails topo, Cluster 
cluster) {
    +        this.pendingQueue.add(topo);
    +        if (cluster != null) {
    +            cluster.setStatus(topo.getId(), "Scheduling Pending");
    +        }
    +    }
    +
    +    public void addTopologyToPendingQueue(TopologyDetails topo) {
    +        this.addTopologyToPendingQueue(topo, null);
    +    }
    +
    +    public void addTopologyToRunningQueue(TopologyDetails topo, Cluster 
cluster) {
    +        this.runningQueue.add(topo);
    +        if (cluster != null) {
    +            cluster.setStatus(topo.getId(), "Fully Scheduled");
    +        }
    +    }
    +
    +    public void addTopologyToRunningQueue(TopologyDetails topo) {
    +        this.addTopologyToRunningQueue(topo, null);
    +    }
    +
    +    public Set<TopologyDetails> getTopologiesPending() {
    +        TreeSet<TopologyDetails> ret = new TreeSet<TopologyDetails>(new 
PQsortByPriorityAndSubmittionTime());
    +        ret.addAll(this.pendingQueue);
    +        return ret;
    +    }
    +
    +    public void addTopologyToAttemptedQueue(TopologyDetails topo) {
    +        this.attemptedQueue.add(topo);
    +    }
    +
    +    public void addTopologyToInvalidQueue(TopologyDetails topo) {
    +        this.invalidQueue.add(topo);
    +    }
    +
    +    public Set<TopologyDetails> getTopologiesRunning() {
    +        TreeSet<TopologyDetails> ret = new TreeSet<TopologyDetails>(new 
PQsortByPriorityAndSubmittionTime());
    +        ret.addAll(this.runningQueue);
    +        return ret;
    +    }
    +
    +    public Set<TopologyDetails> getTopologiesAttempted() {
    +        TreeSet<TopologyDetails> ret = new TreeSet<TopologyDetails>(new 
PQsortByPriorityAndSubmittionTime());
    +        ret.addAll(this.attemptedQueue);
    +        return ret;
    +    }
    +
    +    public Set<TopologyDetails> getTopologiesInvalid() {
    +        TreeSet<TopologyDetails> ret = new TreeSet<TopologyDetails>(new 
PQsortByPriorityAndSubmittionTime());
    +        ret.addAll(this.invalidQueue);
    +        return ret;
    +    }
    +
    +    public Map<String, Number> getResourcePool() {
    +        if (this.resourcePool != null) {
    +            return new HashMap<String, Number>(this.resourcePool);
    +        }
    +        return null;
    +    }
    +
    +    public void moveTopoFromPendingToRunning(TopologyDetails topo, Cluster 
cluster) {
    +        moveTopology(topo, this.pendingQueue, "pending", 
this.runningQueue, "running");
    +        if (cluster != null) {
    +            cluster.setStatus(topo.getId(), "Fully Scheduled");
    +        }
    +    }
    +
    +    public void moveTopoFromPendingToRunning(TopologyDetails topo) {
    +        this.moveTopoFromPendingToRunning(topo, null);
    +    }
    +
    +
    +    public void moveTopoFromPendingToAttempted(TopologyDetails topo, 
Cluster cluster) {
    +        moveTopology(topo, this.pendingQueue, "pending", 
this.attemptedQueue, "attempted");
    +        if (cluster != null) {
    +            cluster.setStatus(topo.getId(), "Scheduling Attempted but 
Failed");
    +        }
    +    }
    +
    +    public void moveTopoFromPendingToAttempted(TopologyDetails topo) {
    +        this.moveTopoFromPendingToAttempted(topo, null);
    +    }
    +
    +
    +    public void moveTopoFromPendingToInvalid(TopologyDetails topo, Cluster 
cluster) {
    +        moveTopology(topo, this.pendingQueue, "pending", 
this.invalidQueue, "invalid");
    +        if (cluster != null) {
    +            cluster.setStatus(topo.getId(), "Scheduling Attempted but 
topology is invalid");
    +        }
    +    }
    +
    +    public void moveTopoFromPendingToInvalid(TopologyDetails topo) {
    +        this.moveTopoFromPendingToInvalid(topo, null);
    +    }
    +
    +
    +    public void moveTopoFromRunningToPending(TopologyDetails topo, Cluster 
cluster) {
    +        moveTopology(topo, this.runningQueue, "running", 
this.pendingQueue, "pending");
    +        if (cluster != null) {
    +            cluster.setStatus(topo.getId(), "Scheduling Pending");
    +        }
    +    }
    +
    +    public void moveTopoFromRunningToPending(TopologyDetails topo) {
    +        this.moveTopoFromRunningToPending(topo, null);
    +    }
    +
    +
    +    private void moveTopology(TopologyDetails topo, Set<TopologyDetails> 
src, String srcName, Set<TopologyDetails> dest, String destName) {
    +        LOG.debug("For User {} Moving topo {} from {} to {}", this.userId, 
topo.getName(), srcName, destName);
    +        if (topo == null) {
    +            return;
    +        }
    +        if (!src.contains(topo)) {
    +            LOG.warn("Topo {} not in User: {} {} queue!", topo.getName(), 
this.userId, srcName);
    +            return;
    +        }
    +        if (dest.contains(topo)) {
    +            LOG.warn("Topo {} already in in User: {} {} queue!", 
topo.getName(), this.userId, destName);
    +            return;
    +        }
    +        src.remove(topo);
    +        dest.add(topo);
    +    }
    +
    +
    +    public double getResourcePoolAverageUtilization() {
    +        Double cpuResourcePoolUtilization = 
this.getCPUResourcePoolUtilization();
    +        Double memoryResourcePoolUtilization = 
this.getMemoryResourcePoolUtilization();
    +
    +        if (cpuResourcePoolUtilization != null && 
memoryResourcePoolUtilization != null) {
    +            //cannot be (cpuResourcePoolUtilization + 
memoryResourcePoolUtilization)/2
    +            //since memoryResourcePoolUtilization or 
cpuResourcePoolUtilization can be Double.MAX_VALUE
    +            //Should not return infinity in that case
    +            return ((cpuResourcePoolUtilization) / 2.0) + 
((memoryResourcePoolUtilization) / 2.0);
    +        }
    +        return Double.MAX_VALUE;
    +    }
    +
    +    public double getCPUResourcePoolUtilization() {
    +        Double cpuGuarantee = this.resourcePool.get("cpu");
    +        if (cpuGuarantee == null || cpuGuarantee == 0.0) {
    +            return Double.MAX_VALUE;
    +        }
    +        return this.getCPUResourceUsedByUser() / cpuGuarantee;
    +    }
    +
    +    public double getMemoryResourcePoolUtilization() {
    +        Double memoryGuarantee = this.resourcePool.get("memory");
    +        if (memoryGuarantee == null || memoryGuarantee == 0.0) {
    +            return Double.MAX_VALUE;
    +        }
    +        return this.getMemoryResourceUsedByUser() / memoryGuarantee;
    +    }
    +
    +
    +    public double getCPUResourceUsedByUser() {
    +        double sum = 0.0;
    +        for (TopologyDetails topo : this.runningQueue) {
    +            sum += topo.getTotalRequestedCpu();
    +        }
    +        return sum;
    +    }
    +
    +    public double getMemoryResourceUsedByUser() {
    +        double sum = 0.0;
    +        for (TopologyDetails topo : this.runningQueue) {
    +            sum += topo.getTotalRequestedMemOnHeap() + 
topo.getTotalRequestedMemOffHeap();
    +        }
    +        return sum;
    +    }
    +
    +    public Double getMemoryResourceGuaranteed() {
    +        return this.resourcePool.get("memory");
    +    }
    +
    +    public Double getCPUResourceGuaranteed() {
    +        return this.resourcePool.get("cpu");
    +    }
    +
    +    public TopologyDetails getNextTopologyToSchedule() {
    +        for (TopologyDetails topo : this.pendingQueue) {
    +            if (!this.attemptedQueue.contains(topo)) {
    +                return topo;
    +            }
    +        }
    +        return null;
    +    }
    +
    +    public boolean hasTopologyNeedSchedule() {
    +        return (!this.pendingQueue.isEmpty());
    +    }
    +
    +    public TopologyDetails getRunningTopologyWithLowestPriority() {
    +        if (this.runningQueue.isEmpty()) {
    +            return null;
    +        }
    +        return this.runningQueue.last();
    +    }
    +
    +    @Override
    +    public int hashCode() {
    +        return this.userId.hashCode();
    +    }
    --- End diff --
    
    will fix


> Add priorities and per user resource guarantees to Resource Aware Scheduler
> ---------------------------------------------------------------------------
>
>                 Key: STORM-898
>                 URL: https://issues.apache.org/jira/browse/STORM-898
>             Project: Apache Storm
>          Issue Type: New Feature
>          Components: storm-core
>            Reporter: Robert Joseph Evans
>            Assignee: Boyang Jerry Peng
>         Attachments: Resource Aware Scheduler for Storm.pdf
>
>
> In a multi-tenant environment we would like to be able to give individual 
> users a guarantee of how much CPU/Memory/Network they will be able to use in 
> a cluster.  We would also like to know which topologies a user feels are the 
> most important to keep running if there are not enough resources to run all 
> of their topologies.
> Each user should be able to specify if their topology is production, staging, 
> or development. Within each of those categories a user should be able to give 
> a topology a priority, 0 to 10 with 10 being the highest priority (or 
> something like this).
> If there are not enough resources on a cluster to run a topology assume this 
> topology is running using resources and find the user that is most over their 
> guaranteed resources.  Shoot the lowest priority topology for that user, and 
> repeat until, this topology is able to run, or this topology would be the one 
> shot.   Ideally we don't actually shoot anything until we know that we would 
> have made enough room.
> If the cluster is over-subscribed and everyone is under their guarantee, and 
> this topology would not put the user over their guarantee.  Shoot the lowest 
> priority topology in this workers resource pool until there is enough room to 
> run the topology or this topology is the one that would be shot.  We might 
> also want to think about what to do if we are going to shoot a production 
> topology in an oversubscribed case, and perhaps we can shoot a non-production 
> topology instead even if the other user is not over their guarantee.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

Reply via email to