Configure buffer limit by message count

This allows an operator to decide how large the buffers should be. It
also provides an escape valve to clear the buffer entirely.


Project: http://git-wip-us.apache.org/repos/asf/couchdb-rexi/repo
Commit: http://git-wip-us.apache.org/repos/asf/couchdb-rexi/commit/cd07cb8c
Tree: http://git-wip-us.apache.org/repos/asf/couchdb-rexi/tree/cd07cb8c
Diff: http://git-wip-us.apache.org/repos/asf/couchdb-rexi/diff/cd07cb8c

Branch: refs/heads/windsor-merge
Commit: cd07cb8c04df1510253718df7f63d6783e3ec0a7
Parents: 23cda37
Author: Adam Kocoloski <[email protected]>
Authored: Tue Jun 3 14:25:59 2014 -0400
Committer: Robert Newson <[email protected]>
Committed: Wed Jul 23 18:08:01 2014 +0100

----------------------------------------------------------------------
 src/rexi_buffer.erl | 19 ++++++++++++-------
 1 file changed, 12 insertions(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/couchdb-rexi/blob/cd07cb8c/src/rexi_buffer.erl
----------------------------------------------------------------------
diff --git a/src/rexi_buffer.erl b/src/rexi_buffer.erl
index 874ec3c..f75399c 100644
--- a/src/rexi_buffer.erl
+++ b/src/rexi_buffer.erl
@@ -29,9 +29,6 @@
     count = 0
 }).
 
-%% TODO Leverage os_mon to discover available memory in the system
--define (MAX_MEMORY, 17179869184).
-
 start_link(ServerId) ->
     gen_server:start_link({local, ServerId}, ?MODULE, nil, []).
 
@@ -41,7 +38,12 @@ send(Dest, Msg) ->
 
 
 init(_) ->
-    {ok, #state{}}.
+    %% TODO Leverage os_mon to discover available memory in the system
+    Max = list_to_integer(config:get("rexi", "buffer_count", "2000")),
+    {ok, #state{max_count = Max}}.
+
+handle_call(erase_buffer, _From, State) ->
+    {reply, ok, State#state{buffer = queue:new(), count = 0}, 0};
 
 handle_call(get_buffered_count, _From, State) ->
     {reply, State#state.count, State, 0}.
@@ -49,7 +51,7 @@ handle_call(get_buffered_count, _From, State) ->
 handle_cast({deliver, Dest, Msg}, #state{buffer = Q, count = C} = State) ->
     margaret_counter:increment([erlang, rexi, buffered]),
     Q2 = queue:in({Dest, Msg}, Q),
-    case should_drop() of
+    case should_drop(State) of
     true ->
             {noreply, State#state{buffer = queue:drop(Q2)}, 0};
     false ->
@@ -94,11 +96,14 @@ handle_info({'DOWN', Ref, _, Pid, _}, #state{sender = {Pid, Ref}} = State) ->
 terminate(_Reason, _State) ->
     ok.
 
+code_change(_OldVsn, {state, Buffer, Sender, Count}, _Extra) ->
+    Max = list_to_integer(config:get("rexi", "buffer_count", "2000")),
+    {ok, #state{buffer=Buffer, sender=Sender, count=Count, max_count=Max}};
 code_change(_OldVsn, State, _Extra) ->
     {ok, State}.
 
-should_drop() ->
-    erlang:memory(total) > ?MAX_MEMORY.
+should_drop(#state{count = Count, max_count = Max}) ->
+    Count >= Max.
 
 get_node({_, Node}) when is_atom(Node) ->
     Node;

Reply via email to