Configure buffer limit by message count. This allows an operator to decide how large the buffers should be. It also provides an escape valve to clear the buffer entirely.
Project: http://git-wip-us.apache.org/repos/asf/couchdb-rexi/repo Commit: http://git-wip-us.apache.org/repos/asf/couchdb-rexi/commit/cd07cb8c Tree: http://git-wip-us.apache.org/repos/asf/couchdb-rexi/tree/cd07cb8c Diff: http://git-wip-us.apache.org/repos/asf/couchdb-rexi/diff/cd07cb8c Branch: refs/heads/windsor-merge Commit: cd07cb8c04df1510253718df7f63d6783e3ec0a7 Parents: 23cda37 Author: Adam Kocoloski <[email protected]> Authored: Tue Jun 3 14:25:59 2014 -0400 Committer: Robert Newson <[email protected]> Committed: Wed Jul 23 18:08:01 2014 +0100 ---------------------------------------------------------------------- src/rexi_buffer.erl | 19 ++++++++++++------- 1 file changed, 12 insertions(+), 7 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/couchdb-rexi/blob/cd07cb8c/src/rexi_buffer.erl ---------------------------------------------------------------------- diff --git a/src/rexi_buffer.erl b/src/rexi_buffer.erl index 874ec3c..f75399c 100644 --- a/src/rexi_buffer.erl +++ b/src/rexi_buffer.erl @@ -29,9 +29,6 @@ count = 0 }). -%% TODO Leverage os_mon to discover available memory in the system --define (MAX_MEMORY, 17179869184). - start_link(ServerId) -> gen_server:start_link({local, ServerId}, ?MODULE, nil, []). @@ -41,7 +38,12 @@ send(Dest, Msg) -> init(_) -> - {ok, #state{}}. + %% TODO Leverage os_mon to discover available memory in the system + Max = list_to_integer(config:get("rexi", "buffer_count", "2000")), + {ok, #state{max_count = Max}}. + +handle_call(erase_buffer, _From, State) -> + {reply, ok, State#state{buffer = queue:new(), count = 0}, 0}; handle_call(get_buffered_count, _From, State) -> {reply, State#state.count, State, 0}. 
@@ -49,7 +51,7 @@ handle_call(get_buffered_count, _From, State) -> handle_cast({deliver, Dest, Msg}, #state{buffer = Q, count = C} = State) -> margaret_counter:increment([erlang, rexi, buffered]), Q2 = queue:in({Dest, Msg}, Q), - case should_drop() of + case should_drop(State) of true -> {noreply, State#state{buffer = queue:drop(Q2)}, 0}; false -> @@ -94,11 +96,14 @@ handle_info({'DOWN', Ref, _, Pid, _}, #state{sender = {Pid, Ref}} = State) -> terminate(_Reason, _State) -> ok. +code_change(_OldVsn, {state, Buffer, Sender, Count}, _Extra) -> + Max = list_to_integer(config:get("rexi", "buffer_count", "2000")), + {ok, #state{buffer=Buffer, sender=Sender, count=Count, max_count=Max}}; code_change(_OldVsn, State, _Extra) -> {ok, State}. -should_drop() -> - erlang:memory(total) > ?MAX_MEMORY. +should_drop(#state{count = Count, max_count = Max}) -> + Count >= Max. get_node({_, Node}) when is_atom(Node) -> Node;
