Ethanlm commented on a change in pull request #3346:
URL: https://github.com/apache/storm/pull/3346#discussion_r530539415



##########
File path: 
storm-server/src/main/java/org/apache/storm/scheduler/resource/strategies/scheduling/BaseResourceAwareStrategy.java
##########
@@ -428,16 +465,39 @@ protected SchedulingResult 
scheduleExecutorsOnNodes(List<ExecutorDetails> ordere
                     }
                     progressIdxForExec[execIndex]++;
 
+                    int numBoundAckerAssigned
+                        = assignBoundAckersForNewWorkerSlot(exec, node, 
workerSlot);
+                    if (numBoundAckerAssigned == -1) {
+                        // This only happens when trying to assign bound 
ackers to the worker slot and failed.
+                        // Free the entire worker slot and put those bound 
ackers back to unassigned list
+                        LOG.debug("Failed to assign bound acker for exec: {} 
of topo: {} to worker: {}.  Backtracking.",
+                            exec, topoName, workerSlot);
+                        searcherState.freeWorkerSlotWithBoundAckers(node, 
workerSlot);
+                        continue;
+                    }
+
                     if (!isExecAssignmentToWorkerValid(exec, workerSlot)) {
+                        // This only happens when this exec can not fit in the 
workerSlot
+                        // and this is not the first exec to this workerSlot.
+                        // So just go to next workerSlot and don't free the 
worker.
+                        if (numBoundAckerAssigned > 0) {
+                            LOG.debug("Failed to assign exec: {} of topo: {} 
with bound ackers to worker: {}.  Backtracking.",
+                                exec, topoName, workerSlot);
+                            searcherState.freeWorkerSlotWithBoundAckers(node, 
workerSlot);
+                        }
                         continue;
                     }
 
                     searcherState.incStatesSearched();
                     searcherState.assignCurrentExecutor(execToComp, node, 
workerSlot);
+                    if (numBoundAckerAssigned > 0) {
+                        // This exec with its bounded ackers have all been 
successfully assigned
+                        searcherState.getExecsWithBoundAckers().add(exec);
+                    }
                     if (searcherState.areAllExecsScheduled()) {
                         //Everything is scheduled correctly, so no need to 
search any more.
                         LOG.info("scheduleExecutorsOnNodes: Done at loopCnt={} 
in {}ms, state.elapsedtime={}, backtrackCnt={}, topo={}",
-                                loopCnt, System.currentTimeMillis() - 
startTimeMilli,
+                                loopCnt, Time.currentTimeMillis() - 
startTimeMilli,

Review comment:
       Should we change them to be `System` if we don't want to use `Time` 
since it is simulated in unit test? 
   I guess my question is really why only changes one not the other? 
   
   I think we should just use one of them, either `Time` or `System`, in both 
places




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to