Github user HeartSaVioR commented on a diff in the pull request: https://github.com/apache/storm/pull/2241#discussion_r158196161 --- Diff: storm-client/src/jvm/org/apache/storm/daemon/worker/WorkerState.java --- @@ -439,29 +436,40 @@ public void refreshStormActive(Runnable callback) { } } - public void refreshThrottle() { - boolean backpressure = stormClusterState.topologyBackpressure(topologyId, this::refreshThrottle); - this.throttleOn.set(backpressure); - } - public void refreshLoad() { - Set<Integer> remoteTasks = Sets.difference(new HashSet<Integer>(outboundTasks), new HashSet<>(taskIds)); + Set<Integer> remoteTasks = Sets.difference(new HashSet<Integer>(outboundTasks), new HashSet<>(localTaskIds)); Long now = System.currentTimeMillis(); Map<Integer, Double> localLoad = shortExecutorReceiveQueueMap.entrySet().stream().collect(Collectors.toMap( - (Function<Map.Entry<Integer, DisruptorQueue>, Integer>) Map.Entry::getKey, - (Function<Map.Entry<Integer, DisruptorQueue>, Double>) entry -> { - DisruptorQueue.QueueMetrics qMetrics = entry.getValue().getMetrics(); - return ( (double) qMetrics.population()) / qMetrics.capacity(); + (Function<Map.Entry<Integer, JCQueue>, Integer>) Map.Entry::getKey, + (Function<Map.Entry<Integer, JCQueue>, Double>) entry -> { + JCQueue.QueueMetrics qMetrics = entry.getValue().getMetrics(); + return ((double) qMetrics.population()) / qMetrics.capacity(); })); Map<Integer, Load> remoteLoad = new HashMap<>(); cachedNodeToPortSocket.get().values().stream().forEach(conn -> remoteLoad.putAll(conn.getLoad(remoteTasks))); loadMapping.setLocal(localLoad); loadMapping.setRemote(remoteLoad); - if (now > nextUpdate.get()) { + if (now > nextLoadUpdate.get()) { receiver.sendLoadMetrics(localLoad); - nextUpdate.set(now + LOAD_REFRESH_INTERVAL_MS); + nextLoadUpdate.set(now + LOAD_REFRESH_INTERVAL_MS); + } + } + + // checks if the tasks which had back pressure are now free again. if so, sends an update to other workers + public void refreshBackPressureStatus() { + LOG.debug("Checking for change in Backpressure status on worker's tasks"); + boolean bpSituationChanged = bpTracker.refreshBpTaskList(); + if (bpSituationChanged) { + BackPressureStatus bpStatus = bpTracker.getCurrStatus(); + receiver.sendBackPressureStatus(bpStatus); + } + for (Entry<Integer, JCQueue> entry : localReceiveQueues.entrySet()) { --- End diff -- Looks like this effectively does nothing. Is there any missing here? Or can this statement (for) be removed? If you have reason to do so, please leave a comment describing the reason.
---