Github user govind-menon commented on a diff in the pull request:

    https://github.com/apache/storm/pull/2338#discussion_r140337615
  
    --- Diff: storm-client/src/jvm/org/apache/storm/daemon/worker/Worker.java ---
    @@ -406,25 +406,36 @@ private WorkerBackpressureCallback 
mkBackpressureHandler() {
             final List<IRunningExecutor> executors = executorsAtom.get();
             return new WorkerBackpressureCallback() {
                 @Override public void onEvent(Object obj) {
    -                String topologyId = workerState.topologyId;
    -                String assignmentId = workerState.assignmentId;
    -                int port = workerState.port;
    -                IStormClusterState stormClusterState = 
workerState.stormClusterState;
    -                boolean prevBackpressureFlag = 
workerState.backpressure.get();
    -                boolean currBackpressureFlag = prevBackpressureFlag;
                     if (null != executors) {
    -                    currBackpressureFlag = 
workerState.transferQueue.getThrottleOn() || (executors.stream()
    -                        
.map(IRunningExecutor::getBackPressureFlag).reduce((op1, op2) -> (op1 || 
op2)).get());
    -                }
    +                    String topologyId = workerState.topologyId;
    +                    String assignmentId = workerState.assignmentId;
    +                    int port = workerState.port;
    +                    IStormClusterState stormClusterState = 
workerState.stormClusterState;
    +                    long prevBackpressureTimestamp = 
workerState.backpressure.get();
    +                    long currTimestamp = System.currentTimeMillis();
    +                    long currBackpressureTimestamp = 0;
    +                    // the backpressure flag is true if at least one of 
the disruptor queues has throttle-on
    +                    boolean backpressureFlag = 
workerState.transferQueue.getThrottleOn() || (executors.stream()
    +                            
.map(IRunningExecutor::getBackPressureFlag).reduce((op1, op2) -> (op1 || 
op2)).get());
    +
    +                    if (backpressureFlag) {
    +                        // update the backpressure timestamp every 15 
seconds
    +                        if ((currTimestamp - prevBackpressureTimestamp) > 
15000) {
    --- End diff --
    
    Could we make this 15-second threshold configurable? If not, could it at least be extracted into a named constant somewhere?


---

Reply via email to