Github user agresch commented on a diff in the pull request:
https://github.com/apache/storm/pull/2630#discussion_r180801771
--- Diff:
storm-server/src/main/java/org/apache/storm/scheduler/resource/ResourceAwareScheduler.java
---
@@ -100,49 +180,88 @@ private static void markFailedTopology(User u,
Cluster c, TopologyDetails td, St
u.markTopoUnsuccess(td);
}
+ private void cancelAllPendingClusterStateChanged() {
+ if (!schedulingInBackground.isEmpty()) {
+ LOG.warn("Canceling scheduling of {} cluster state changed",
schedulingInBackground.keySet());
+ for (SchedulingPending sp : schedulingInBackground.values()) {
+ sp.cancel();
+ }
+ schedulingInBackground.clear();
+ }
+ }
+
private void scheduleTopology(TopologyDetails td, Cluster cluster,
final User topologySubmitter,
List<TopologyDetails> orderedTopologies)
{
+ LOG.debug("Scheduling {}", td.getId());
+ SchedulingPending sp = schedulingInBackground.get(td.getId());
+ if (sp == null) {
+ IStrategy rasStrategy;
+ String strategyConf = (String)
td.getConf().get(Config.TOPOLOGY_SCHEDULER_STRATEGY);
+ try {
+ String strategy = strategyConf;
+ if (strategy.startsWith("backtype.storm")) {
+ // Storm supports to launch workers of older version.
+ // If the config of TOPOLOGY_SCHEDULER_STRATEGY comes
from the older version, replace the package name.
+ strategy = strategy.replace("backtype.storm",
"org.apache.storm");
+ LOG.debug("Replace backtype.storm with
org.apache.storm for Config.TOPOLOGY_SCHEDULER_STRATEGY");
+ }
+ rasStrategy =
ReflectionUtils.newSchedulerStrategyInstance(strategy, conf);
+ rasStrategy.prepare(conf);
+ } catch (DisallowedStrategyException e) {
+ markFailedTopology(topologySubmitter, cluster, td,
+ "Unsuccessful in scheduling - " + e.getAttemptedClass()
+ + " is not an allowed strategy. Please make sure
your "
+ + Config.TOPOLOGY_SCHEDULER_STRATEGY
+ + " config is one of the allowed strategies: "
+ + e.getAllowedStrategies(), e);
+ return;
+ } catch (RuntimeException e) {
+ markFailedTopology(topologySubmitter, cluster, td,
+ "Unsuccessful in scheduling - failed to create
instance of topology strategy "
+ + strategyConf
+ + ". Please check logs for details", e);
+ return;
+ }
+
+ sp = new SchedulingPending(rasStrategy, 0);
+ }
+
//A copy of cluster that we can modify, but does not get committed
back to cluster unless scheduling succeeds
Cluster workingState = new Cluster(cluster);
RAS_Nodes nodes = new RAS_Nodes(workingState);
- IStrategy rasStrategy = null;
- String strategyConf = (String)
td.getConf().get(Config.TOPOLOGY_SCHEDULER_STRATEGY);
- try {
- String strategy = (String)
td.getConf().get(Config.TOPOLOGY_SCHEDULER_STRATEGY);
- if (strategy.startsWith("backtype.storm")) {
- // Storm supports to launch workers of older version.
- // If the config of TOPOLOGY_SCHEDULER_STRATEGY comes from
the older version, replace the package name.
- strategy = strategy.replace("backtype.storm",
"org.apache.storm");
- LOG.debug("Replace backtype.storm with org.apache.storm
for Config.TOPOLOGY_SCHEDULER_STRATEGY");
- }
- rasStrategy =
ReflectionUtils.newSchedulerStrategyInstance(strategy, conf);
- rasStrategy.prepare(conf);
- } catch (DisallowedStrategyException e) {
- markFailedTopology(topologySubmitter, cluster, td,
- "Unsuccessful in scheduling - " + e.getAttemptedClass()
- + " is not an allowed strategy. Please make sure your "
- + Config.TOPOLOGY_SCHEDULER_STRATEGY
- + " config is one of the allowed strategies: "
- + e.getAllowedStrategies(), e);
- return;
- } catch (RuntimeException e) {
- markFailedTopology(topologySubmitter, cluster, td,
- "Unsuccessful in scheduling - failed to create instance of
topology strategy "
- + strategyConf
- + ". Please check logs for details", e);
- return;
- }
- for (int i = 0; i < maxSchedulingAttempts; i++) {
+ for (int i = sp.getAttempt(); i < maxSchedulingAttempts; i++) {
SingleTopologyCluster toSchedule = new
SingleTopologyCluster(workingState, td.getId());
try {
- SchedulingResult result = rasStrategy.schedule(toSchedule,
td);
+ SchedulingResult result;
+ Future<SchedulingResult> schedulingFuture =
sp.scheduleIfNeeded(background, toSchedule, td);
+ try {
+ result =
schedulingFuture.get(schedulingForegroundTimeoutSeconds, TimeUnit.SECONDS);
+ sp.resetFuture();
+ } catch (TimeoutException te) {
+ long elapsedTimeSecs = (Time.currentTimeMillis() -
sp.getStartTime())/1000;
+ if (elapsedTimeSecs >=
schedulingBackgroundTimeoutSeconds) {
--- End diff --
What would be wrong with just letting this remain running?
---