Github user srdo commented on a diff in the pull request:

    https://github.com/apache/storm/pull/2764#discussion_r208976776
  
    --- Diff: 
storm-server/src/main/java/org/apache/storm/daemon/nimbus/Nimbus.java ---
    @@ -735,39 +756,108 @@ private static int 
numUsedWorkers(SchedulerAssignment assignment) {
             return ret;
         }
     
    -    private static Map<String, Map<List<Long>, List<Object>>> 
computeNewTopoToExecToNodePort(
    -        Map<String, SchedulerAssignment> schedAssignments, Map<String, 
Assignment> existingAssignments) {
    -        Map<String, Map<List<Long>, List<Object>>> ret = 
computeTopoToExecToNodePort(schedAssignments);
    -        // Print some useful information
    -        if (existingAssignments != null && !existingAssignments.isEmpty()) 
{
    -            for (Entry<String, Map<List<Long>, List<Object>>> entry : 
ret.entrySet()) {
    -                String topoId = entry.getKey();
    -                Map<List<Long>, List<Object>> execToNodePort = 
entry.getValue();
    -                Assignment assignment = existingAssignments.get(topoId);
    -                if (assignment == null) {
    -                    continue;
    +    private boolean inspectSchduling(Map<String, Assignment> 
existingAssignments,
    +                                            Map<String, Assignment> 
newAssignments) {
    +        assert existingAssignments != null && newAssignments != null;
    +        boolean anyChanged = existingAssignments.isEmpty() ^ 
newAssignments.isEmpty();
    +        long numRemovedExec = 0;
    +        long numRemovedSlot = 0;
    +        long numAddedExec   = 0;
    +        long numAddedSlot   = 0;
    +        if (existingAssignments.isEmpty()) {
    +            for (Entry<String, Assignment> entry : 
newAssignments.entrySet()) {
    +                final Map<List<Long>, NodeInfo> execToPort = 
entry.getValue().get_executor_node_port();
    +                final long count = new 
HashSet<>(execToPort.values()).size();
    +                LOG.info("Assigning {} to {} slots", entry.getKey(), 
count);
    +                LOG.info("Assign executors: {}", execToPort.keySet());
    +                numAddedSlot += count;
    +                numAddedExec += execToPort.size();
    +            }
    +        } else if (newAssignments.isEmpty()) {
    +            for (Entry<String, Assignment> entry : 
existingAssignments.entrySet()) {
    +                final Map<List<Long>, NodeInfo> execToPort = 
entry.getValue().get_executor_node_port();
    +                final long count = new 
HashSet<>(execToPort.values()).size();
    +                LOG.info("Removing {} from {} slots", entry.getKey(), 
count);
    +                LOG.info("Remove executors: {}", execToPort.keySet());
    +                numRemovedSlot += count;
    +                numRemovedExec += execToPort.size();
    +            }
    +        } else {
    +            MapDifference<String, Assignment> difference = 
Maps.difference(existingAssignments, newAssignments);
    +            if (anyChanged = (difference.entriesInCommon().size() != 
newAssignments.size())) {
    --- End diff --
    
    I think we're talking past each other a little.
    
    What I'm saying is that this code block looks to me like
    ```
    if (A == B) {
      for (a only in A) {}
      for (b only in B) {}
      for (c in A or B where the value is different) {}
    }
    ```
    
    What I'm saying is that the `if` guard is not necessary, because if the 
`if` returns false, we'd be skipping all 3 for loops anyway, because the 
iterated sets are empty. If that's the case, we can still update `anyChanged`, 
but it shouldn't be happening in an `if`.


---

Reply via email to