Christopher Johnson created STORM-3510:
------------------------------------------
Summary: WorkerState.transferLocalBatch backpressure resend logic
fix
Key: STORM-3510
URL: https://issues.apache.org/jira/browse/STORM-3510
Project: Apache Storm
Issue Type: Bug
Components: storm-client
Affects Versions: 2.2.0
Reporter: Christopher Johnson
WorkerState.transferLocalBatch uses an int lastOverflowCount to track the size
of the overflow queue, and periodically resends the backpressure status to
remote workers if the queue continues to grow.
The current implementation has two problems:
* The single variable tracks the receive queue of every executor in the
worker, meaning it will be overwritten as tuples are sent to different
executors.
* The variable is locally scoped, and so is not carried over between
mini-batches.
This only comes into effect when the overflow queue grows beyond 10000, which
shouldn't happen unless a backpressure signal isn't received by an upstream
worker. If it does happen, however, a backpressure signal is sent for every
mini-batch processed. I do not know if this is the intended behaviour, but the
way the code is written seems to indicate that it isn't.
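For illustration, the shape of the current logic is roughly the following. This
is only a simplified sketch with stand-in names, not the actual Storm source;
ReceiveQueue, AddressedTuple, publishToOverflow and resendBackpressureStatus are
hypothetical simplifications of JCQueue, WorkerState and the remote transfer
path:
{code:java}
import java.util.List;
import java.util.Map;

// Simplified sketch of the current shape of the logic; ReceiveQueue, AddressedTuple
// and the helper methods are stand-ins, not the real JCQueue/WorkerState API.
class TransferLocalBatchSketch {

    interface ReceiveQueue {
        boolean tryPublish(Object tuple);      // false when the main queue is full
        void publishToOverflow(Object tuple);  // spill into the overflow queue
        int getOverflowCount();
    }

    static class AddressedTuple {
        int taskId;
        Object tuple;
    }

    private Map<Integer, ReceiveQueue> receiveQueueByTask; // one receive queue per executor

    void transferLocalBatch(List<AddressedTuple> batch) {
        int lastOverflowCount = 0;             // local, so reset on every mini-batch
        for (AddressedTuple t : batch) {
            ReceiveQueue q = receiveQueueByTask.get(t.taskId);
            if (q.tryPublish(t.tuple)) {
                continue;
            }
            q.publishToOverflow(t.tuple);
            int overflow = q.getOverflowCount();
            // Intended to resend backpressure only every ~10000 additional overflowed
            // tuples, but the single counter is shared by every destination task and
            // discarded at the end of the batch, so once overflow passes 10000 this
            // fires on effectively every mini-batch.
            if (overflow > lastOverflowCount + 10_000) {
                lastOverflowCount = overflow;
                resendBackpressureStatus();    // stand-in for the resend to remote workers
            }
        }
    }

    private void resendBackpressureStatus() {
        // notify remote workers that backpressure is still in effect
    }
}
{code}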
I have thought of two redesigns to fix these problems and make the behaviour
align with how one would interpret the code:
# *Change the lastOverflowCount variable to a map of taskId to overflow
count* - This will retain the behaviour of resending the backpressure update
every mini-batch once over the threshold, if that behaviour is intended.
However, it will increase garbage by creating a new map every time
WorkerState.transferLocalBatch is called by the NettyWorker thread.
# *Change the lastOverflowCount variable to a map of taskId to overflow count*
*and move it to the BackPressureTracker class* - This will retain the counter
between batches, and so only resend backpressure status every 10000 received
tuples per task (a rough sketch of this follows below the list).
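To make the second option concrete, below is a rough sketch of what per-task
state in BackPressureTracker could look like. The method names and shapes are my
own for illustration, not the existing BackPressureTracker API:
{code:java}
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Rough sketch of option 2: keep a per-task overflow high-water mark in
// BackPressureTracker so it survives across mini-batches. Names are illustrative.
class BackPressureTrackerSketch {

    private static final int RESEND_THRESHOLD = 10_000;

    // taskId -> overflow count at the time the backpressure status was last resent
    private final Map<Integer, Integer> lastOverflowCount = new ConcurrentHashMap<>();

    // Returns true if the backpressure status should be resent for this task,
    // i.e. its overflow queue has grown by another RESEND_THRESHOLD tuples
    // since the last resend for this task.
    boolean shouldResend(int taskId, int currentOverflow) {
        int last = lastOverflowCount.getOrDefault(taskId, 0);
        if (currentOverflow >= last + RESEND_THRESHOLD) {
            lastOverflowCount.put(taskId, currentOverflow);
            return true;
        }
        return false;
    }

    // Reset the counter once the task's queue drains and backpressure is lifted.
    void clear(int taskId) {
        lastOverflowCount.remove(taskId);
    }
}
{code}
WorkerState.transferLocalBatch would then just ask the tracker after a publish
overflows (e.g. bpTracker.shouldResend(taskId, queue.getOverflowCount()))
instead of keeping any count locally, which also avoids the per-batch map
allocation from the first option.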
My preference is for the second option; if the intended behaviour is to resend
every mini-batch, the code should instead be rewritten so that the intent is
explicit from the code.
It is also possible that doing it the second way could run into concurrency
issues I haven't thought of, but as far as I can tell the
topology.worker.receiver.thread.count config option isn't used at all? If
that's the case and there is only one NettyWorker thread per worker, then it
should be fine.
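If it turns out that more than one thread can call transferLocalBatch, the
per-task check in the sketch above could also be made atomic with Map.compute
so the counter map needs no extra locking. Again only a sketch, building on the
hypothetical BackPressureTrackerSketch above:
{code:java}
// Variant of shouldResend that updates the per-task high-water mark atomically,
// in case more than one thread ever publishes through the same tracker.
boolean shouldResend(int taskId, int currentOverflow) {
    boolean[] resend = {false};
    lastOverflowCount.compute(taskId, (id, last) -> {
        int prev = (last == null) ? 0 : last;
        if (currentOverflow >= prev + RESEND_THRESHOLD) {
            resend[0] = true;
            return currentOverflow;    // record the new high-water mark
        }
        return prev;                   // leave the stored count unchanged
    });
    return resend[0];
}
{code}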
I have implemented both methods and attempted to benchmark them with
[https://github.com/yahoo/storm-perf-test], but as I am running all workers on
one machine I couldn't get it to the point where the relevant code was ever
called.
--
This message was sent by Atlassian Jira
(v8.3.4#803005)