GitHub user HeartSaVioR commented on a diff in the pull request:

    https://github.com/apache/storm/pull/2241#discussion_r158196161
  
    --- Diff: storm-client/src/jvm/org/apache/storm/daemon/worker/WorkerState.java ---
    @@ -439,29 +436,40 @@ public void refreshStormActive(Runnable callback) {
             }
         }
     
    -    public void refreshThrottle() {
    -        boolean backpressure = stormClusterState.topologyBackpressure(topologyId, this::refreshThrottle);
    -        this.throttleOn.set(backpressure);
    -    }
    -
         public void refreshLoad() {
    -        Set<Integer> remoteTasks = Sets.difference(new HashSet<Integer>(outboundTasks), new HashSet<>(taskIds));
    +        Set<Integer> remoteTasks = Sets.difference(new HashSet<Integer>(outboundTasks), new HashSet<>(localTaskIds));
             Long now = System.currentTimeMillis();
             Map<Integer, Double> localLoad = shortExecutorReceiveQueueMap.entrySet().stream().collect(Collectors.toMap(
    -            (Function<Map.Entry<Integer, DisruptorQueue>, Integer>) Map.Entry::getKey,
    -            (Function<Map.Entry<Integer, DisruptorQueue>, Double>) entry -> {
    -                DisruptorQueue.QueueMetrics qMetrics = entry.getValue().getMetrics();
    -                return ( (double) qMetrics.population()) / qMetrics.capacity();
    +            (Function<Map.Entry<Integer, JCQueue>, Integer>) Map.Entry::getKey,
    +            (Function<Map.Entry<Integer, JCQueue>, Double>) entry -> {
    +                JCQueue.QueueMetrics qMetrics = entry.getValue().getMetrics();
    +                return ((double) qMetrics.population()) / qMetrics.capacity();
                 }));
     
             Map<Integer, Load> remoteLoad = new HashMap<>();
             cachedNodeToPortSocket.get().values().stream().forEach(conn -> remoteLoad.putAll(conn.getLoad(remoteTasks)));
             loadMapping.setLocal(localLoad);
             loadMapping.setRemote(remoteLoad);
     
    -        if (now > nextUpdate.get()) {
    +        if (now > nextLoadUpdate.get()) {
                 receiver.sendLoadMetrics(localLoad);
    -            nextUpdate.set(now + LOAD_REFRESH_INTERVAL_MS);
    +            nextLoadUpdate.set(now + LOAD_REFRESH_INTERVAL_MS);
    +        }
    +    }
    +
    +    // checks if the tasks which had back pressure are now free again. if so, sends an update to other workers
    +    public void refreshBackPressureStatus() {
    +        LOG.debug("Checking for change in Backpressure status on worker's tasks");
    +        boolean bpSituationChanged = bpTracker.refreshBpTaskList();
    +        if (bpSituationChanged) {
    +            BackPressureStatus bpStatus = bpTracker.getCurrStatus();
    +            receiver.sendBackPressureStatus(bpStatus);
    +        }
    +        for (Entry<Integer, JCQueue> entry : localReceiveQueues.entrySet()) {
    --- End diff --
    
    Looks like this loop effectively does nothing. Is anything missing here? Or
    can this for statement be removed? If there is a reason to keep it, please
    leave a comment describing that reason.

