Christopher Johnson created STORM-3510:
------------------------------------------

             Summary: WorkerState.transferLocalBatch backpressure resend logic fix
                 Key: STORM-3510
                 URL: https://issues.apache.org/jira/browse/STORM-3510
             Project: Apache Storm
          Issue Type: Bug
          Components: storm-client
    Affects Versions: 2.2.0
            Reporter: Christopher Johnson


WorkerState.transferLocalBatch uses an int lastOverflowCount to track the size of the overflow queue, and periodically resends the backpressure status to remote workers if the queue continues to grow.
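For reference, here is a simplified sketch of the pattern as I read it (paraphrased rather than the verbatim Storm source; the queue lookup and signalling details are reduced to placeholders):

{code:java}
import java.util.List;

// Simplified model of the current pattern; JCQueue and the real WorkerState
// fields are reduced to placeholders.
class TransferLocalBatchSketch {
    static final int RESEND_LIMIT = 10000;

    interface ReceiveQueue {
        boolean tryPublish(Object tuple);      // false when the main queue is full
        void publishToOverflow(Object tuple);  // spill into the overflow queue
        int getOverflowCount();
    }

    ReceiveQueue queueForTask(int taskId) {
        throw new UnsupportedOperationException("placeholder: per-executor queue lookup");
    }

    void sendBackPressureStatus() {
        // placeholder: notify remote workers
    }

    // tuple[0] is the destination taskId, standing in for AddressedTuple
    void transferLocalBatch(List<int[]> tupleBatch) {
        // Problem: a single local int, shared across every destination task in
        // the batch, and reset to 0 on every call (i.e. every mini-batch).
        int lastOverflowCount = 0;
        for (int[] tuple : tupleBatch) {
            ReceiveQueue q = queueForTask(tuple[0]);
            if (!q.tryPublish(tuple)) {
                q.publishToOverflow(tuple);
                int curr = q.getOverflowCount();
                // Intended reading: resend once per additional RESEND_LIMIT
                // overflowed tuples. Actual effect: lastOverflowCount is
                // overwritten per task and forgotten per batch, so beyond the
                // threshold the status is resent on every mini-batch.
                if (curr - lastOverflowCount > RESEND_LIMIT) {
                    sendBackPressureStatus();
                    lastOverflowCount = curr;
                }
            }
        }
    }
}
{code}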

 

The current implementation has two problems:
 * The single variable tracks the receive queue of every executor in the 
worker, meaning it will be overwritten as tuples are sent to different 
executors.
 * The variable is locally scoped, and so is not carried over between 
mini-batches.

 

This only comes into effect when the overflow queue grows beyond 10000 tuples, which shouldn't happen unless a backpressure signal fails to reach an upstream worker; but when it does happen, a backpressure signal will be sent for every mini-batch processed.  I do not know if this is the intended behaviour, but the way the code is written suggests that it isn't.

 

I have thought of two redesigns to fix these problems and make the behaviour match a natural reading of the code:

 
 # *Change the lastOverflowCount variable to a map of taskId to overflow count* - This will retain the behaviour of resending the backpressure update every mini-batch once over the threshold, if that behaviour is intended.  However, it will increase garbage by creating a new map every time WorkerState.transferLocalBatch is called by the NettyWorker thread.
 # *Change the lastOverflowCount variable to a map of taskId to overflow count and move it to the BackPressureTracker class* - This will retain the counter between batches, and so only resend the backpressure status once per additional 10000 overflowed tuples per task (a minimal sketch follows this list).
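A minimal sketch of the second option, assuming a single NettyWorker caller (the method names and the map type are illustrative, not a final design):

{code:java}
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Sketch of option 2: per-task counters living in BackPressureTracker so they
// survive across mini-batches. Method and field names are illustrative.
class BackPressureTrackerSketch {
    private static final int RESEND_LIMIT = 10000;

    // taskId -> overflow count at the time the backpressure status was last sent.
    // ConcurrentHashMap is defensive; with a single NettyWorker thread per
    // worker a plain HashMap would do.
    private final Map<Integer, Integer> lastOverflowCount = new ConcurrentHashMap<>();

    /**
     * Called from WorkerState.transferLocalBatch after a tuple lands in the
     * overflow queue. Returns true when the backpressure status should be
     * (re)sent, i.e. once per additional RESEND_LIMIT overflowed tuples per task.
     */
    boolean shouldResendBackPressure(int taskId, int currOverflowCount) {
        int last = lastOverflowCount.getOrDefault(taskId, 0);
        if (currOverflowCount - last > RESEND_LIMIT) {
            lastOverflowCount.put(taskId, currOverflowCount);
            return true;
        }
        return false;
    }

    /** Reset when the queue drains and backpressure is lifted for the task. */
    void clearOverflowCount(int taskId) {
        lastOverflowCount.remove(taskId);
    }
}
{code}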

 

My preference is for the second option; even if the intended behaviour is to resend every mini-batch, the code should be rewritten so that intent is explicit.

 

It is also possible that the second approach could run into concurrency issues I haven't thought of, but as far as I can tell the topology.worker.receiver.thread.count config option isn't used at all?  If that's the case and there is only one NettyWorker thread per worker, then it should be fine.

 

I have implemented both approaches and attempted to benchmark them with [https://github.com/yahoo/storm-perf-test], but as I am running all workers on one machine I couldn't push it to the point where the relevant code was ever called.


