Fix counting bug in buffer Quoting @davisp:
There's a bug in rexi_buffer that can lead to the counter in its state running negative. I noticed this on malort looking for memory usage. A quick reading of the code suggests its due to us getting a timeout message with an empty queue. Theoretically this could happen if we've exceeded the MAX_MEMORY threshold when sending a message or even with just grabbing the buffered count when its idle. BugzID: 28049 Project: http://git-wip-us.apache.org/repos/asf/couchdb-rexi/repo Commit: http://git-wip-us.apache.org/repos/asf/couchdb-rexi/commit/45510149 Tree: http://git-wip-us.apache.org/repos/asf/couchdb-rexi/tree/45510149 Diff: http://git-wip-us.apache.org/repos/asf/couchdb-rexi/diff/45510149 Branch: refs/heads/windsor-merge Commit: 455101492947939cb7767c6bd888baf2e33ad497 Parents: cd07cb8 Author: Adam Kocoloski <[email protected]> Authored: Tue Jun 3 14:28:37 2014 -0400 Committer: Robert Newson <[email protected]> Committed: Wed Jul 23 18:08:14 2014 +0100 ---------------------------------------------------------------------- src/rexi_buffer.erl | 46 +++++++++++++++++----------------------------- 1 file changed, 17 insertions(+), 29 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/couchdb-rexi/blob/45510149/src/rexi_buffer.erl ---------------------------------------------------------------------- diff --git a/src/rexi_buffer.erl b/src/rexi_buffer.erl index f75399c..26f3c97 100644 --- a/src/rexi_buffer.erl +++ b/src/rexi_buffer.erl @@ -58,37 +58,25 @@ handle_cast({deliver, Dest, Msg}, #state{buffer = Q, count = C} = State) -> {noreply, State#state{buffer = Q2, count = C+1}, 0} end. -handle_info(timeout, #state{sender = nil} = State) -> +handle_info(timeout, #state{sender = nil, buffer = {[],[]}, count = 0}=State) -> + {noreply, State}; +handle_info(timeout, #state{sender = nil, count = C} = State) when C > 0 -> #state{buffer = Q, count = C} = State, - Sender = case queue:out_r(Q) of - {{value, {Dest, Msg}}, Q2} -> - case erlang:send(Dest, Msg, [noconnect, nosuspend]) of - ok -> - nil; - _Else -> - spawn_monitor(erlang, send, [Dest, Msg]) - end; - {empty, Q2} -> - nil - end, - if Sender =:= nil, C > 1 -> - {noreply, State#state{buffer = Q2, count = C-1}, 0}; - true -> - NewState = State#state{buffer = Q2, sender = Sender, count = C-1}, - % When Sender is nil and C-1 == 0 we're reverting to an - % idle state with no outstanding or queued messages. We'll - % use this oppurtunity to hibernate this process and - % run a garbage collection. - case {Sender, C-1} of - {nil, 0} -> - {noreply, NewState, hibernate}; - _ -> - {noreply, NewState, infinity} - end + {{value, {Dest, Msg}}, Q2} = queue:out_r(Q), + NewState = State#state{buffer = Q2, count = C-1}, + case erlang:send(Dest, Msg, [noconnect, nosuspend]) of + ok when C =:= 1 -> + % We just sent the last queued messsage, we'll use this opportunity + % to hibernate the process and run a garbage collection + {noreply, NewState, hibernate}; + ok when C > 1 -> + % Use a zero timeout to recurse into this handler ASAP + {noreply, NewState, 0}; + _Else -> + % We're experiencing delays, keep buffering internally + Sender = spawn_monitor(erlang, send, [Dest, Msg]), + {noreply, NewState#state{sender = Sender}} end; -handle_info(timeout, State) -> - % Waiting on a sender to return - {noreply, State}; handle_info({'DOWN', Ref, _, Pid, _}, #state{sender = {Pid, Ref}} = State) -> {noreply, State#state{sender = nil}, 0}.
