bipinprasad commented on a change in pull request #3136: [STORM-3519] Change
ConstraintSolverStrategy::backtrackSearch to iteration
URL: https://github.com/apache/storm/pull/3136#discussion_r334567915
##########
File path:
storm-server/src/main/java/org/apache/storm/scheduler/resource/strategies/scheduling/ConstraintSolverStrategy.java
##########
@@ -320,51 +320,101 @@ private boolean checkSchedulingFeasibility(int
maxStateSearch) {
return
GenericResourceAwareStrategy.sortObjectResourcesImpl(allResources, exec,
topologyDetails, existingScheduleFunc);
}
- // Backtracking algorithm does not take into account the ordering of
executors in worker to reduce traversal space
+ /**
+ * Try to schedule till successful or till limits (backtrack count or
time) have been exceeded.
+ *
+ * @param state terminal state of the executor assignment.
+ * @return SolverResult with success attribute set to true or false
indicating whether ALL executors were assigned.
+ */
@VisibleForTesting
protected SolverResult backtrackSearch(SearcherState state) {
- state.incStatesSearched();
- if (state.areSearchLimitsExceeded()) {
- LOG.warn("Limits Exceeded");
- return new SolverResult(state, false);
- }
+ long startTimeMilli = System.currentTimeMillis();
+ int maxExecCnt = state.getExecSize();
+
+ // following three are state information at each "execIndex" level
+ int[] progressIdxForExec = new int[maxExecCnt];
+ RasNode[] nodeForExec = new RasNode[maxExecCnt];
+ WorkerSlot[] workerSlotForExec = new WorkerSlot[maxExecCnt];
- if (Thread.currentThread().isInterrupted()) {
- return new SolverResult(state, false);
+ for (int i = 0; i < maxExecCnt ; i++) {
+ progressIdxForExec[i] = -1;
}
+ LOG.info("backtrackSearch: will assign {} executors", maxExecCnt);
+
+ OUTERMOST_LOOP:
+ for (int loopCnt = 0 ; true ; loopCnt++) {
+ LOG.debug("backtrackSearch: loopCnt = {}, state.execIndex = {}",
loopCnt, state.execIndex);
+ if (state.areSearchLimitsExceeded()) {
+ LOG.warn("backtrackSearch: Search limits exceeded");
+ return new SolverResult(state, false);
+ }
- ExecutorDetails exec = state.currentExec();
- Iterable<String> sortedNodes = sortAllNodes(state.td, exec,
favoredNodeIds, unFavoredNodeIds);
+ if (Thread.currentThread().isInterrupted()) {
+ return new SolverResult(state, false);
+ }
- for (String nodeId: sortedNodes) {
- RasNode node = nodes.get(nodeId);
- for (WorkerSlot workerSlot : node.getSlotsAvailableToScheduleOn())
{
- if (isExecAssignmentToWorkerValid(workerSlot, state)) {
- state.tryToSchedule(execToComp, node, workerSlot);
+ int execIndex = state.execIndex;
- if (state.areAllExecsScheduled()) {
- //Everything is scheduled correctly, so no need to
search any more.
- return new SolverResult(state, true);
- }
+ ExecutorDetails exec = state.currentExec();
+ Iterable<String> sortedNodesIter = sortAllNodes(state.td, exec,
favoredNodeIds, unFavoredNodeIds);
- SolverResult results =
backtrackSearch(state.nextExecutor());
- if (results.success) {
- //We found a good result we are done.
- return results;
+ int progressIdx = -1;
+ boolean assigned = false;
+ for (String nodeId : sortedNodesIter) {
+ RasNode node = nodes.get(nodeId);
+ for (WorkerSlot workerSlot :
node.getSlotsAvailableToScheduleOn()) {
+ progressIdx++;
+ if (progressIdx <= progressIdxForExec[execIndex]) {
+ continue;
}
+ progressIdxForExec[execIndex]++;
+ LOG.debug("backtrackSearch: loopCnt = {}, state.execIndex
= {}, node/slot-ordinal = {}, nodeId = {}",
+ loopCnt, execIndex, progressIdx, nodeId);
- if (state.areSearchLimitsExceeded()) {
- //No need to search more it is not going to help.
- return new SolverResult(state, false);
+ if (!isExecAssignmentToWorkerValid(workerSlot, state)) {
+ continue;
}
- //backtracking (If we ever get here there really isn't a
lot of hope that we will find a scheduling)
- state.backtrack(execToComp, node, workerSlot);
+ try {
+ state.incStatesSearched();
+ state.tryToSchedule(execToComp, node, workerSlot);
+ if (state.areAllExecsScheduled()) {
+ //Everything is scheduled correctly, so no need to
search any more.
+ LOG.info("backtrackSearch: AllExecsScheduled at
loopCnt = {} in {} milliseconds, elapsedtime in state={}",
+ loopCnt, System.currentTimeMillis() -
startTimeMilli, Time.currentTimeMillis() - state.startTimeMillis);
+ return new SolverResult(state, true);
+ }
+ state = state.nextExecutor();
+ nodeForExec[execIndex] = node;
+ workerSlotForExec[execIndex] = workerSlot;
+ assigned = true;
+ LOG.debug("backtrackSearch: Assigned execId={} to
node={}, node/slot-ordinal={} at loopCnt={}",
+ execIndex, nodeId, progressIdx, loopCnt);
+ continue OUTERMOST_LOOP;
+ } catch (Exception ex) {
+ LOG.error("backtrackSearch: Failed to schedule execId
{} to node/slot-ordinal = {} at loopCnt = {}, nodeId = {}",
+ ex, execIndex, progressIdx, loopCnt, nodeId);
+ continue;
+ }
+ }
+ }
+ if (!assigned) {
Review comment:
The `assigned` flag is not required; I will remove it.
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
With regards,
Apache Git Services