[ 
https://issues.apache.org/jira/browse/STORM-898?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15041716#comment-15041716
 ] 

ASF GitHub Bot commented on STORM-898:
--------------------------------------

Github user jerrypeng commented on a diff in the pull request:

    https://github.com/apache/storm/pull/921#discussion_r46701142
  
    --- Diff: 
storm-core/src/jvm/backtype/storm/scheduler/resource/ResourceAwareScheduler.java
 ---
    @@ -28,94 +32,272 @@
     import backtype.storm.scheduler.Topologies;
     import backtype.storm.scheduler.TopologyDetails;
     import backtype.storm.scheduler.WorkerSlot;
    -import backtype.storm.scheduler.resource.strategies.ResourceAwareStrategy;
     
    +import java.util.Arrays;
     import java.util.Collection;
     import java.util.HashMap;
     import java.util.HashSet;
    -import java.util.LinkedList;
     import java.util.Map;
     import java.util.Set;
     
     public class ResourceAwareScheduler implements IScheduler {
    +
    +    private Map<String, User> userMap;
    +    private Cluster cluster;
    +    private Topologies topologies;
    +    private RAS_Nodes nodes;
    +
    +    private class SchedulingState {
    +        private Map<String, User> userMap = new HashMap<String, User>();
    +        private Cluster cluster;
    +        private Topologies topologies;
    +        private RAS_Nodes nodes;
    +        private Map conf = new Config();
    +
    +        public SchedulingState(Map<String, User> userMap, Cluster cluster, 
Topologies topologies, RAS_Nodes nodes, Map conf) {
    +            for (Map.Entry<String, User> userMapEntry : 
userMap.entrySet()) {
    +                String userId = userMapEntry.getKey();
    +                User user = userMapEntry.getValue();
    +                this.userMap.put(userId, user.getCopy());
    +            }
    +            this.cluster = cluster.getCopy();
    +            this.topologies = topologies.getCopy();
    +            this.nodes = new RAS_Nodes(this.cluster, this.topologies);
    +            this.conf.putAll(conf);
    +
    +        }
    +    }
    +
    +
    +    @SuppressWarnings("rawtypes")
    +    private Map conf;
    +
         private static final Logger LOG = LoggerFactory
                 .getLogger(ResourceAwareScheduler.class);
    -    @SuppressWarnings("rawtypes")
    -    private Map _conf;
     
         @Override
         public void prepare(Map conf) {
    -        _conf = conf;
    +        this.conf = conf;
    +
         }
     
         @Override
         public void schedule(Topologies topologies, Cluster cluster) {
    -        LOG.debug("\n\n\nRerunning ResourceAwareScheduler...");
    +        LOG.info("\n\n\nRerunning ResourceAwareScheduler...");
    +        //initialize data structures
    +        this.initialize(topologies, cluster);
    +        //logs everything that is currently scheduled and the location at 
which they are scheduled
    +        LOG.info("Cluster scheduling:\n{}", 
ResourceUtils.printScheduling(cluster, topologies));
    +        //logs the resources available/used for every node
    +        LOG.info("Nodes:\n{}", this.nodes);
    +        //logs the detailed info about each user
    +        for (User user : this.getUserMap().values()) {
    +            LOG.info(user.getDetailedInfo());
    +        }
     
    -        ResourceAwareStrategy RAStrategy = new 
ResourceAwareStrategy(cluster, topologies);
    -        LOG.debug(printScheduling(cluster, topologies));
    +        ISchedulingPriorityStrategy schedulingPrioritystrategy = null;
    +        while (true) {
     
    -        for (TopologyDetails td : topologies.getTopologies()) {
    -            String topId = td.getId();
    -            Map<WorkerSlot, Collection<ExecutorDetails>> 
schedulerAssignmentMap;
    -            if (cluster.getUnassignedExecutors(td).size() > 0) {
    -                LOG.debug("/********Scheduling topology {} ************/", 
topId);
    -
    -                schedulerAssignmentMap = RAStrategy.schedule(td);
    -
    -                double requestedMemOnHeap = 
td.getTotalRequestedMemOnHeap();
    -                double requestedMemOffHeap = 
td.getTotalRequestedMemOffHeap();
    -                double requestedCpu = td.getTotalRequestedCpu();
    -                double assignedMemOnHeap = 0.0;
    -                double assignedMemOffHeap = 0.0;
    -                double assignedCpu = 0.0;
    -
    -                if (schedulerAssignmentMap != null) {
    -                    try {
    -                        Set<String> nodesUsed = new HashSet<String>();
    -                        int assignedWorkers = 
schedulerAssignmentMap.keySet().size();
    -                        for (Map.Entry<WorkerSlot, 
Collection<ExecutorDetails>> workerToTasksEntry : 
schedulerAssignmentMap.entrySet()) {
    -                            WorkerSlot targetSlot = 
workerToTasksEntry.getKey();
    -                            Collection<ExecutorDetails> 
execsNeedScheduling = workerToTasksEntry.getValue();
    -                            RAS_Node targetNode = 
RAStrategy.idToNode(targetSlot.getNodeId());
    -                            targetNode.assign(targetSlot, td, 
execsNeedScheduling, cluster);
    -                            LOG.debug("ASSIGNMENT    TOPOLOGY: {}  TASKS: 
{} To Node: {} on Slot: {}",
    -                                    td.getName(), execsNeedScheduling, 
targetNode.getHostname(), targetSlot.getPort());
    -                            if (!nodesUsed.contains(targetNode.getId())) {
    -                                nodesUsed.add(targetNode.getId());
    +            if (schedulingPrioritystrategy == null) {
    +                try {
    +                    schedulingPrioritystrategy = 
(ISchedulingPriorityStrategy) Utils.newInstance((String) 
this.conf.get(Config.RESOURCE_AWARE_SCHEDULER_PRIORITY_STRATEGY));
    +                } catch (RuntimeException e) {
    +                    LOG.error("failed to create instance of priority 
strategy: {} with error: {}! No topologies will be scheduled.",
    +                            
this.conf.get(Config.RESOURCE_AWARE_SCHEDULER_PRIORITY_STRATEGY), 
e.getMessage());
    +                    break;
    +                }
    +            }
    +            TopologyDetails td = null;
    +            try {
    +                //need to re prepare since scheduling state might have 
been restored
    +                schedulingPrioritystrategy.prepare(this.topologies, 
this.cluster, this.userMap, this.nodes);
    +                //Call scheduling priority strategy
    +                td = 
schedulingPrioritystrategy.getNextTopologyToSchedule();
    +            } catch (Exception e) {
    +                LOG.error("Exception thrown when running priority strategy 
{}. No topologies will be scheduled! Error: {} StackTrack: {}"
    +                        , schedulingPrioritystrategy.getClass().getName(), 
e.getMessage(), Arrays.toString(e.getStackTrace()));
    +                break;
    +            }
    +            if (td == null) {
    +                break;
    +            }
    +            scheduleTopology(td);
    +        }
    +    }
    +
    +    public void scheduleTopology(TopologyDetails td) {
    +        User topologySubmitter = 
this.userMap.get(td.getTopologySubmitter());
    +        if (cluster.getUnassignedExecutors(td).size() > 0) {
    +            LOG.debug("/********Scheduling topology {} from User 
{}************/", td.getName(), topologySubmitter);
    +
    +            SchedulingState schedulingState = 
this.checkpointSchedulingState();
    +            IStrategy RAStrategy = null;
    +            try {
    +                RAStrategy = (IStrategy) Utils.newInstance((String) 
td.getConf().get(Config.TOPOLOGY_SCHEDULER_STRATEGY));
    +            } catch (RuntimeException e) {
    +                LOG.error("failed to create instance of IStrategy: {} with 
error: {}! Topology {} will not be scheduled.",
    +                        td.getName(), 
td.getConf().get(Config.TOPOLOGY_SCHEDULER_STRATEGY), e.getMessage());
    +                this.restoreCheckpointSchedulingState(schedulingState);
    +                //since state is restored need the update User 
topologySubmitter to the new User object in userMap
    +                topologySubmitter = 
this.userMap.get(td.getTopologySubmitter());
    +                topologySubmitter.moveTopoFromPendingToInvalid(td);
    +                this.cluster.setStatus(td.getId(), "Unsuccessful in 
scheduling - failed to create instance of topology strategy "
    +                        + 
td.getConf().get(Config.TOPOLOGY_SCHEDULER_STRATEGY) + ". Please check logs for 
details");
    +                return;
    +            }
    +            IEvictionStrategy evictionStrategy = null;
    +            while (true) {
    +                SchedulingResult result = null;
    +                try {
    +                    //Need to re prepare scheduling strategy with cluster 
and topologies in case scheduling state was restored
    +                    RAStrategy.prepare(this.topologies, this.cluster, 
this.userMap, this.nodes);
    +                    result = RAStrategy.schedule(td);
    --- End diff --
    
    Well, we need to create an IStrategy object each time, since topologies can 
have different strategies.  Even if multiple topologies use the same strategy, we 
still need to isolate the scheduling of each topology.  We could cache an 
IStrategy object, but we would then need a way to reset all the data 
structures in the IStrategy object, which may contain a lot of user-defined data 
structures.  Thus, I think we should just recreate the IStrategy object.


> Add priorities and per user resource guarantees to Resource Aware Scheduler
> ---------------------------------------------------------------------------
>
>                 Key: STORM-898
>                 URL: https://issues.apache.org/jira/browse/STORM-898
>             Project: Apache Storm
>          Issue Type: New Feature
>          Components: storm-core
>            Reporter: Robert Joseph Evans
>            Assignee: Boyang Jerry Peng
>         Attachments: Resource Aware Scheduler for Storm.pdf
>
>
> In a multi-tenant environment we would like to be able to give individual 
> users a guarantee of how much CPU/Memory/Network they will be able to use in 
> a cluster.  We would also like to know which topologies a user feels are the 
> most important to keep running if there are not enough resources to run all 
> of their topologies.
> Each user should be able to specify if their topology is production, staging, 
> or development. Within each of those categories a user should be able to give 
> a topology a priority, 0 to 10 with 10 being the highest priority (or 
> something like this).
> If there are not enough resources on a cluster to run a topology, assume this 
> topology is running using resources and find the user that is most over their 
> guaranteed resources.  Shoot the lowest priority topology for that user, and 
> repeat until this topology is able to run, or this topology would be the one 
> shot.   Ideally we don't actually shoot anything until we know that we would 
> have made enough room.
> If the cluster is over-subscribed and everyone is under their guarantee, and 
> this topology would not put the user over their guarantee, shoot the lowest 
> priority topology in this worker's resource pool until there is enough room to 
> run the topology or this topology is the one that would be shot.  We might 
> also want to think about what to do if we are going to shoot a production 
> topology in an oversubscribed case, and perhaps we can shoot a non-production 
> topology instead even if the other user is not over their guarantee.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

Reply via email to