Ethanlm commented on a change in pull request #3136: [STORM-3519] Change 
ConstraintSolverStrategy::backtrackSearch to iteration
URL: https://github.com/apache/storm/pull/3136#discussion_r334296124
 
 

 ##########
 File path: 
storm-server/src/main/java/org/apache/storm/scheduler/resource/strategies/scheduling/ConstraintSolverStrategy.java
 ##########
 @@ -320,51 +320,101 @@ private boolean checkSchedulingFeasibility(int 
maxStateSearch) {
         return 
GenericResourceAwareStrategy.sortObjectResourcesImpl(allResources, exec, 
topologyDetails, existingScheduleFunc);
     }
 
-    // Backtracking algorithm does not take into account the ordering of 
executors in worker to reduce traversal space
+    /**
+     * Try to schedule till successful or till limits (backtrack count or 
time) have been exceeded.
+     *
+     * @param state terminal state of the executor assignment.
+     * @return SolverResult with success attribute set to true or false 
indicating whether ALL executors were assigned.
+     */
     @VisibleForTesting
     protected SolverResult backtrackSearch(SearcherState state) {
-        state.incStatesSearched();
-        if (state.areSearchLimitsExceeded()) {
-            LOG.warn("Limits Exceeded");
-            return new SolverResult(state, false);
-        }
+        long         startTimeMilli     = System.currentTimeMillis();
+        int          maxExecCnt         = state.getExecSize();
+
+        // following three are state information at each "execIndex" level
+        int[]        progressIdxForExec = new int[maxExecCnt];
+        RasNode[]    nodeForExec        = new RasNode[maxExecCnt];
+        WorkerSlot[] workerSlotForExec  = new WorkerSlot[maxExecCnt];
 
-        if (Thread.currentThread().isInterrupted()) {
-            return new SolverResult(state, false);
+        for (int i = 0; i < maxExecCnt ; i++) {
+            progressIdxForExec[i] = -1;
         }
+        LOG.info("backtrackSearch: will assign {} executors", maxExecCnt);
+
+        OUTERMOST_LOOP:
+        for (int loopCnt = 0 ; true ; loopCnt++) {
+            LOG.debug("backtrackSearch: loopCnt = {}, state.execIndex = {}", 
loopCnt, state.execIndex);
+            if (state.areSearchLimitsExceeded()) {
+                LOG.warn("backtrackSearch: Search limits exceeded");
+                return new SolverResult(state, false);
+            }
 
-        ExecutorDetails exec = state.currentExec();
-        Iterable<String> sortedNodes = sortAllNodes(state.td, exec, 
favoredNodeIds, unFavoredNodeIds);
+            if (Thread.currentThread().isInterrupted()) {
+                return new SolverResult(state, false);
+            }
 
-        for (String nodeId: sortedNodes) {
-            RasNode node = nodes.get(nodeId);
-            for (WorkerSlot workerSlot : node.getSlotsAvailableToScheduleOn()) 
{
-                if (isExecAssignmentToWorkerValid(workerSlot, state)) {
-                    state.tryToSchedule(execToComp, node, workerSlot);
+            int execIndex = state.execIndex;
 
-                    if (state.areAllExecsScheduled()) {
-                        //Everything is scheduled correctly, so no need to 
search any more.
-                        return new SolverResult(state, true);
-                    }
+            ExecutorDetails exec = state.currentExec();
+            Iterable<String> sortedNodesIter = sortAllNodes(state.td, exec, 
favoredNodeIds, unFavoredNodeIds);
 
-                    SolverResult results = 
backtrackSearch(state.nextExecutor());
-                    if (results.success) {
-                        //We found a good result we are done.
-                        return results;
+            int progressIdx = -1;
+            boolean assigned = false;
+            for (String nodeId : sortedNodesIter) {
+                RasNode node = nodes.get(nodeId);
+                for (WorkerSlot workerSlot : 
node.getSlotsAvailableToScheduleOn()) {
+                    progressIdx++;
+                    if (progressIdx <= progressIdxForExec[execIndex]) {
+                        continue;
                     }
+                    progressIdxForExec[execIndex]++;
+                    LOG.debug("backtrackSearch: loopCnt = {}, state.execIndex 
= {}, node/slot-ordinal = {}, nodeId = {}",
+                        loopCnt, execIndex, progressIdx, nodeId);
 
-                    if (state.areSearchLimitsExceeded()) {
-                        //No need to search more it is not going to help.
-                        return new SolverResult(state, false);
+                    if (!isExecAssignmentToWorkerValid(workerSlot, state)) {
+                        continue;
                     }
 
-                    //backtracking (If we ever get here there really isn't a 
lot of hope that we will find a scheduling)
-                    state.backtrack(execToComp, node, workerSlot);
+                    try {
+                        state.incStatesSearched();
+                        state.tryToSchedule(execToComp, node, workerSlot);
+                        if (state.areAllExecsScheduled()) {
+                            //Everything is scheduled correctly, so no need to 
search any more.
+                            LOG.info("backtrackSearch: AllExecsScheduled at 
loopCnt = {} in {} milliseconds, elapsedtime in state={}",
+                                loopCnt, System.currentTimeMillis() - 
startTimeMilli, Time.currentTimeMillis() - state.startTimeMillis);
+                            return new SolverResult(state, true);
+                        }
+                        state = state.nextExecutor();
+                        nodeForExec[execIndex] = node;
+                        workerSlotForExec[execIndex] = workerSlot;
+                        assigned = true;
+                        LOG.debug("backtrackSearch: Assigned execId={} to 
node={}, node/slot-ordinal={} at loopCnt={}", 
+                            execIndex, nodeId, progressIdx, loopCnt);
+                        continue OUTERMOST_LOOP;
+                    } catch (Exception ex) {
+                        LOG.error("backtrackSearch: Failed to schedule execId 
{} to node/slot-ordinal = {} at loopCnt = {}, nodeId = {}",
+                            ex, execIndex, progressIdx, loopCnt, nodeId);
+                        continue;
+                    }
+                }
+            }
+            if (!assigned) {
+                // if here, then the executor was not assigned, backtrack;
+                LOG.debug("backtrackSearch: Failed to schedule execId = {} at 
loopCnt = {}", execIndex, loopCnt);
+                if (execIndex == 0) {
+                    break;
+                } else {
+                    state.backtrack(execToComp, nodeForExec[execIndex - 1], 
workerSlotForExec[execIndex - 1]);
+                    for (int i = execIndex ; i < maxExecCnt ; i++) {
+                        progressIdxForExec[i] = -1;
+                    }
                 }
             }
         }
-        //Tried all of the slots and none of them worked.
-        return new SolverResult(state, false);
+        boolean success = state.areAllExecsScheduled();
+        LOG.info("backtrackSearch: Scheduled={} in {} milliseconds, 
elapsedtime in state={}",
+            success, System.currentTimeMillis() - startTimeMilli, 
Time.currentTimeMillis() - state.startTimeMillis);
 
 Review comment:
   Why are `System.currentTimeMillis()` and `Time.currentTimeMillis()` both 
used here? 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

Reply via email to