This is an automated email from the ASF dual-hosted git repository.
ethanli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/storm.git
The following commit(s) were added to refs/heads/master by this push:
new 4de0622 [STORM-3755] While scheduling multiple ackers with executor
use best effort basis. (#3386)
4de0622 is described below
commit 4de0622c6e25e891419f8ccd002e243470dfe969
Author: Bipin Prasad <[email protected]>
AuthorDate: Thu Mar 18 11:06:34 2021 -0500
[STORM-3755] While scheduling multiple ackers with executor use best effort
basis. (#3386)
---
.../scheduling/BaseResourceAwareStrategy.java | 61 +++++++++-------------
1 file changed, 24 insertions(+), 37 deletions(-)
diff --git
a/storm-server/src/main/java/org/apache/storm/scheduler/resource/strategies/scheduling/BaseResourceAwareStrategy.java
b/storm-server/src/main/java/org/apache/storm/scheduler/resource/strategies/scheduling/BaseResourceAwareStrategy.java
index 4bc4d0b..bdd89ff 100644
---
a/storm-server/src/main/java/org/apache/storm/scheduler/resource/strategies/scheduling/BaseResourceAwareStrategy.java
+++
b/storm-server/src/main/java/org/apache/storm/scheduler/resource/strategies/scheduling/BaseResourceAwareStrategy.java
@@ -485,39 +485,22 @@ public abstract class BaseResourceAwareStrategy
implements IStrategy {
}
progressIdxForExec[execIndex]++;
- int numBoundAckerAssigned
- = assignBoundAckersForNewWorkerSlot(exec, node,
workerSlot);
- if (numBoundAckerAssigned == -1) {
- // This only happens when trying to assign bound
ackers to the worker slot and failed.
- // Free the entire worker slot and put those bound
ackers back to unassigned list
- LOG.debug("Failed to assign bound acker for exec={},
comp={}, topo: {} to worker: {}. Backtracking.",
- exec, comp, topoName, workerSlot);
- searcherState.freeWorkerSlotWithBoundAckers(node,
workerSlot);
- continue;
- }
-
if (!isExecAssignmentToWorkerValid(exec, workerSlot)) {
- // This only happens when this exec can not fit in the
workerSlot
- // and this is not the first exec to this workerSlot.
- // So just go to next workerSlot and don't free the
worker.
- if (numBoundAckerAssigned > 0) {
- LOG.debug("Failed to assign exec={}, comp={},
topo={} with bound ackers to worker: {}. Backtracking.",
- exec, comp, topoName, workerSlot);
- searcherState.freeWorkerSlotWithBoundAckers(node,
workerSlot);
- } else {
- LOG.debug("Failed to assign exec={}, comp={},
topo={} to worker={} on node=({}, availCpu={}, availMem={}).",
- exec, comp, topoName, workerSlot,
- node.getId(), node.getAvailableCpuResources(),
node.getAvailableMemoryResources());
- }
+ // exec can't fit in this workerSlot, try next
workerSlot
+ LOG.debug("Failed to assign exec={}, comp={}, topo={}
to worker={} on node=({}, availCpu={}, availMem={}).",
+ exec, comp, topoName, workerSlot,
+ node.getId(), node.getAvailableCpuResources(),
node.getAvailableMemoryResources());
continue;
}
searcherState.incStatesSearched();
searcherState.assignCurrentExecutor(execToComp, node,
workerSlot);
+ int numBoundAckerAssigned =
assignBoundAckersForNewWorkerSlot(exec, node, workerSlot);
if (numBoundAckerAssigned > 0) {
- // This exec with its bounded ackers have all been
successfully assigned
+ // This exec and at least one of its bound ackers have
been successfully assigned
searcherState.getExecsWithBoundAckers().add(exec);
}
+
if (searcherState.areAllExecsScheduled()) {
//Everything is scheduled correctly, so no need to
search any more.
LOG.info("scheduleExecutorsOnNodes: Done at loopCnt={}
in {}ms, state.elapsedtime={}, backtrackCnt={}, topo={}",
@@ -559,36 +542,40 @@ public abstract class BaseResourceAwareStrategy
implements IStrategy {
/**
* <p>
* Determine how many bound ackers to put into the given workerSlot.
- * Then try to assign the ackers one by one into this workerSlot.
+ * Then try to assign the ackers one by one into this workerSlot up to the
calculated
+ * maximum required. Return the number of ackers assigned.
*
- * Return -1 only if the bound ackers assignment process failed.
* Return 0 if one of the conditions hold true:
* 1. No bound ackers are used.
* 2. This is not first exec assigned to this worker.
- * Return positive int if all bound ackers assignments succeed.
+ * 3. No ackers could be assigned because of space or exception.
+ *
* </p>
* @param exec being scheduled.
* @param node RasNode on which to schedule.
* @param workerSlot WorkerSlot on which to schedule.
- * @return If we successfully assigned bound worker for
this exec
+ * @return Number of ackers assigned.
*/
private int assignBoundAckersForNewWorkerSlot(ExecutorDetails exec,
RasNode node, WorkerSlot workerSlot) {
int numOfAckersToBind = searcherState.getNumOfAckersToBind(exec,
workerSlot);
if (numOfAckersToBind > 0) {
for (int i = 0; i < numOfAckersToBind; i++) {
if
(!isExecAssignmentToWorkerValid(searcherState.peekUnassignedAckers(),
workerSlot)) {
- return -1;
- } else {
- try {
- searcherState.assignSingleBoundAcker(node, workerSlot);
- } catch (Exception e) {
- LOG.error("Exception happens when assigning {}th acker
executor to workerSlot: {} for topology: {}",
- i, workerSlot, topoName, e);
- return -1;
- }
+ LOG.debug("Assigned {} of {} ackers on workerSlot={} with
the executor={} for topology={}",
+ i, numOfAckersToBind, workerSlot, exec, topoName);
+ return i;
+ }
+ try {
+ searcherState.assignSingleBoundAcker(node, workerSlot);
+ } catch (Exception e) {
+ LOG.error("Exception happens when assigning {} of {}
ackers on workerSlot={} for topology={}",
+ i + 1, numOfAckersToBind, workerSlot,
topoName, e);
+ return i;
}
}
}
+ LOG.debug("Assigned {} ackers on workerSlot={} with the executor={}
for topology={}",
+ numOfAckersToBind, workerSlot, exec, topoName);
return numOfAckersToBind;
}
}