GitHub user srdo commented on a diff in the pull request:
https://github.com/apache/storm/pull/2764#discussion_r208718858
--- Diff:
storm-server/src/main/java/org/apache/storm/daemon/nimbus/Nimbus.java ---
@@ -735,39 +756,108 @@ private static int
numUsedWorkers(SchedulerAssignment assignment) {
return ret;
}
- private static Map<String, Map<List<Long>, List<Object>>>
computeNewTopoToExecToNodePort(
- Map<String, SchedulerAssignment> schedAssignments, Map<String,
Assignment> existingAssignments) {
- Map<String, Map<List<Long>, List<Object>>> ret =
computeTopoToExecToNodePort(schedAssignments);
- // Print some useful information
- if (existingAssignments != null && !existingAssignments.isEmpty())
{
- for (Entry<String, Map<List<Long>, List<Object>>> entry :
ret.entrySet()) {
- String topoId = entry.getKey();
- Map<List<Long>, List<Object>> execToNodePort =
entry.getValue();
- Assignment assignment = existingAssignments.get(topoId);
- if (assignment == null) {
- continue;
+ private boolean inspectSchduling(Map<String, Assignment>
existingAssignments,
+ Map<String, Assignment>
newAssignments) {
+ assert existingAssignments != null && newAssignments != null;
+ boolean anyChanged = existingAssignments.isEmpty() ^
newAssignments.isEmpty();
+ long numRemovedExec = 0;
+ long numRemovedSlot = 0;
+ long numAddedExec = 0;
+ long numAddedSlot = 0;
+ if (existingAssignments.isEmpty()) {
+ for (Entry<String, Assignment> entry :
newAssignments.entrySet()) {
+ final Map<List<Long>, NodeInfo> execToPort =
entry.getValue().get_executor_node_port();
+ final long count = new
HashSet<>(execToPort.values()).size();
+ LOG.info("Assigning {} to {} slots", entry.getKey(),
count);
+ LOG.info("Assign executors: {}", execToPort.keySet());
+ numAddedSlot += count;
+ numAddedExec += execToPort.size();
+ }
+ } else if (newAssignments.isEmpty()) {
+ for (Entry<String, Assignment> entry :
existingAssignments.entrySet()) {
+ final Map<List<Long>, NodeInfo> execToPort =
entry.getValue().get_executor_node_port();
+ final long count = new
HashSet<>(execToPort.values()).size();
+ LOG.info("Removing {} from {} slots", entry.getKey(),
count);
+ LOG.info("Remove executors: {}", execToPort.keySet());
+ numRemovedSlot += count;
+ numRemovedExec += execToPort.size();
+ }
+ } else {
+ MapDifference<String, Assignment> difference =
Maps.difference(existingAssignments, newAssignments);
+ if (anyChanged = (difference.entriesInCommon().size() !=
newAssignments.size())) {
+ for (Entry<String, Assignment> entry :
difference.entriesOnlyOnLeft().entrySet()) {
+ final Map<List<Long>, NodeInfo> execToPort =
entry.getValue().get_executor_node_port();
+ final long count = new
HashSet<>(execToPort.values()).size();
+ LOG.info("Removing {} from {} slots", entry.getKey(),
count);
+ LOG.info("Remove executors: {}", execToPort.keySet());
+ numRemovedSlot += count;
+ numRemovedExec += execToPort.size();
}
- Map<List<Long>, NodeInfo> old =
assignment.get_executor_node_port();
- Map<List<Long>, List<Object>> reassigned = new HashMap<>();
- for (Entry<List<Long>, List<Object>> execAndNodePort :
execToNodePort.entrySet()) {
- NodeInfo oldAssigned =
old.get(execAndNodePort.getKey());
- String node = (String)
execAndNodePort.getValue().get(0);
- Long port = (Long) execAndNodePort.getValue().get(1);
- if (oldAssigned == null ||
!oldAssigned.get_node().equals(node)
- ||
!port.equals(oldAssigned.get_port_iterator().next())) {
- reassigned.put(execAndNodePort.getKey(),
execAndNodePort.getValue());
- }
+ for (Entry<String, Assignment> entry :
difference.entriesOnlyOnRight().entrySet()) {
+ final Map<List<Long>, NodeInfo> execToPort =
entry.getValue().get_executor_node_port();
+ final long count = new
HashSet<>(execToPort.values()).size();
+ LOG.info("Assigning {} to {} slots", entry.getKey(),
count);
+ LOG.info("Assign executors: {}", execToPort.keySet());
+ numAddedSlot += count;
+ numAddedExec += execToPort.size();
}
- if (!reassigned.isEmpty()) {
- int count = (new
HashSet<>(execToNodePort.values())).size();
- Set<List<Long>> reExecs = reassigned.keySet();
- LOG.info("Reassigning {} to {} slots", topoId, count);
- LOG.info("Reassign executors: {}", reExecs);
+ for (Entry<String,
MapDifference.ValueDifference<Assignment>> entry :
difference.entriesDiffering().entrySet()) {
+ final Map<List<Long>, NodeInfo> execToSlot =
entry.getValue().rightValue().get_executor_node_port();
+ final Set<NodeInfo> slots = new
HashSet<>(execToSlot.values());
+ LOG.info("Reassigning {} to {} slots", entry.getKey(),
slots.size());
+ LOG.info("Reassign executors: {}",
execToSlot.keySet());
+
+ //We can simplify this part if scheduler does't
partially remove or add executor
+ final Map<List<Long>, NodeInfo> oldExecToSlot =
entry.getValue().leftValue().get_executor_node_port();
+
+ final Map<List<Long>, NodeInfo> smaller =
execToSlot.size() > oldExecToSlot.size() ? oldExecToSlot : execToSlot;
--- End diff --
I think you can get the same result by taking the intersection of the key sets of
execToSlot and oldExecToSlot, then iterating over that set and comparing the values for each key.
```
Set<List<Long>> common = new HashSet<>(execToSlot.keySet());
common.retainAll(oldExecToSlot.keySet());
for (List<Long> exec : common) {
if (execToSlot.get(exec).equals(oldExecToSlot.get(exec))) {
...
}
}
```
Feel free to disregard this if it's written this way for performance reasons, but I think using the
set intersection would be easier to read.
---