Github user revans2 commented on a diff in the pull request:
https://github.com/apache/storm/pull/2623#discussion_r179571966
--- Diff:
storm-server/src/main/java/org/apache/storm/scheduler/resource/strategies/scheduling/BaseResourceAwareStrategy.java
---
@@ -477,45 +414,136 @@ protected String nodeToRack(RAS_Node node) {
List<ExecutorDetails> execsScheduled = new LinkedList<>();
Map<String, Queue<ExecutorDetails>> compToExecsToSchedule = new
HashMap<>();
- for (Component component : componentMap.values()) {
- compToExecsToSchedule.put(component.getId(), new
LinkedList<ExecutorDetails>());
+ for (Map.Entry<String, Component> componentEntry:
componentMap.entrySet()) {
+ Component component = componentEntry.getValue();
+ compToExecsToSchedule.put(component.getId(), new
LinkedList<>());
for (ExecutorDetails exec : component.getExecs()) {
if (unassignedExecutors.contains(exec)) {
compToExecsToSchedule.get(component.getId()).add(exec);
+ LOG.info("{} has unscheduled executor {}",
component.getId(), exec);
}
}
}
- Set<Component> sortedComponents = sortComponents(componentMap);
- sortedComponents.addAll(componentMap.values());
+ List<Component> sortedComponents =
topologicalSortComponents(componentMap);
- for (Component currComp : sortedComponents) {
- Map<String, Component> neighbors = new HashMap<String,
Component>();
- for (String compId : Sets.union(currComp.getChildren(),
currComp.getParents())) {
- neighbors.put(compId, componentMap.get(compId));
+ for (Component currComp: sortedComponents) {
+ int numExecs =
compToExecsToSchedule.get(currComp.getId()).size();
+ for (int i = 0; i < numExecs; i++) {
+ execsScheduled.addAll(takeExecutors(currComp, numExecs -
i, componentMap, compToExecsToSchedule));
}
- Set<Component> sortedNeighbors = sortNeighbors(currComp,
neighbors);
- Queue<ExecutorDetails> currCompExesToSched =
compToExecsToSchedule.get(currComp.getId());
-
- boolean flag = false;
- do {
- flag = false;
- if (!currCompExesToSched.isEmpty()) {
- execsScheduled.add(currCompExesToSched.poll());
- flag = true;
- }
+ }
+
+ LOG.info("The ordering result is {}", execsScheduled);
+
+ return execsScheduled;
+ }
- for (Component neighborComp : sortedNeighbors) {
- Queue<ExecutorDetails> neighborCompExesToSched =
- compToExecsToSchedule.get(neighborComp.getId());
- if (!neighborCompExesToSched.isEmpty()) {
- execsScheduled.add(neighborCompExesToSched.poll());
- flag = true;
+ private List<ExecutorDetails> takeExecutors(Component currComp, int
numExecs,
+ final Map<String,
Component> componentMap,
+ final Map<String,
Queue<ExecutorDetails>> compToExecsToSchedule) {
+ List<ExecutorDetails> execsScheduled = new ArrayList<>();
+ Queue<ExecutorDetails> currQueue =
compToExecsToSchedule.get((currComp.getId()));
+ Set<String> sortedChildren = getSortedChildren(currComp,
componentMap);
+
+ execsScheduled.add(currQueue.poll());
+
+ for (String childId: sortedChildren) {
+ Component childComponent = componentMap.get(childId);
+ Queue<ExecutorDetails> childQueue =
compToExecsToSchedule.get(childId);
+ int childNumExecs = childQueue.size();
+ if (childNumExecs == 0) {
+ continue;
+ }
+ int numExecsToTake = 1;
+ if (isShuffleFromParentToChild(currComp, childComponent)) {
+ // if it's shuffle grouping, truncate
+ numExecsToTake = Math.max(1, childNumExecs / numExecs);
+ } // otherwise, one-by-one
+
+ for (int i = 0; i < numExecsToTake; i++) {
+ execsScheduled.addAll(takeExecutors(childComponent,
childNumExecs, componentMap, compToExecsToSchedule));
+ }
+ }
+
+ return execsScheduled;
+ }
+
+ private Set<String> getSortedChildren(Component component, final
Map<String, Component> componentMap) {
+ Set<String> children = component.getChildren();
+ Set<String> sortedChildren =
+ new TreeSet<String>((o1, o2) -> {
+ Component child1 = componentMap.get(o1);
+ Component child2 = componentMap.get(o2);
+ boolean child1IsShuffle =
isShuffleFromParentToChild(component, child1);
+ boolean child2IsShuffle =
isShuffleFromParentToChild(component, child2);
+
+ if (child1IsShuffle && child2IsShuffle) {
+ return o1.compareTo(o2);
+ } else if (child1IsShuffle) {
+ return 1;
+ } else {
+ return -1;
+ }
+ });
+ sortedChildren.addAll(children);
+ return sortedChildren;
+ }
+
+ private boolean isShuffleFromParentToChild(Component parent, Component
child) {
--- End diff --
Nit: could we rename this from `isShuffleFromParentToChild` to something
more like `hasLocalityAwareGroupingFromParentToChild`? I know it is longer,
but in the future we may want to offer a way to expand this to more than just
shuffle.
---