Rejigger the governor implementation. Previously, we spawned a separate process for each message that could not be delivered immediately with the 'noconnect' / 'nosuspend' send options. Now each governor buffers those messages itself and sends them one at a time. This keeps net_kernel from tipping over in the noconnect case. We also now decide whether to drop messages based on the node's total memory consumption rather than on a limit on the number of spawned sender processes, since we no longer spawn anything.
BugzID: 23717 BugzID: 23718 Project: http://git-wip-us.apache.org/repos/asf/couchdb-rexi/repo Commit: http://git-wip-us.apache.org/repos/asf/couchdb-rexi/commit/1030906b Tree: http://git-wip-us.apache.org/repos/asf/couchdb-rexi/tree/1030906b Diff: http://git-wip-us.apache.org/repos/asf/couchdb-rexi/diff/1030906b Branch: refs/heads/windsor-merge Commit: 1030906b85de7a398baf6363b9d3f6f21e93b257 Parents: 11b3859 Author: Adam Kocoloski <[email protected]> Authored: Tue Oct 15 22:46:52 2013 -0400 Committer: Robert Newson <[email protected]> Committed: Wed Jul 23 18:02:32 2014 +0100 ---------------------------------------------------------------------- src/rexi_governor.erl | 97 ++++++++++++++++++++++++++-------------------- src/rexi_utils.erl | 8 +++- 2 files changed, 62 insertions(+), 43 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/couchdb-rexi/blob/1030906b/src/rexi_governor.erl ---------------------------------------------------------------------- diff --git a/src/rexi_governor.erl b/src/rexi_governor.erl index 876165d..fdf8c93 100644 --- a/src/rexi_governor.erl +++ b/src/rexi_governor.erl @@ -18,54 +18,67 @@ -export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]). --record(state, {pids = ets:new(pids, [set]), - spawn_max = 10000, - spawn_cnt = 0, - drop_cnt = 0}). - -init([PidSpawnMax]) -> - {ok, #state{spawn_max = PidSpawnMax}}. - -handle_call(_Request, _From, State) -> - Reply = ok, - {reply, Reply, State}. 
- -handle_cast({spawn_and_track, Dest, Msg}, - #state{pids = Pids, - spawn_max = SpawnMax, - spawn_cnt = SC, - drop_cnt = DC} = State) -> - {NewSC, NewDC} = - case ets:info(Pids, size) < SpawnMax of - true -> - {Pid, Ref} = spawn_monitor(erlang, send, [Dest, Msg]), - ets:insert(Pids, {Pid, Ref}), - {SC + 1, DC}; - false -> - % drop message on floor - {SC, DC + 1} - end, - {noreply, State#state{spawn_cnt = NewSC, drop_cnt = NewDC}}; +-export ([ + send/2, + start_link/1 +]). + +-record(state, { + buffer = queue:new(), + count = 0 +}). -handle_cast(nodeout, #state{pids = Pids} = State) -> - % kill all the pids - ets:foldl(fun({P, _Ref}, Acc) -> - exit(P, kill), - Acc - end, [], Pids), - ets:delete_all_objects(Pids), - {noreply, State}. +%% TODO Leverage os_mon to discover available memory in the system +-define (MAX_MEMORY, 17179869184). -handle_info({'DOWN', _, process, Pid, normal}, - #state{pids = Pids} = State) -> - ets:delete(Pids, Pid), - {noreply, State}; +start_link(ServerId) -> + gen_server:start_link({local, ServerId}, ?MODULE, nil, []). -handle_info({'DOWN', _, process, _Pid, killed}, State) -> - {noreply, State}. +send(Dest, Msg) -> + Server = list_to_atom(lists:concat([rexi_governor, "_", get_node(Dest)])), + gen_server:cast(Server, {deliver, Dest, Msg}). + + +init(_) -> + {ok, #state{}}. + +handle_call(get_buffered_count, _From, State) -> + {reply, State#state.count, State, 0}. + +handle_cast({deliver, Dest, Msg}, #state{buffer = Q, count = C} = State) -> + margaret_counter:increment([erlang, rexi, buffered]), + Q2 = queue:in({Dest, Msg}, Q), + case should_drop() of + true -> + {noreply, State#state{buffer = queue:drop(Q2)}, 0}; + false -> + {noreply, State#state{buffer = Q2, count = C+1}, 0} + end. 
+ +handle_info(timeout, State) -> + #state{buffer = Q, count = C} = State, + case queue:out_r(Q) of + {{value, {Dest, Msg}}, Q2} -> + erlang:send(Dest, Msg); + {empty, Q2} -> + ok + end, + if C > 1 -> + {noreply, State#state{buffer = Q2, count = C-1}, 0}; + true -> + {noreply, State#state{buffer = Q2, count = 0}} + end. terminate(_Reason, _State) -> ok. code_change(_OldVsn, State, _Extra) -> {ok, State}. + +should_drop() -> + erlang:memory(total) > ?MAX_MEMORY. + +get_node({_, Node}) when is_atom(Node) -> + Node; +get_node(Pid) when is_pid(Pid) -> + node(Pid). http://git-wip-us.apache.org/repos/asf/couchdb-rexi/blob/1030906b/src/rexi_utils.erl ---------------------------------------------------------------------- diff --git a/src/rexi_utils.erl b/src/rexi_utils.erl index 6e03757..3c89ca9 100644 --- a/src/rexi_utils.erl +++ b/src/rexi_utils.erl @@ -29,7 +29,13 @@ server_pid(Node) -> %% @doc send a message as quickly as possible send(Dest, Msg) -> - rexi_gov_manager:send(Dest, Msg). + case erlang:send(Dest, Msg, [noconnect, nosuspend]) of + ok -> + ok; + _ -> + % treat nosuspend and noconnect the same + rexi_governor:send(Dest, Msg) + end. %% @doc set up the receive loop with an overall timeout -spec recv([any()], integer(), function(), any(), timeout(), timeout()) ->
