Github user revans2 commented on a diff in the pull request:
https://github.com/apache/storm/pull/2400#discussion_r148852799
--- Diff:
storm-server/src/main/java/org/apache/storm/scheduler/resource/ResourceAwareScheduler.java
---
@@ -62,123 +70,144 @@ public void prepare(Map<String, Object> conf) {
@Override
public void schedule(Topologies topologies, Cluster cluster) {
- //initialize data structures
- for (TopologyDetails td : cluster.getTopologies()) {
+ Map<String, User> userMap = getUsers(cluster);
+ List<TopologyDetails> orderedTopologies = new
ArrayList<>(schedulingPriorityStrategy.getOrderedTopologies(cluster, userMap));
+ LOG.info("Ordered list of topologies is: {}",
orderedTopologies.stream().map((t) -> t.getId()).collect(Collectors.toList()));
+ for (TopologyDetails td : orderedTopologies) {
if (!cluster.needsSchedulingRas(td)) {
//cluster forgets about its previous status, so if it is
scheduled just leave it.
cluster.setStatusIfAbsent(td.getId(), "Fully Scheduled");
- }
- }
- Map<String, User> userMap = getUsers(cluster);
-
- while (true) {
- TopologyDetails td;
- try {
- //Call scheduling priority strategy
- td =
schedulingPrioritystrategy.getNextTopologyToSchedule(cluster, userMap);
- } catch (Exception ex) {
- LOG.error("Exception thrown when running priority strategy
{}. No topologies will be scheduled!",
- schedulingPrioritystrategy.getClass().getName(),
ex);
- break;
- }
- if (td == null) {
- break;
- }
- User submitter = userMap.get(td.getTopologySubmitter());
- if (cluster.needsSchedulingRas(td)) {
- scheduleTopology(td, cluster, submitter, userMap);
} else {
- LOG.warn("Topology {} is already fully scheduled!",
td.getName());
- cluster.setStatusIfAbsent(td.getId(), "Fully Scheduled");
+ User submitter = userMap.get(td.getTopologySubmitter());
+ scheduleTopology(td, cluster, submitter,
orderedTopologies);
}
}
}
+ private static void markFailedTopology(User u, Cluster c,
TopologyDetails td, String message) {
+ markFailedTopology(u, c, td, message, null);
+ }
- public void scheduleTopology(TopologyDetails td, Cluster cluster,
final User topologySubmitter,
- Map<String, User> userMap) {
+ private static void markFailedTopology(User u, Cluster c,
TopologyDetails td, String message, Throwable t) {
+ c.setStatus(td, message);
+ String realMessage = td.getId() + " " + message;
+ if (t != null) {
+ LOG.error(realMessage, t);
+ } else {
+ LOG.error(realMessage);
+ }
+ u.markTopoUnsuccess(td);
+ }
+
+ private void scheduleTopology(TopologyDetails td, Cluster cluster,
final User topologySubmitter,
+ List<TopologyDetails> orderedTopologies)
{
//A copy of cluster that we can modify, but does not get committed
back to cluster unless scheduling succeeds
Cluster workingState = new Cluster(cluster);
+ RAS_Nodes nodes = new RAS_Nodes(workingState);
IStrategy rasStrategy = null;
String strategyConf = (String)
td.getConf().get(Config.TOPOLOGY_SCHEDULER_STRATEGY);
try {
- rasStrategy = (IStrategy)
ReflectionUtils.newSchedulerStrategyInstance((String)
td.getConf().get(Config.TOPOLOGY_SCHEDULER_STRATEGY), conf);
+ rasStrategy =
ReflectionUtils.newSchedulerStrategyInstance((String)
td.getConf().get(Config.TOPOLOGY_SCHEDULER_STRATEGY), conf);
rasStrategy.prepare(conf);
} catch (DisallowedStrategyException e) {
- topologySubmitter.markTopoUnsuccess(td);
- cluster.setStatus(td.getId(), "Unsuccessful in scheduling - "
+ e.getAttemptedClass()
- + " is not an allowed strategy. Please make sure your
" + Config.TOPOLOGY_SCHEDULER_STRATEGY
- + " config is one of the allowed strategies: " +
e.getAllowedStrategies().toString());
+ markFailedTopology(topologySubmitter, cluster, td,
+ "Unsuccessful in scheduling - " + e.getAttemptedClass()
+ + " is not an allowed strategy. Please make sure your "
+ + Config.TOPOLOGY_SCHEDULER_STRATEGY
+ + " config is one of the allowed strategies: "
+ + e.getAllowedStrategies(), e);
return;
} catch (RuntimeException e) {
- LOG.error("failed to create instance of IStrategy: {} Topology
{} will not be scheduled.",
- strategyConf, td.getName(), e);
- topologySubmitter.markTopoUnsuccess(td);
- cluster.setStatus(td.getId(), "Unsuccessful in scheduling -
failed to create instance of topology strategy "
- + strategyConf + ". Please check logs for details");
+ markFailedTopology(topologySubmitter, cluster, td,
+ "Unsuccessful in scheduling - failed to create instance of
topology strategy "
+ + strategyConf
+ + ". Please check logs for details", e);
return;
}
- while (true) {
- // A copy of the cluster that restricts the strategy to only
modify a single topology
+ for (int i = 0; i < maxSchedulingAttempts; i++) {
SingleTopologyCluster toSchedule = new
SingleTopologyCluster(workingState, td.getId());
- SchedulingResult result = null;
try {
- result = rasStrategy.schedule(toSchedule, td);
- } catch (Exception ex) {
- LOG.error("Exception thrown when running strategy {} to
schedule topology {}."
- + " Topology will not be scheduled!",
rasStrategy.getClass().getName(), td.getName(), ex);
- topologySubmitter.markTopoUnsuccess(td);
- cluster.setStatus(td.getId(), "Unsuccessful in scheduling
- Exception thrown when running strategy {}"
- + rasStrategy.getClass().getName() + ". Please
check logs for details");
- }
- LOG.debug("scheduling result: {}", result);
- if (result != null) {
- if (result.isSuccess()) {
- try {
+ SchedulingResult result = rasStrategy.schedule(toSchedule,
td);
+ LOG.debug("scheduling result: {}", result);
+ if (result != null) {
+ if (result.isSuccess()) {
cluster.updateFrom(toSchedule);
cluster.setStatus(td.getId(), "Running - " +
result.getMessage());
- } catch (Exception ex) {
- LOG.error("Unsuccessful attempting to assign
executors to nodes.", ex);
- topologySubmitter.markTopoUnsuccess(td);
- cluster.setStatus(td.getId(), "Unsuccessful in
scheduling - "
- + "IllegalStateException thrown when
attempting to assign executors to nodes. Please check"
- + " log for details.");
- }
- return;
- } else {
- if (result.getStatus() ==
SchedulingStatus.FAIL_NOT_ENOUGH_RESOURCES) {
- boolean madeSpace = false;
+ //DONE
+ return;
+ } else if (result.getStatus() ==
SchedulingStatus.FAIL_NOT_ENOUGH_RESOURCES) {
+ LOG.info("Not enough resources to schedule {}",
td.getName());
+ List<TopologyDetails> reversedList =
ImmutableList.copyOf(orderedTopologies).reverse();
try {
- //need to re prepare since scheduling state
might have been restored
- madeSpace =
evictionStrategy.makeSpaceForTopo(td, workingState, userMap);
+ boolean evictedSomething = false;
+ LOG.debug("attempting to make space for topo
{} from user {}", td.getName(), td.getTopologySubmitter());
+ int tdIndex = reversedList.indexOf(td);
+ double cpuNeeded = td.getTotalRequestedCpu();
+ double memoryNeeded =
td.getTotalRequestedMemOffHeap() + td.getTotalRequestedMemOnHeap();
+ SchedulerAssignment assignment =
cluster.getAssignmentById(td.getId());
+ if (assignment != null) {
+ cpuNeeded -= getCpuUsed(assignment);
+ memoryNeeded -= getMemoryUsed(assignment);
+ }
+ cluster.getTopologyResourcesMap();
+ for (int index = 0; index < tdIndex; index++) {
+ TopologyDetails topologyEvict =
reversedList.get(index);
+ SchedulerAssignment evictAssignemnt =
workingState.getAssignmentById(topologyEvict.getId());
+ if (evictAssignemnt != null &&
!evictAssignemnt.getSlots().isEmpty()) {
+ Collection<WorkerSlot> workersToEvict
= workingState.getUsedSlotsByTopologyId(topologyEvict.getId());
+
+ LOG.debug("Evicting Topology {} with
workers: {} from user {}", topologyEvict.getName(), workersToEvict,
+
topologyEvict.getTopologySubmitter());
+ cpuNeeded -=
getCpuUsed(evictAssignemnt);
+ memoryNeeded -=
getMemoryUsed(evictAssignemnt);
+ evictedSomething = true;
+ nodes.freeSlots(workersToEvict);
+ if (cpuNeeded <= 0 && memoryNeeded <=
0) {
+ //We evicted enough topologies to
have a hope of scheduling, so try it now, and don't evict more
+ // than is needed
+ break;
+ }
+ }
+ }
+
+ if (!evictedSomething) {
+ markFailedTopology(topologySubmitter,
cluster, td,
+ "Not enough resources to schedule - "
+ result.getErrorMessage());
+ return;
+ }
} catch (Exception ex) {
- LOG.error("Exception thrown when running
eviction strategy {} to schedule topology {}."
- + " No evictions will be
done!", evictionStrategy.getClass().getName(),
- td.getName(), ex);
- topologySubmitter.markTopoUnsuccess(td);
- return;
- }
- if (!madeSpace) {
- LOG.debug("Could not make space for topo {}
will move to attempted", td);
- topologySubmitter.markTopoUnsuccess(td);
- cluster.setStatus(td.getId(), "Not enough
resources to schedule - "
- + result.getErrorMessage());
- return;
+ LOG.error("Exception thrown when running
eviction to schedule topology {}."
+ + " No evictions will be done! Error:
{}",
+ td.getName(), ex.getClass().getName(), ex);
}
+                //Only place we fall through to do the loop over
again...
continue;
} else {
--- End diff --
Should add a comment here noting that this branch handles all other failed statuses.
---