Github user jerrypeng commented on a diff in the pull request:

    https://github.com/apache/storm/pull/921#discussion_r47948394
  
    --- Diff: 
storm-core/src/jvm/backtype/storm/scheduler/resource/ResourceAwareScheduler.java
 ---
    @@ -28,94 +32,270 @@
     import backtype.storm.scheduler.Topologies;
     import backtype.storm.scheduler.TopologyDetails;
     import backtype.storm.scheduler.WorkerSlot;
    -import backtype.storm.scheduler.resource.strategies.ResourceAwareStrategy;
     
    +import java.util.Arrays;
     import java.util.Collection;
     import java.util.HashMap;
     import java.util.HashSet;
    -import java.util.LinkedList;
     import java.util.Map;
     import java.util.Set;
     
     public class ResourceAwareScheduler implements IScheduler {
    +
    +    private Map<String, User> userMap;
    +    private Cluster cluster;
    +    private Topologies topologies;
    +    private RAS_Nodes nodes;
    +
    +    private class SchedulingState {
    +        private Map<String, User> userMap = new HashMap<String, User>();
    +        private Cluster cluster;
    +        private Topologies topologies;
    +        private RAS_Nodes nodes;
    +        private Map conf = new Config();
    +
    +        public SchedulingState(Map<String, User> userMap, Cluster cluster, 
Topologies topologies, RAS_Nodes nodes, Map conf) {
    +            for (Map.Entry<String, User> userMapEntry : 
userMap.entrySet()) {
    +                String userId = userMapEntry.getKey();
    +                User user = userMapEntry.getValue();
    +                this.userMap.put(userId, user.getCopy());
    +            }
    +            this.cluster = Cluster.getCopy(cluster);
    +            this.topologies = topologies.getCopy(topologies);
    +            this.nodes = new RAS_Nodes(this.cluster, this.topologies);
    +            this.conf.putAll(conf);
    +        }
    +    }
    +
    +    @SuppressWarnings("rawtypes")
    +    private Map conf;
    +
         private static final Logger LOG = LoggerFactory
                 .getLogger(ResourceAwareScheduler.class);
    -    @SuppressWarnings("rawtypes")
    -    private Map _conf;
     
         @Override
         public void prepare(Map conf) {
    -        _conf = conf;
    +        this.conf = conf;
    +
         }
     
         @Override
         public void schedule(Topologies topologies, Cluster cluster) {
             LOG.debug("\n\n\nRerunning ResourceAwareScheduler...");
    +        //initialize data structures
    +        this.initialize(topologies, cluster);
    +        //logs everything that is currently scheduled and the location at 
which they are scheduled
    +        LOG.info("Cluster scheduling:\n{}", 
ResourceUtils.printScheduling(cluster, topologies));
    +        //logs the resources available/used for every node
    +        LOG.info("Nodes:\n{}", this.nodes);
    +        //logs the detailed info about each user
    +        for (User user : this.getUserMap().values()) {
    +            LOG.info(user.getDetailedInfo());
    +        }
     
    -        ResourceAwareStrategy RAStrategy = new 
ResourceAwareStrategy(cluster, topologies);
    -        LOG.debug(printScheduling(cluster, topologies));
    +        ISchedulingPriorityStrategy schedulingPrioritystrategy = null;
    +        while (true) {
     
    -        for (TopologyDetails td : topologies.getTopologies()) {
    -            String topId = td.getId();
    -            Map<WorkerSlot, Collection<ExecutorDetails>> 
schedulerAssignmentMap;
    -            if (cluster.getUnassignedExecutors(td).size() > 0) {
    -                LOG.debug("/********Scheduling topology {} ************/", 
topId);
    -
    -                schedulerAssignmentMap = RAStrategy.schedule(td);
    -
    -                double requestedMemOnHeap = 
td.getTotalRequestedMemOnHeap();
    -                double requestedMemOffHeap = 
td.getTotalRequestedMemOffHeap();
    -                double requestedCpu = td.getTotalRequestedCpu();
    -                double assignedMemOnHeap = 0.0;
    -                double assignedMemOffHeap = 0.0;
    -                double assignedCpu = 0.0;
    -
    -                if (schedulerAssignmentMap != null) {
    -                    try {
    -                        Set<String> nodesUsed = new HashSet<String>();
    -                        int assignedWorkers = 
schedulerAssignmentMap.keySet().size();
    -                        for (Map.Entry<WorkerSlot, 
Collection<ExecutorDetails>> workerToTasksEntry : 
schedulerAssignmentMap.entrySet()) {
    -                            WorkerSlot targetSlot = 
workerToTasksEntry.getKey();
    -                            Collection<ExecutorDetails> 
execsNeedScheduling = workerToTasksEntry.getValue();
    -                            RAS_Node targetNode = 
RAStrategy.idToNode(targetSlot.getNodeId());
    -                            targetNode.assign(targetSlot, td, 
execsNeedScheduling, cluster);
    -                            LOG.debug("ASSIGNMENT    TOPOLOGY: {}  TASKS: 
{} To Node: {} on Slot: {}",
    -                                    td.getName(), execsNeedScheduling, 
targetNode.getHostname(), targetSlot.getPort());
    -                            if (!nodesUsed.contains(targetNode.getId())) {
    -                                nodesUsed.add(targetNode.getId());
    +            if (schedulingPrioritystrategy == null) {
    +                try {
    +                    schedulingPrioritystrategy = 
(ISchedulingPriorityStrategy) Utils.newInstance((String) 
this.conf.get(Config.RESOURCE_AWARE_SCHEDULER_PRIORITY_STRATEGY));
    +                } catch (RuntimeException e) {
    --- End diff --
    
    Well, this is to prevent a bug in the priority strategy from crashing nimbus, 
potentially frequently.  This way nimbus will still be up and users can view 
their topologies, but no scheduling will be done for new incoming 
topologies.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

Reply via email to