Github user revans2 commented on a diff in the pull request: https://github.com/apache/storm/pull/2400#discussion_r148850943 --- Diff: storm-server/src/main/java/org/apache/storm/scheduler/resource/ResourceAwareScheduler.java --- @@ -62,123 +70,144 @@ public void prepare(Map<String, Object> conf) { @Override public void schedule(Topologies topologies, Cluster cluster) { - //initialize data structures - for (TopologyDetails td : cluster.getTopologies()) { + Map<String, User> userMap = getUsers(cluster); + List<TopologyDetails> orderedTopologies = new ArrayList<>(schedulingPriorityStrategy.getOrderedTopologies(cluster, userMap)); + LOG.info("Ordered list of topologies is: {}", orderedTopologies.stream().map((t) -> t.getId()).collect(Collectors.toList())); + for (TopologyDetails td : orderedTopologies) { if (!cluster.needsSchedulingRas(td)) { //cluster forgets about its previous status, so if it is scheduled just leave it. cluster.setStatusIfAbsent(td.getId(), "Fully Scheduled"); - } - } - Map<String, User> userMap = getUsers(cluster); - - while (true) { - TopologyDetails td; - try { - //Call scheduling priority strategy - td = schedulingPrioritystrategy.getNextTopologyToSchedule(cluster, userMap); - } catch (Exception ex) { - LOG.error("Exception thrown when running priority strategy {}. 
No topologies will be scheduled!", - schedulingPrioritystrategy.getClass().getName(), ex); - break; - } - if (td == null) { - break; - } - User submitter = userMap.get(td.getTopologySubmitter()); - if (cluster.needsSchedulingRas(td)) { - scheduleTopology(td, cluster, submitter, userMap); } else { - LOG.warn("Topology {} is already fully scheduled!", td.getName()); - cluster.setStatusIfAbsent(td.getId(), "Fully Scheduled"); + User submitter = userMap.get(td.getTopologySubmitter()); + scheduleTopology(td, cluster, submitter, orderedTopologies); } } } + private static void markFailedTopology(User u, Cluster c, TopologyDetails td, String message) { + markFailedTopology(u, c, td, message, null); + } - public void scheduleTopology(TopologyDetails td, Cluster cluster, final User topologySubmitter, - Map<String, User> userMap) { + private static void markFailedTopology(User u, Cluster c, TopologyDetails td, String message, Throwable t) { + c.setStatus(td, message); + String realMessage = td.getId() + " " + message; + if (t != null) { + LOG.error(realMessage, t); + } else { + LOG.error(realMessage); + } + u.markTopoUnsuccess(td); + } + + private void scheduleTopology(TopologyDetails td, Cluster cluster, final User topologySubmitter, + List<TopologyDetails> orderedTopologies) { //A copy of cluster that we can modify, but does not get committed back to cluster unless scheduling succeeds Cluster workingState = new Cluster(cluster); + RAS_Nodes nodes = new RAS_Nodes(workingState); IStrategy rasStrategy = null; String strategyConf = (String) td.getConf().get(Config.TOPOLOGY_SCHEDULER_STRATEGY); try { - rasStrategy = (IStrategy) ReflectionUtils.newSchedulerStrategyInstance((String) td.getConf().get(Config.TOPOLOGY_SCHEDULER_STRATEGY), conf); + rasStrategy = ReflectionUtils.newSchedulerStrategyInstance((String) td.getConf().get(Config.TOPOLOGY_SCHEDULER_STRATEGY), conf); rasStrategy.prepare(conf); } catch (DisallowedStrategyException e) { - topologySubmitter.markTopoUnsuccess(td); 
- cluster.setStatus(td.getId(), "Unsuccessful in scheduling - " + e.getAttemptedClass() - + " is not an allowed strategy. Please make sure your " + Config.TOPOLOGY_SCHEDULER_STRATEGY - + " config is one of the allowed strategies: " + e.getAllowedStrategies().toString()); + markFailedTopology(topologySubmitter, cluster, td, + "Unsuccessful in scheduling - " + e.getAttemptedClass() + + " is not an allowed strategy. Please make sure your " + + Config.TOPOLOGY_SCHEDULER_STRATEGY + + " config is one of the allowed strategies: " + + e.getAllowedStrategies(), e); return; } catch (RuntimeException e) { - LOG.error("failed to create instance of IStrategy: {} Topology {} will not be scheduled.", - strategyConf, td.getName(), e); - topologySubmitter.markTopoUnsuccess(td); - cluster.setStatus(td.getId(), "Unsuccessful in scheduling - failed to create instance of topology strategy " - + strategyConf + ". Please check logs for details"); + markFailedTopology(topologySubmitter, cluster, td, + "Unsuccessful in scheduling - failed to create instance of topology strategy " + + strategyConf + + ". Please check logs for details", e); return; } - while (true) { - // A copy of the cluster that restricts the strategy to only modify a single topology + for (int i = 0; i < maxSchedulingAttempts; i++) { SingleTopologyCluster toSchedule = new SingleTopologyCluster(workingState, td.getId()); - SchedulingResult result = null; try { - result = rasStrategy.schedule(toSchedule, td); - } catch (Exception ex) { - LOG.error("Exception thrown when running strategy {} to schedule topology {}." - + " Topology will not be scheduled!", rasStrategy.getClass().getName(), td.getName(), ex); - topologySubmitter.markTopoUnsuccess(td); - cluster.setStatus(td.getId(), "Unsuccessful in scheduling - Exception thrown when running strategy {}" - + rasStrategy.getClass().getName() + ". 
Please check logs for details"); - } - LOG.debug("scheduling result: {}", result); - if (result != null) { - if (result.isSuccess()) { - try { + SchedulingResult result = rasStrategy.schedule(toSchedule, td); + LOG.debug("scheduling result: {}", result); + if (result != null) { + if (result.isSuccess()) { cluster.updateFrom(toSchedule); cluster.setStatus(td.getId(), "Running - " + result.getMessage()); - } catch (Exception ex) { - LOG.error("Unsuccessful attempting to assign executors to nodes.", ex); - topologySubmitter.markTopoUnsuccess(td); - cluster.setStatus(td.getId(), "Unsuccessful in scheduling - " - + "IllegalStateException thrown when attempting to assign executors to nodes. Please check" - + " log for details."); - } - return; - } else { - if (result.getStatus() == SchedulingStatus.FAIL_NOT_ENOUGH_RESOURCES) { - boolean madeSpace = false; + //DONE + return; + } else if (result.getStatus() == SchedulingStatus.FAIL_NOT_ENOUGH_RESOURCES) { + LOG.info("Not enough resources to schedule {}", td.getName()); --- End diff --
Should this "Not enough resources to schedule" message be logged at DEBUG level rather than INFO?
---