Github user revans2 commented on a diff in the pull request:

    https://github.com/apache/storm/pull/921#discussion_r46700830
  
    --- Diff: 
storm-core/src/jvm/backtype/storm/scheduler/resource/ResourceAwareScheduler.java
 ---
    @@ -28,94 +32,272 @@
     import backtype.storm.scheduler.Topologies;
     import backtype.storm.scheduler.TopologyDetails;
     import backtype.storm.scheduler.WorkerSlot;
    -import backtype.storm.scheduler.resource.strategies.ResourceAwareStrategy;
     
    +import java.util.Arrays;
     import java.util.Collection;
     import java.util.HashMap;
     import java.util.HashSet;
    -import java.util.LinkedList;
     import java.util.Map;
     import java.util.Set;
     
     public class ResourceAwareScheduler implements IScheduler {
    +
    +    private Map<String, User> userMap;
    +    private Cluster cluster;
    +    private Topologies topologies;
    +    private RAS_Nodes nodes;
    +
    +    private class SchedulingState {
    +        private Map<String, User> userMap = new HashMap<String, User>();
    +        private Cluster cluster;
    +        private Topologies topologies;
    +        private RAS_Nodes nodes;
    +        private Map conf = new Config();
    +
    +        public SchedulingState(Map<String, User> userMap, Cluster cluster, 
Topologies topologies, RAS_Nodes nodes, Map conf) {
    +            for (Map.Entry<String, User> userMapEntry : 
userMap.entrySet()) {
    +                String userId = userMapEntry.getKey();
    +                User user = userMapEntry.getValue();
    +                this.userMap.put(userId, user.getCopy());
    +            }
    +            this.cluster = cluster.getCopy();
    +            this.topologies = topologies.getCopy();
    +            this.nodes = new RAS_Nodes(this.cluster, this.topologies);
    +            this.conf.putAll(conf);
    +
    +        }
    +    }
    +
    +
    +    @SuppressWarnings("rawtypes")
    +    private Map conf;
    +
         private static final Logger LOG = LoggerFactory
                 .getLogger(ResourceAwareScheduler.class);
    -    @SuppressWarnings("rawtypes")
    -    private Map _conf;
     
         @Override
         public void prepare(Map conf) {
    -        _conf = conf;
    +        this.conf = conf;
    +
         }
     
         @Override
         public void schedule(Topologies topologies, Cluster cluster) {
    -        LOG.debug("\n\n\nRerunning ResourceAwareScheduler...");
    +        LOG.info("\n\n\nRerunning ResourceAwareScheduler...");
    +        //initialize data structures
    +        this.initialize(topologies, cluster);
    +        //logs everything that is currently scheduled and the location at 
which they are scheduled
    +        LOG.info("Cluster scheduling:\n{}", 
ResourceUtils.printScheduling(cluster, topologies));
    +        //logs the resources available/used for every node
    +        LOG.info("Nodes:\n{}", this.nodes);
    +        //logs the detailed info about each user
    +        for (User user : this.getUserMap().values()) {
    +            LOG.info(user.getDetailedInfo());
    +        }
     
    -        ResourceAwareStrategy RAStrategy = new 
ResourceAwareStrategy(cluster, topologies);
    -        LOG.debug(printScheduling(cluster, topologies));
    +        ISchedulingPriorityStrategy schedulingPrioritystrategy = null;
    +        while (true) {
     
    -        for (TopologyDetails td : topologies.getTopologies()) {
    -            String topId = td.getId();
    -            Map<WorkerSlot, Collection<ExecutorDetails>> 
schedulerAssignmentMap;
    -            if (cluster.getUnassignedExecutors(td).size() > 0) {
    -                LOG.debug("/********Scheduling topology {} ************/", 
topId);
    -
    -                schedulerAssignmentMap = RAStrategy.schedule(td);
    -
    -                double requestedMemOnHeap = 
td.getTotalRequestedMemOnHeap();
    -                double requestedMemOffHeap = 
td.getTotalRequestedMemOffHeap();
    -                double requestedCpu = td.getTotalRequestedCpu();
    -                double assignedMemOnHeap = 0.0;
    -                double assignedMemOffHeap = 0.0;
    -                double assignedCpu = 0.0;
    -
    -                if (schedulerAssignmentMap != null) {
    -                    try {
    -                        Set<String> nodesUsed = new HashSet<String>();
    -                        int assignedWorkers = 
schedulerAssignmentMap.keySet().size();
    -                        for (Map.Entry<WorkerSlot, 
Collection<ExecutorDetails>> workerToTasksEntry : 
schedulerAssignmentMap.entrySet()) {
    -                            WorkerSlot targetSlot = 
workerToTasksEntry.getKey();
    -                            Collection<ExecutorDetails> 
execsNeedScheduling = workerToTasksEntry.getValue();
    -                            RAS_Node targetNode = 
RAStrategy.idToNode(targetSlot.getNodeId());
    -                            targetNode.assign(targetSlot, td, 
execsNeedScheduling, cluster);
    -                            LOG.debug("ASSIGNMENT    TOPOLOGY: {}  TASKS: 
{} To Node: {} on Slot: {}",
    -                                    td.getName(), execsNeedScheduling, 
targetNode.getHostname(), targetSlot.getPort());
    -                            if (!nodesUsed.contains(targetNode.getId())) {
    -                                nodesUsed.add(targetNode.getId());
    +            if (schedulingPrioritystrategy == null) {
    +                try {
    +                    schedulingPrioritystrategy = 
(ISchedulingPriorityStrategy) Utils.newInstance((String) 
this.conf.get(Config.RESOURCE_AWARE_SCHEDULER_PRIORITY_STRATEGY));
    +                } catch (RuntimeException e) {
    +                    LOG.error("failed to create instance of priority 
strategy: {} with error: {}! No topologies will be scheduled.",
    +                            
this.conf.get(Config.RESOURCE_AWARE_SCHEDULER_PRIORITY_STRATEGY), 
e.getMessage());
    +                    break;
    +                }
    +            }
    +            TopologyDetails td = null;
    +            try {
    +                //need to re prepare since scheduling state might have 
been restored
    +                schedulingPrioritystrategy.prepare(this.topologies, 
this.cluster, this.userMap, this.nodes);
    +                //Call scheduling priority strategy
    +                td = 
schedulingPrioritystrategy.getNextTopologyToSchedule();
    +            } catch (Exception e) {
    +                LOG.error("Exception thrown when running priority strategy 
{}. No topologies will be scheduled! Error: {} StackTrack: {}"
    +                        , schedulingPrioritystrategy.getClass().getName(), 
e.getMessage(), Arrays.toString(e.getStackTrace()));
    +                break;
    +            }
    +            if (td == null) {
    +                break;
    +            }
    +            scheduleTopology(td);
    +        }
    +    }
    +
    +    public void scheduleTopology(TopologyDetails td) {
    +        User topologySubmitter = 
this.userMap.get(td.getTopologySubmitter());
    +        if (cluster.getUnassignedExecutors(td).size() > 0) {
    +            LOG.debug("/********Scheduling topology {} from User 
{}************/", td.getName(), topologySubmitter);
    +
    +            SchedulingState schedulingState = 
this.checkpointSchedulingState();
    +            IStrategy RAStrategy = null;
    +            try {
    +                RAStrategy = (IStrategy) Utils.newInstance((String) 
td.getConf().get(Config.TOPOLOGY_SCHEDULER_STRATEGY));
    +            } catch (RuntimeException e) {
    +                LOG.error("failed to create instance of IStrategy: {} with 
error: {}! Topology {} will not be scheduled.",
    +                        td.getName(), 
td.getConf().get(Config.TOPOLOGY_SCHEDULER_STRATEGY), e.getMessage());
    +                this.restoreCheckpointSchedulingState(schedulingState);
    +                //since state is restored need the update User 
topologySubmitter to the new User object in userMap
    +                topologySubmitter = 
this.userMap.get(td.getTopologySubmitter());
    +                topologySubmitter.moveTopoFromPendingToInvalid(td);
    +                this.cluster.setStatus(td.getId(), "Unsuccessful in 
scheduling - failed to create instance of topology strategy "
    +                        + 
td.getConf().get(Config.TOPOLOGY_SCHEDULER_STRATEGY) + ". Please check logs for 
details");
    +                return;
    +            }
    +            IEvictionStrategy evictionStrategy = null;
    +            while (true) {
    +                SchedulingResult result = null;
    +                try {
    +                    //Need to re prepare scheduling strategy with cluster 
and topologies in case scheduling state was restored
    +                    RAStrategy.prepare(this.topologies, this.cluster, 
this.userMap, this.nodes);
    +                    result = RAStrategy.schedule(td);
    +                } catch (Exception e) {
    +                    LOG.error("Exception thrown when running strategy {} 
to schedule topology {}. Topology will not be scheduled! Error: {} StackTrack: 
{}"
    +                            , RAStrategy.getClass().getName(), 
td.getName(), e.getMessage(), Arrays.toString(e.getStackTrace()));
    +                    this.restoreCheckpointSchedulingState(schedulingState);
    +                    //since state is restored need the update User 
topologySubmitter to the new User object in userMap
    +                    topologySubmitter = 
this.userMap.get(td.getTopologySubmitter());
    +                    topologySubmitter.moveTopoFromPendingToInvalid(td);
    +                    this.cluster.setStatus(td.getId(), "Unsuccessful in 
scheduling - Exception thrown when running strategy {}"
    +                            + RAStrategy.getClass().getName() + ". Please 
check logs for details");
    +                }
    +                LOG.debug("scheduling result: {}", result);
    +                if (result != null && result.isValid()) {
    +                    if (result.isSuccess()) {
    +                        try {
    +                            if (mkAssignment(td, 
result.getSchedulingResultMap())) {
    +                                
topologySubmitter.moveTopoFromPendingToRunning(td);
    +                                this.cluster.setStatus(td.getId(), 
"Running - " + result.getMessage());
    +                            } else {
    +                                
this.restoreCheckpointSchedulingState(schedulingState);
    +                                //since state is restored need the update 
User topologySubmitter to the new User object in userMap
    +                                topologySubmitter = 
this.userMap.get(td.getTopologySubmitter());
    +                                
topologySubmitter.moveTopoFromPendingToAttempted(td);
    +                                this.cluster.setStatus(td.getId(), 
"Unsuccessful in scheduling - Unable to assign executors to nodes. Please check 
logs for details");
    +                            }
    +                        } catch (IllegalStateException ex) {
    +                            LOG.error(ex.toString());
    +                            LOG.error("Unsuccessful in scheduling - 
IllegalStateException thrown when attempting to assign executors to nodes. 
Error: {} StackTrace: {}",
    +                                    ex.getClass().getName(), 
Arrays.toString(ex.getStackTrace()));
    +                            
this.restoreCheckpointSchedulingState(schedulingState);
    +                            //since state is restored need the update User 
topologySubmitter to the new User object in userMap
    +                            topologySubmitter = 
this.userMap.get(td.getTopologySubmitter());
    +                            
topologySubmitter.moveTopoFromPendingToAttempted(td);
    +                            this.cluster.setStatus(td.getId(), 
"Unsuccessful in scheduling - IllegalStateException thrown when attempting to 
assign executors to nodes. Please check log for details.");
    +                        }
    +                        break;
    +                    } else {
    +                        if (result.getStatus() == 
SchedulingStatus.FAIL_NOT_ENOUGH_RESOURCES) {
    +                            if (evictionStrategy == null) {
    +                                try {
    +                                    evictionStrategy = (IEvictionStrategy) 
Utils.newInstance((String) 
this.conf.get(Config.RESOURCE_AWARE_SCHEDULER_EVICTION_STRATEGY));
    +                                } catch (RuntimeException e) {
    +                                    LOG.error("failed to create instance 
of eviction strategy: {} with error: {}! No topology eviction will be done.",
    +                                            
this.conf.get(Config.RESOURCE_AWARE_SCHEDULER_EVICTION_STRATEGY), 
e.getMessage());
    +                                    
topologySubmitter.moveTopoFromPendingToAttempted(td);
    +                                    break;
    +                                }
    +                            }
    +                            boolean madeSpace = false;
    +                            try {
    +                                //need to re prepare since scheduling 
state might have been restored
    +                                evictionStrategy.prepare(this.topologies, 
this.cluster, this.userMap, this.nodes);
    +                                madeSpace = 
evictionStrategy.makeSpaceForTopo(td);
    +                            } catch (Exception e) {
    +                                LOG.error("Exception thrown when running 
eviction strategy {} to schedule topology {}. No evictions will be done! Error: 
{} StackTrack: {}"
    +                                        , 
evictionStrategy.getClass().getName(), td.getName(), e.getClass().getName(), 
Arrays.toString(e.getStackTrace()));
    +                                
this.restoreCheckpointSchedulingState(schedulingState);
    +                                //since state is restored need the update 
User topologySubmitter to the new User object in userMap
    +                                topologySubmitter = 
this.userMap.get(td.getTopologySubmitter());
    +                                
topologySubmitter.moveTopoFromPendingToAttempted(td);
    +                                break;
    +                            }
    +                            if (!madeSpace) {
    +                                LOG.debug("Could not make space for topo 
{} will move to attempted", td);
    +                                
this.restoreCheckpointSchedulingState(schedulingState);
    +                                //since state is restored need the update 
User topologySubmitter to the new User object in userMap
    +                                topologySubmitter = 
this.userMap.get(td.getTopologySubmitter());
    +                                
topologySubmitter.moveTopoFromPendingToAttempted(td);
    +                                this.cluster.setStatus(td.getId(), "Not 
enough resources to schedule - " + result.getErrorMessage());
    +                                break;
                                 }
    -                            assignedMemOnHeap += 
targetSlot.getAllocatedMemOnHeap();
    -                            assignedMemOffHeap += 
targetSlot.getAllocatedMemOffHeap();
    -                            assignedCpu += targetSlot.getAllocatedCpu();
    +                            continue;
    +                        } else if (result.getStatus() == 
SchedulingStatus.FAIL_INVALID_TOPOLOGY) {
    +                            
this.restoreCheckpointSchedulingState(schedulingState);
    +                            //since state is restored need the update User 
topologySubmitter to the new User object in userMap
    +                            topologySubmitter = 
this.userMap.get(td.getTopologySubmitter());
    +                            
topologySubmitter.moveTopoFromPendingToInvalid(td, this.cluster);
    +                            break;
    +                        } else {
    +                            
this.restoreCheckpointSchedulingState(schedulingState);
    +                            //since state is restored need the update User 
topologySubmitter to the new User object in userMap
    +                            topologySubmitter = 
this.userMap.get(td.getTopologySubmitter());
    +                            
topologySubmitter.moveTopoFromPendingToAttempted(td, this.cluster);
    +                            break;
                             }
    -                        LOG.debug("Topology: {} assigned to {} nodes on {} 
workers", td.getId(), nodesUsed.size(), assignedWorkers);
    -                        cluster.setStatus(td.getId(), "Fully Scheduled");
    -                    } catch (IllegalStateException ex) {
    -                        LOG.error(ex.toString());
    -                        LOG.error("Unsuccessful in scheduling", 
td.getId());
    -                        cluster.setStatus(td.getId(), "Unsuccessful in 
scheduling");
                         }
                     } else {
    -                    LOG.error("Unsuccessful in scheduling", td.getId());
    -                    cluster.setStatus(td.getId(), "Unsuccessful in 
scheduling");
    +                    LOG.warn("Scheduling results returned from topology {} 
is not vaild! Topology with be ignored.", td.getName());
    +                    this.restoreCheckpointSchedulingState(schedulingState);
    +                    //since state is restored need the update User 
topologySubmitter to the new User object in userMap
    +                    topologySubmitter = 
this.userMap.get(td.getTopologySubmitter());
    +                    topologySubmitter.moveTopoFromPendingToInvalid(td, 
this.cluster);
    +                    break;
    --- End diff --
    
    Do we want to update the status of the topology here too?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at [email protected] or file a JIRA ticket
with INFRA.
---

Reply via email to