[ 
https://issues.apache.org/jira/browse/STORM-898?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15062715#comment-15062715
 ] 

ASF GitHub Bot commented on STORM-898:
--------------------------------------

Github user d2r commented on a diff in the pull request:

    https://github.com/apache/storm/pull/921#discussion_r47955415
  
    --- Diff: 
storm-core/src/jvm/backtype/storm/scheduler/resource/ResourceAwareScheduler.java
 ---
    @@ -128,56 +308,121 @@ private void updateSupervisorsResources(Cluster 
cluster, Topologies topologies)
             cluster.setSupervisorsResourcesMap(supervisors_resources);
         }
     
    -    private Map<String, Double> getUserConf() {
    -        Map<String, Double> ret = new HashMap<String, Double>();
    -        ret.put(Config.TOPOLOGY_COMPONENT_RESOURCES_ONHEAP_MEMORY_MB,
    -                (Double) 
_conf.get(Config.TOPOLOGY_COMPONENT_RESOURCES_ONHEAP_MEMORY_MB));
    -        ret.put(Config.TOPOLOGY_COMPONENT_RESOURCES_OFFHEAP_MEMORY_MB,
    -                (Double) 
_conf.get(Config.TOPOLOGY_COMPONENT_RESOURCES_OFFHEAP_MEMORY_MB));
    -        return ret;
    +    public User getUser(String user) {
    +        return this.userMap.get(user);
    +    }
    +
    +    public Map<String, User> getUserMap() {
    +        return this.userMap;
         }
     
         /**
    -     * print scheduling for debug purposes
    -     * @param cluster
    +     * Intialize scheduling and running queues
    +     *
          * @param topologies
    +     * @param cluster
          */
    -    public String printScheduling(Cluster cluster, Topologies topologies) {
    -        StringBuilder str = new StringBuilder();
    -        Map<String, Map<String, Map<WorkerSlot, 
Collection<ExecutorDetails>>>> schedulingMap = new HashMap<String, Map<String, 
Map<WorkerSlot, Collection<ExecutorDetails>>>>();
    -        for (TopologyDetails topo : topologies.getTopologies()) {
    -            if (cluster.getAssignmentById(topo.getId()) != null) {
    -                for (Map.Entry<ExecutorDetails, WorkerSlot> entry : 
cluster.getAssignmentById(topo.getId()).getExecutorToSlot().entrySet()) {
    -                    WorkerSlot slot = entry.getValue();
    -                    String nodeId = slot.getNodeId();
    -                    ExecutorDetails exec = entry.getKey();
    -                    if (schedulingMap.containsKey(nodeId) == false) {
    -                        schedulingMap.put(nodeId, new HashMap<String, 
Map<WorkerSlot, Collection<ExecutorDetails>>>());
    -                    }
    -                    if 
(schedulingMap.get(nodeId).containsKey(topo.getId()) == false) {
    -                        schedulingMap.get(nodeId).put(topo.getId(), new 
HashMap<WorkerSlot, Collection<ExecutorDetails>>());
    -                    }
    -                    if 
(schedulingMap.get(nodeId).get(topo.getId()).containsKey(slot) == false) {
    -                        
schedulingMap.get(nodeId).get(topo.getId()).put(slot, new 
LinkedList<ExecutorDetails>());
    -                    }
    -                    
schedulingMap.get(nodeId).get(topo.getId()).get(slot).add(exec);
    +    private void initUsers(Topologies topologies, Cluster cluster) {
    +        this.userMap = new HashMap<String, User>();
    +        Map<String, Map<String, Double>> userResourcePools = 
this.getUserResourcePools();
    +        LOG.debug("userResourcePools: {}", userResourcePools);
    +
    +        for (TopologyDetails td : topologies.getTopologies()) {
    +            //Get user that submitted topology.  If topology submitter is 
null or empty string, the topologySubmitter
    +            //will be set to anonymous
    +            String topologySubmitter = td.getTopologySubmitter();
    +            //additional safety check to make sure that topologySubmitter 
is going to be a valid value
    +            if (topologySubmitter == null || topologySubmitter.equals("")) 
{
    +                LOG.error("Cannot determine user for topology {}.  Will 
skip scheduling this topology", td.getName());
    +                continue;
    +            }
    +            if (!this.userMap.containsKey(topologySubmitter)) {
    +                this.userMap.put(topologySubmitter, new 
User(topologySubmitter, userResourcePools.get(topologySubmitter)));
    +            }
    +            if (cluster.getUnassignedExecutors(td).size() > 0) {
    +                LOG.debug("adding td: {} to pending queue", td.getName());
    +                
this.userMap.get(topologySubmitter).addTopologyToPendingQueue(td);
    +            } else {
    +                LOG.debug("adding td: {} to running queue with existing 
status: {}", td.getName(), cluster.getStatusMap().get(td.getId()));
    +                
this.userMap.get(topologySubmitter).addTopologyToRunningQueue(td);
    +                if (cluster.getStatusMap().get(td.getId()) == null || 
cluster.getStatusMap().get(td.getId()).equals("")) {
    +                    cluster.setStatus(td.getId(), "Fully Scheduled");
                     }
                 }
             }
    +    }
     
    -        for (Map.Entry<String, Map<String, Map<WorkerSlot, 
Collection<ExecutorDetails>>>> entry : schedulingMap.entrySet()) {
    -            if (cluster.getSupervisorById(entry.getKey()) != null) {
    -                str.append("/** Node: " + 
cluster.getSupervisorById(entry.getKey()).getHost() + "-" + entry.getKey() + " 
**/\n");
    -            } else {
    -                str.append("/** Node: Unknown may be dead -" + 
entry.getKey() + " **/\n");
    +    private void initialize(Topologies topologies, Cluster cluster) {
    +        this.cluster = cluster;
    +        this.topologies = topologies;
    +        this.nodes = new RAS_Nodes(this.cluster, this.topologies);
    +        initUsers(topologies, cluster);
    +    }
    +
    +    /**
    +     * Get resource guarantee configs
    +     *
    +     * @return
    --- End diff --
    
    minor: the javadoc should describe what this returns (the @return tag is empty)


> Add priorities and per user resource guarantees to Resource Aware Scheduler
> ---------------------------------------------------------------------------
>
>                 Key: STORM-898
>                 URL: https://issues.apache.org/jira/browse/STORM-898
>             Project: Apache Storm
>          Issue Type: New Feature
>          Components: storm-core
>            Reporter: Robert Joseph Evans
>            Assignee: Boyang Jerry Peng
>         Attachments: Resource Aware Scheduler for Storm.pdf
>
>
> In a multi-tenant environment we would like to be able to give individual 
> users a guarantee of how much CPU/Memory/Network they will be able to use in 
> a cluster.  We would also like to know which topologies a user feels are the 
> most important to keep running if there are not enough resources to run all 
> of their topologies.
> Each user should be able to specify if their topology is production, staging, 
> or development. Within each of those categories a user should be able to give 
> a topology a priority, 0 to 10 with 10 being the highest priority (or 
> something like this).
> If there are not enough resources on a cluster to run a topology, assume this 
> topology is running using resources and find the user that is most over their 
> guaranteed resources.  Shoot the lowest priority topology for that user, and 
> repeat until this topology is able to run, or this topology would be the one 
> shot.   Ideally we don't actually shoot anything until we know that we would 
> have made enough room.
> If the cluster is over-subscribed and everyone is under their guarantee, and 
> this topology would not put the user over their guarantee, shoot the lowest 
> priority topology in this worker's resource pool until there is enough room to 
> run the topology or this topology is the one that would be shot.  We might 
> also want to think about what to do if we are going to shoot a production 
> topology in an oversubscribed case, and perhaps we can shoot a non-production 
> topology instead even if the other user is not over their guarantee.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

Reply via email to