Github user d2r commented on a diff in the pull request:

    https://github.com/apache/storm/pull/921#discussion_r47955652
  
    --- Diff: 
storm-core/src/jvm/backtype/storm/scheduler/resource/ResourceAwareScheduler.java
 ---
    @@ -128,56 +308,121 @@ private void updateSupervisorsResources(Cluster 
cluster, Topologies topologies)
             cluster.setSupervisorsResourcesMap(supervisors_resources);
         }
     
    -    private Map<String, Double> getUserConf() {
    -        Map<String, Double> ret = new HashMap<String, Double>();
    -        ret.put(Config.TOPOLOGY_COMPONENT_RESOURCES_ONHEAP_MEMORY_MB,
    -                (Double) 
_conf.get(Config.TOPOLOGY_COMPONENT_RESOURCES_ONHEAP_MEMORY_MB));
    -        ret.put(Config.TOPOLOGY_COMPONENT_RESOURCES_OFFHEAP_MEMORY_MB,
    -                (Double) 
_conf.get(Config.TOPOLOGY_COMPONENT_RESOURCES_OFFHEAP_MEMORY_MB));
    -        return ret;
    +    public User getUser(String user) {
    +        return this.userMap.get(user);
    +    }
    +
    +    public Map<String, User> getUserMap() {
    +        return this.userMap;
         }
     
         /**
    -     * print scheduling for debug purposes
    -     * @param cluster
    +     * Initialize scheduling and running queues
    +     *
          * @param topologies
    +     * @param cluster
          */
    -    public String printScheduling(Cluster cluster, Topologies topologies) {
    -        StringBuilder str = new StringBuilder();
    -        Map<String, Map<String, Map<WorkerSlot, 
Collection<ExecutorDetails>>>> schedulingMap = new HashMap<String, Map<String, 
Map<WorkerSlot, Collection<ExecutorDetails>>>>();
    -        for (TopologyDetails topo : topologies.getTopologies()) {
    -            if (cluster.getAssignmentById(topo.getId()) != null) {
    -                for (Map.Entry<ExecutorDetails, WorkerSlot> entry : 
cluster.getAssignmentById(topo.getId()).getExecutorToSlot().entrySet()) {
    -                    WorkerSlot slot = entry.getValue();
    -                    String nodeId = slot.getNodeId();
    -                    ExecutorDetails exec = entry.getKey();
    -                    if (schedulingMap.containsKey(nodeId) == false) {
    -                        schedulingMap.put(nodeId, new HashMap<String, 
Map<WorkerSlot, Collection<ExecutorDetails>>>());
    -                    }
    -                    if 
(schedulingMap.get(nodeId).containsKey(topo.getId()) == false) {
    -                        schedulingMap.get(nodeId).put(topo.getId(), new 
HashMap<WorkerSlot, Collection<ExecutorDetails>>());
    -                    }
    -                    if 
(schedulingMap.get(nodeId).get(topo.getId()).containsKey(slot) == false) {
    -                        
schedulingMap.get(nodeId).get(topo.getId()).put(slot, new 
LinkedList<ExecutorDetails>());
    -                    }
    -                    
schedulingMap.get(nodeId).get(topo.getId()).get(slot).add(exec);
    +    private void initUsers(Topologies topologies, Cluster cluster) {
    +        this.userMap = new HashMap<String, User>();
    +        Map<String, Map<String, Double>> userResourcePools = 
this.getUserResourcePools();
    +        LOG.debug("userResourcePools: {}", userResourcePools);
    +
    +        for (TopologyDetails td : topologies.getTopologies()) {
    +            //Get user that submitted topology.  If topology submitter is 
null or empty string, the topologySubmitter
    +            //will be set to anonymous
    +            String topologySubmitter = td.getTopologySubmitter();
    +            //additional safety check to make sure that topologySubmitter 
is going to be a valid value
    +            if (topologySubmitter == null || topologySubmitter.equals("")) 
{
    +                LOG.error("Cannot determine user for topology {}.  Will 
skip scheduling this topology", td.getName());
    +                continue;
    +            }
    +            if (!this.userMap.containsKey(topologySubmitter)) {
    +                this.userMap.put(topologySubmitter, new 
User(topologySubmitter, userResourcePools.get(topologySubmitter)));
    +            }
    +            if (cluster.getUnassignedExecutors(td).size() > 0) {
    +                LOG.debug("adding td: {} to pending queue", td.getName());
    +                
this.userMap.get(topologySubmitter).addTopologyToPendingQueue(td);
    +            } else {
    +                LOG.debug("adding td: {} to running queue with existing 
status: {}", td.getName(), cluster.getStatusMap().get(td.getId()));
    +                
this.userMap.get(topologySubmitter).addTopologyToRunningQueue(td);
    +                if (cluster.getStatusMap().get(td.getId()) == null || 
cluster.getStatusMap().get(td.getId()).equals("")) {
    +                    cluster.setStatus(td.getId(), "Fully Scheduled");
                     }
                 }
             }
    +    }
     
    -        for (Map.Entry<String, Map<String, Map<WorkerSlot, 
Collection<ExecutorDetails>>>> entry : schedulingMap.entrySet()) {
    -            if (cluster.getSupervisorById(entry.getKey()) != null) {
    -                str.append("/** Node: " + 
cluster.getSupervisorById(entry.getKey()).getHost() + "-" + entry.getKey() + " 
**/\n");
    -            } else {
    -                str.append("/** Node: Unknown may be dead -" + 
entry.getKey() + " **/\n");
    +    private void initialize(Topologies topologies, Cluster cluster) {
    +        this.cluster = cluster;
    +        this.topologies = topologies;
    +        this.nodes = new RAS_Nodes(this.cluster, this.topologies);
    +        initUsers(topologies, cluster);
    +    }
    +
    +    /**
    +     * Get resource guarantee configs
    +     *
    +     * @return
    +     */
    +    private Map<String, Map<String, Double>> getUserResourcePools() {
    +        Object raw = 
this.conf.get(Config.RESOURCE_AWARE_SCHEDULER_USER_POOLS);
    +        Map<String, Map<String, Double>> ret = new HashMap<String, 
Map<String, Double>>();
    +
    +        if (raw != null) {
    +            for (Map.Entry<String, Map<String, Number>> UserPoolEntry : 
((Map<String, Map<String, Number>>) raw).entrySet()) {
    +                String user = UserPoolEntry.getKey();
    +                ret.put(user, new HashMap<String, Double>());
    +                for (Map.Entry<String, Number> resourceEntry : 
UserPoolEntry.getValue().entrySet()) {
    +                    ret.get(user).put(resourceEntry.getKey(), 
resourceEntry.getValue().doubleValue());
    +                }
                 }
    -            for (Map.Entry<String, Map<WorkerSlot, 
Collection<ExecutorDetails>>> topo_sched : 
schedulingMap.get(entry.getKey()).entrySet()) {
    -                str.append("\t-->Topology: " + topo_sched.getKey() + "\n");
    -                for (Map.Entry<WorkerSlot, Collection<ExecutorDetails>> ws 
: topo_sched.getValue().entrySet()) {
    -                    str.append("\t\t->Slot [" + ws.getKey().getPort() + "] 
-> " + ws.getValue() + "\n");
    +        }
    +
    +        Map fromFile = 
Utils.findAndReadConfigFile("user-resource-pools.yaml", false);
    +        Map<String, Map<String, Number>> tmp = (Map<String, Map<String, 
Number>>) fromFile.get(Config.RESOURCE_AWARE_SCHEDULER_USER_POOLS);
    +        if (tmp != null) {
    +            for (Map.Entry<String, Map<String, Number>> UserPoolEntry : 
tmp.entrySet()) {
    --- End diff --
    
    Let's rename the loop variable to lower camel case, `userPoolEntry`, to follow Java naming conventions for local variables.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and would like it, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

Reply via email to