[ 
https://issues.apache.org/jira/browse/STORM-898?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15063387#comment-15063387
 ] 

ASF GitHub Bot commented on STORM-898:
--------------------------------------

Github user jerrypeng commented on a diff in the pull request:

    https://github.com/apache/storm/pull/921#discussion_r47992667
  
    --- Diff: 
storm-core/src/jvm/backtype/storm/scheduler/resource/ResourceAwareScheduler.java
 ---
    @@ -128,56 +308,121 @@ private void updateSupervisorsResources(Cluster 
cluster, Topologies topologies)
             cluster.setSupervisorsResourcesMap(supervisors_resources);
         }
     
    -    private Map<String, Double> getUserConf() {
    -        Map<String, Double> ret = new HashMap<String, Double>();
    -        ret.put(Config.TOPOLOGY_COMPONENT_RESOURCES_ONHEAP_MEMORY_MB,
    -                (Double) 
_conf.get(Config.TOPOLOGY_COMPONENT_RESOURCES_ONHEAP_MEMORY_MB));
    -        ret.put(Config.TOPOLOGY_COMPONENT_RESOURCES_OFFHEAP_MEMORY_MB,
    -                (Double) 
_conf.get(Config.TOPOLOGY_COMPONENT_RESOURCES_OFFHEAP_MEMORY_MB));
    -        return ret;
    +    public User getUser(String user) {
    +        return this.userMap.get(user);
    +    }
    +
    +    public Map<String, User> getUserMap() {
    +        return this.userMap;
         }
     
         /**
    -     * print scheduling for debug purposes
    -     * @param cluster
    +     * Intialize scheduling and running queues
    +     *
          * @param topologies
    +     * @param cluster
          */
    -    public String printScheduling(Cluster cluster, Topologies topologies) {
    -        StringBuilder str = new StringBuilder();
    -        Map<String, Map<String, Map<WorkerSlot, 
Collection<ExecutorDetails>>>> schedulingMap = new HashMap<String, Map<String, 
Map<WorkerSlot, Collection<ExecutorDetails>>>>();
    -        for (TopologyDetails topo : topologies.getTopologies()) {
    -            if (cluster.getAssignmentById(topo.getId()) != null) {
    -                for (Map.Entry<ExecutorDetails, WorkerSlot> entry : 
cluster.getAssignmentById(topo.getId()).getExecutorToSlot().entrySet()) {
    -                    WorkerSlot slot = entry.getValue();
    -                    String nodeId = slot.getNodeId();
    -                    ExecutorDetails exec = entry.getKey();
    -                    if (schedulingMap.containsKey(nodeId) == false) {
    -                        schedulingMap.put(nodeId, new HashMap<String, 
Map<WorkerSlot, Collection<ExecutorDetails>>>());
    -                    }
    -                    if 
(schedulingMap.get(nodeId).containsKey(topo.getId()) == false) {
    -                        schedulingMap.get(nodeId).put(topo.getId(), new 
HashMap<WorkerSlot, Collection<ExecutorDetails>>());
    -                    }
    -                    if 
(schedulingMap.get(nodeId).get(topo.getId()).containsKey(slot) == false) {
    -                        
schedulingMap.get(nodeId).get(topo.getId()).put(slot, new 
LinkedList<ExecutorDetails>());
    -                    }
    -                    
schedulingMap.get(nodeId).get(topo.getId()).get(slot).add(exec);
    +    private void initUsers(Topologies topologies, Cluster cluster) {
    +        this.userMap = new HashMap<String, User>();
    +        Map<String, Map<String, Double>> userResourcePools = 
this.getUserResourcePools();
    +        LOG.debug("userResourcePools: {}", userResourcePools);
    +
    +        for (TopologyDetails td : topologies.getTopologies()) {
    +            //Get user that submitted topology.  If topology submitter is 
null or empty string, the topologySubmitter
    +            //will be set to anonymous
    +            String topologySubmitter = td.getTopologySubmitter();
    +            //additional safety check to make sure that topologySubmitter 
is going to be a valid value
    +            if (topologySubmitter == null || topologySubmitter.equals("")) 
{
    +                LOG.error("Cannot determine user for topology {}.  Will 
skip scheduling this topology", td.getName());
    +                continue;
    +            }
    +            if (!this.userMap.containsKey(topologySubmitter)) {
    +                this.userMap.put(topologySubmitter, new 
User(topologySubmitter, userResourcePools.get(topologySubmitter)));
    +            }
    +            if (cluster.getUnassignedExecutors(td).size() > 0) {
    +                LOG.debug("adding td: {} to pending queue", td.getName());
    +                
this.userMap.get(topologySubmitter).addTopologyToPendingQueue(td);
    +            } else {
    +                LOG.debug("adding td: {} to running queue with existing 
status: {}", td.getName(), cluster.getStatusMap().get(td.getId()));
    +                
this.userMap.get(topologySubmitter).addTopologyToRunningQueue(td);
    +                if (cluster.getStatusMap().get(td.getId()) == null || 
cluster.getStatusMap().get(td.getId()).equals("")) {
    +                    cluster.setStatus(td.getId(), "Fully Scheduled");
                     }
                 }
             }
    +    }
     
    -        for (Map.Entry<String, Map<String, Map<WorkerSlot, 
Collection<ExecutorDetails>>>> entry : schedulingMap.entrySet()) {
    -            if (cluster.getSupervisorById(entry.getKey()) != null) {
    -                str.append("/** Node: " + 
cluster.getSupervisorById(entry.getKey()).getHost() + "-" + entry.getKey() + " 
**/\n");
    -            } else {
    -                str.append("/** Node: Unknown may be dead -" + 
entry.getKey() + " **/\n");
    +    private void initialize(Topologies topologies, Cluster cluster) {
    +        this.cluster = cluster;
    +        this.topologies = topologies;
    +        this.nodes = new RAS_Nodes(this.cluster, this.topologies);
    +        initUsers(topologies, cluster);
    +    }
    +
    +    /**
    +     * Get resource guarantee configs
    +     *
    +     * @return
    +     */
    +    private Map<String, Map<String, Double>> getUserResourcePools() {
    +        Object raw = 
this.conf.get(Config.RESOURCE_AWARE_SCHEDULER_USER_POOLS);
    +        Map<String, Map<String, Double>> ret = new HashMap<String, 
Map<String, Double>>();
    +
    +        if (raw != null) {
    +            for (Map.Entry<String, Map<String, Number>> UserPoolEntry : 
((Map<String, Map<String, Number>>) raw).entrySet()) {
    +                String user = UserPoolEntry.getKey();
    +                ret.put(user, new HashMap<String, Double>());
    +                for (Map.Entry<String, Number> resourceEntry : 
UserPoolEntry.getValue().entrySet()) {
    +                    ret.get(user).put(resourceEntry.getKey(), 
resourceEntry.getValue().doubleValue());
    +                }
                 }
    -            for (Map.Entry<String, Map<WorkerSlot, 
Collection<ExecutorDetails>>> topo_sched : 
schedulingMap.get(entry.getKey()).entrySet()) {
    -                str.append("\t-->Topology: " + topo_sched.getKey() + "\n");
    -                for (Map.Entry<WorkerSlot, Collection<ExecutorDetails>> ws 
: topo_sched.getValue().entrySet()) {
    -                    str.append("\t\t->Slot [" + ws.getKey().getPort() + "] 
-> " + ws.getValue() + "\n");
    +        }
    +
    +        Map fromFile = 
Utils.findAndReadConfigFile("user-resource-pools.yaml", false);
    +        Map<String, Map<String, Number>> tmp = (Map<String, Map<String, 
Number>>) fromFile.get(Config.RESOURCE_AWARE_SCHEDULER_USER_POOLS);
    +        if (tmp != null) {
    +            for (Map.Entry<String, Map<String, Number>> UserPoolEntry : 
tmp.entrySet()) {
    +                String user = UserPoolEntry.getKey();
    +                ret.put(user, new HashMap<String, Double>());
    +                for (Map.Entry<String, Number> resourceEntry : 
UserPoolEntry.getValue().entrySet()) {
    +                    ret.get(user).put(resourceEntry.getKey(), 
resourceEntry.getValue().doubleValue());
                     }
                 }
             }
    -        return str.toString();
    +        return ret;
    +    }
    +
    +    private SchedulingState checkpointSchedulingState() {
    +        LOG.debug("/*********Checkpoint scheduling state************/");
    +        for (User user : this.getUserMap().values()) {
    +            LOG.debug(user.getDetailedInfo());
    +        }
    +        LOG.debug(ResourceUtils.printScheduling(this.cluster, 
this.topologies));
    +        LOG.debug("nodes:\n{}", this.nodes);
    +        LOG.debug("/*********End************/");
    +        return new SchedulingState(this.userMap, this.cluster, 
this.topologies, this.nodes, this.conf);
    +    }
    --- End diff --
    
    well I wanted to call one function for check pointing the scheduler state, 
thus if we decide to checkpoint using another mechanism the API will still be 
the same and we just need to change the implementation of this one function


> Add priorities and per user resource guarantees to Resource Aware Scheduler
> ---------------------------------------------------------------------------
>
>                 Key: STORM-898
>                 URL: https://issues.apache.org/jira/browse/STORM-898
>             Project: Apache Storm
>          Issue Type: New Feature
>          Components: storm-core
>            Reporter: Robert Joseph Evans
>            Assignee: Boyang Jerry Peng
>         Attachments: Resource Aware Scheduler for Storm.pdf
>
>
> In a multi-tenant environment we would like to be able to give individual 
> users a guarantee of how much CPU/Memory/Network they will be able to use in 
> a cluster.  We would also like to know which topologies a user feels are the 
> most important to keep running if there are not enough resources to run all 
> of their topologies.
> Each user should be able to specify if their topology is production, staging, 
> or development. Within each of those categories a user should be able to give 
> a topology a priority, 0 to 10 with 10 being the highest priority (or 
> something like this).
> If there are not enough resources on a cluster to run a topology assume this 
> topology is running using resources and find the user that is most over their 
> guaranteed resources.  Shoot the lowest priority topology for that user, and 
> repeat until, this topology is able to run, or this topology would be the one 
> shot.   Ideally we don't actually shoot anything until we know that we would 
> have made enough room.
> If the cluster is over-subscribed and everyone is under their guarantee, and 
> this topology would not put the user over their guarantee.  Shoot the lowest 
> priority topology in this workers resource pool until there is enough room to 
> run the topology or this topology is the one that would be shot.  We might 
> also want to think about what to do if we are going to shoot a production 
> topology in an oversubscribed case, and perhaps we can shoot a non-production 
> topology instead even if the other user is not over their guarantee.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

Reply via email to