Github user govind-menon commented on a diff in the pull request:
https://github.com/apache/storm/pull/2338#discussion_r140337615
--- Diff: storm-client/src/jvm/org/apache/storm/daemon/worker/Worker.java
---
@@ -406,25 +406,36 @@ private WorkerBackpressureCallback
mkBackpressureHandler() {
final List<IRunningExecutor> executors = executorsAtom.get();
return new WorkerBackpressureCallback() {
@Override public void onEvent(Object obj) {
- String topologyId = workerState.topologyId;
- String assignmentId = workerState.assignmentId;
- int port = workerState.port;
- IStormClusterState stormClusterState =
workerState.stormClusterState;
- boolean prevBackpressureFlag =
workerState.backpressure.get();
- boolean currBackpressureFlag = prevBackpressureFlag;
if (null != executors) {
- currBackpressureFlag =
workerState.transferQueue.getThrottleOn() || (executors.stream()
-
.map(IRunningExecutor::getBackPressureFlag).reduce((op1, op2) -> (op1 ||
op2)).get());
- }
+ String topologyId = workerState.topologyId;
+ String assignmentId = workerState.assignmentId;
+ int port = workerState.port;
+ IStormClusterState stormClusterState =
workerState.stormClusterState;
+ long prevBackpressureTimestamp =
workerState.backpressure.get();
+ long currTimestamp = System.currentTimeMillis();
+ long currBackpressureTimestamp = 0;
+ // the backpressure flag is true if at least one of
the disruptor queues has throttle-on
+ boolean backpressureFlag =
workerState.transferQueue.getThrottleOn() || (executors.stream()
+
.map(IRunningExecutor::getBackPressureFlag).reduce((op1, op2) -> (op1 ||
op2)).get());
+
+ if (backpressureFlag) {
+ // update the backpressure timestamp every 15
seconds
+ if ((currTimestamp - prevBackpressureTimestamp) >
15000) {
--- End diff --
Could we make this configurable? If not a constant somewhere
---