Rename governor to buffer. Buffer describes the behavior quite a bit better than governor.
BugzId: 23717 BugzId: 23718 Project: http://git-wip-us.apache.org/repos/asf/couchdb-rexi/repo Commit: http://git-wip-us.apache.org/repos/asf/couchdb-rexi/commit/5b589962 Tree: http://git-wip-us.apache.org/repos/asf/couchdb-rexi/tree/5b589962 Diff: http://git-wip-us.apache.org/repos/asf/couchdb-rexi/diff/5b589962 Branch: refs/heads/windsor-merge Commit: 5b5899627db710cab64d1c724cc4ad4d7447b9d8 Parents: 8f9d160 Author: Paul J. Davis <[email protected]> Authored: Tue Dec 10 09:59:16 2013 -0600 Committer: Robert Newson <[email protected]> Committed: Wed Jul 23 18:04:13 2014 +0100 ---------------------------------------------------------------------- src/rexi_buffer.erl | 96 ++++++++++++++++++++++++++ src/rexi_gov_manager.erl | 157 ------------------------------------------ src/rexi_governor.erl | 96 -------------------------- src/rexi_sup.erl | 8 +-- src/rexi_utils.erl | 2 +- 5 files changed, 101 insertions(+), 258 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/couchdb-rexi/blob/5b589962/src/rexi_buffer.erl ---------------------------------------------------------------------- diff --git a/src/rexi_buffer.erl b/src/rexi_buffer.erl new file mode 100644 index 0000000..b096c5b --- /dev/null +++ b/src/rexi_buffer.erl @@ -0,0 +1,96 @@ +% Licensed under the Apache License, Version 2.0 (the "License"); you may not +% use this file except in compliance with the License. You may obtain a copy of +% the License at +% +% http://www.apache.org/licenses/LICENSE-2.0 +% +% Unless required by applicable law or agreed to in writing, software +% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +% License for the specific language governing permissions and limitations under +% the License. +-module(rexi_buffer). + +-behaviour(gen_server). +-vsn(1). 
+ +% gen_server callbacks +-export([init/1, handle_call/3, handle_cast/2, handle_info/2, + terminate/2, code_change/3]). + +-export ([ + send/2, + start_link/1 +]). + +-record(state, { + buffer = queue:new(), + sender = nil, + count = 0 +}). + +%% TODO Leverage os_mon to discover available memory in the system +-define (MAX_MEMORY, 17179869184). + +start_link(ServerId) -> + gen_server:start_link({local, ServerId}, ?MODULE, nil, []). + +send(Dest, Msg) -> + Server = list_to_atom(lists:concat([rexi_buffer, "_", get_node(Dest)])), + gen_server:cast(Server, {deliver, Dest, Msg}). + + +init(_) -> + {ok, #state{}}. + +handle_call(get_buffered_count, _From, State) -> + {reply, State#state.count, State, 0}. + +handle_cast({deliver, Dest, Msg}, #state{buffer = Q, count = C} = State) -> + margaret_counter:increment([erlang, rexi, buffered]), + Q2 = queue:in({Dest, Msg}, Q), + case should_drop() of + true -> + {noreply, State#state{buffer = queue:drop(Q2)}, 0}; + false -> + {noreply, State#state{buffer = Q2, count = C+1}, 0} + end. + +handle_info(timeout, #state{sender = nil} = State) -> + #state{buffer = Q, count = C} = State, + Sender = case queue:out_r(Q) of + {{value, {Dest, Msg}}, Q2} -> + case erlang:send(Dest, Msg, [noconnect, nosuspend]) of + ok -> + nil; + _Else -> + spawn_monitor(erlang, send, [Dest, Msg]) + end; + {empty, Q2} -> + nil + end, + if Sender =:= nil, C > 1 -> + {noreply, State#state{buffer = Q2, count = C-1}, 0}; + true -> + {noreply, State#state{buffer = Q2, sender = Sender, count = C-1}} + end; +handle_info(timeout, State) -> + % Waiting on a sender to return + {noreply, State}; + +handle_info({'DOWN', Ref, _, Pid, _}, #state{sender = {Pid, Ref}} = State) -> + {noreply, State#state{sender = nil}, 0}. + +terminate(_Reason, _State) -> + ok. + +code_change(_OldVsn, State, _Extra) -> + {ok, State}. + +should_drop() -> + erlang:memory(total) > ?MAX_MEMORY. + +get_node({_, Node}) when is_atom(Node) -> + Node; +get_node(Pid) when is_pid(Pid) -> + node(Pid). 
http://git-wip-us.apache.org/repos/asf/couchdb-rexi/blob/5b589962/src/rexi_gov_manager.erl ---------------------------------------------------------------------- diff --git a/src/rexi_gov_manager.erl b/src/rexi_gov_manager.erl deleted file mode 100644 index 46cbe53..0000000 --- a/src/rexi_gov_manager.erl +++ /dev/null @@ -1,157 +0,0 @@ -% Licensed under the Apache License, Version 2.0 (the "License"); you may not -% use this file except in compliance with the License. You may obtain a copy of -% the License at -% -% http://www.apache.org/licenses/LICENSE-2.0 -% -% Unless required by applicable law or agreed to in writing, software -% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT -% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the -% License for the specific language governing permissions and limitations under -% the License. --module(rexi_gov_manager). - --behaviour(gen_server). --vsn(1). --behaviour(config_listener). - -% API --export([start_link/0, send/2]). - -% gen_server callbacks --export([init/1, handle_call/3, handle_cast/2, handle_info/2, - terminate/2, code_change/3]). --export([handle_config_change/5]). - --record(state, {node_timers = ets:new(timers, [set]), - nodeout_timeout = 2000, - pid_spawn_max = 10000}). - - -% API - -start_link() -> - gen_server:start_link({local, ?MODULE}, ?MODULE, [], []). - -send(Dest, Msg) -> - case erlang:send(Dest, Msg, [noconnect, nosuspend]) of - ok -> ok; - _ -> - % treat nosuspend and noconnect the same - {ok, Governor} = get_governor(get_node(Dest)), - gen_server:cast(Governor, {spawn_and_track, Dest, Msg}) - end. - -get_node({_, Node}) when is_atom(Node) -> - Node; -get_node(Pid) when is_pid(Pid) -> - node(Pid). - -get_governor(Node) -> - case ets:lookup(govs, Node) of - [{Node, Gov}] -> - {ok, Gov}; - [] -> - gen_server:call(?MODULE, {get_governor, Node}) - end. 
- -% gen_server callbacks - -init([]) -> - ets:new(govs, [named_table, set, {read_concurrency, true}]), - net_kernel:monitor_nodes(true), - NodeOutTimeout = config:get("rexi","nodeout_timeout","500"), - PidSpawnMax = config:get("rexi","pid_spawn_max", "10000"), - State = #state{ - nodeout_timeout = list_to_integer(NodeOutTimeout), - pid_spawn_max = list_to_integer(PidSpawnMax) - }, - config:listen_for_changes(?MODULE, State), - {ok, State}. - -handle_config_change("rexi", "nodeout_timeout", Value, _, State) -> - IntValue = list_to_integer(Value), - %% Setting the timeout is cheap, no need to check if it actually changed - gen_server:call(?MODULE, {set_timeout, IntValue}), - {ok, State#state{nodeout_timeout = IntValue}}; -handle_config_change("rexi", "pid_spawn_max", Value, _, State) -> - IntValue = list_to_integer(Value), - %% Setting the timeout is cheap, no need to check if it actually changed - gen_server:call(?MODULE, {set_spawn_max, IntValue}), - {ok, State#state{pid_spawn_max = IntValue}}; -handle_config_change(_, _, _, _, State) -> - {ok, State}. - -handle_call({set_timeout, TO}, _, #state{nodeout_timeout = Old} = State) -> - {reply, Old, State#state{nodeout_timeout = TO}}; -handle_call({set_spawn_max, Max}, _, #state{pid_spawn_max = Old} = State) -> - {reply, Old, State#state{pid_spawn_max = Max}}; -handle_call({get_governor, Node}, _From, - #state{pid_spawn_max = PidSpawnMax} = State) -> - case ets:lookup(govs, Node) of - [] -> - {ok, Gov} = gen_server:start_link(rexi_governor, [PidSpawnMax], []), - ets:insert(govs, {Node, Gov}); - [{Node, Gov}] -> - Gov - end, - {reply, {ok, Gov}, State}. - -handle_cast(_Msg, State) -> - {noreply, State}. 
- -handle_info({nodeup, Node}, #state{node_timers = Timers, - pid_spawn_max = PidSpawnMax} = State) -> - case ets:lookup(Timers, Node) of - [{Node, TRef}] -> - erlang:cancel_timer(TRef), - ets:delete(Timers, Node); - _ -> - ok - end, - case ets:lookup(govs, Node) of - [{Node, _}] -> - ok; - [] -> - {ok, Gov} = gen_server:start_link(rexi_governor, [PidSpawnMax], []), - ets:insert(govs, {Node, Gov}) - end, - {noreply, State}; - -handle_info({nodedown, Node}, #state{node_timers = Timers, - nodeout_timeout = NodeTimeout} = State) -> - case ets:lookup(Timers, Node) of - [] -> - TRef = erlang:send_after(NodeTimeout, self(), {nodeout, Node}), - ets:insert(Timers, {Node, TRef}), - {noreply, State}; - _ -> - {noreply, State} - end; - -handle_info({nodeout, Node}, #state{node_timers = Timers} = State) -> - % check for race with node up - case ets:member(Timers, Node) of - true -> - ets:delete(Timers, Node), - case ets:lookup(govs, Node) of - [] -> - ok; - [{Node, Governor}] -> - gen_server:cast(Governor, nodeout) - end; - false -> - ok - end, - {noreply, State}; - -handle_info(_Info, State) -> - {noreply, State}. - -terminate(_Reason, _State) -> - ok. - -code_change(_OldVsn, State, _Extra) -> - {ok, State}. - -% Internal functions http://git-wip-us.apache.org/repos/asf/couchdb-rexi/blob/5b589962/src/rexi_governor.erl ---------------------------------------------------------------------- diff --git a/src/rexi_governor.erl b/src/rexi_governor.erl deleted file mode 100644 index 12ec013..0000000 --- a/src/rexi_governor.erl +++ /dev/null @@ -1,96 +0,0 @@ -% Licensed under the Apache License, Version 2.0 (the "License"); you may not -% use this file except in compliance with the License. 
You may obtain a copy of -% the License at -% -% http://www.apache.org/licenses/LICENSE-2.0 -% -% Unless required by applicable law or agreed to in writing, software -% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT -% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the -% License for the specific language governing permissions and limitations under -% the License. --module(rexi_governor). - --behaviour(gen_server). --vsn(1). - -% gen_server callbacks --export([init/1, handle_call/3, handle_cast/2, handle_info/2, - terminate/2, code_change/3]). - --export ([ - send/2, - start_link/1 -]). - --record(state, { - buffer = queue:new(), - sender = nil, - count = 0 -}). - -%% TODO Leverage os_mon to discover available memory in the system --define (MAX_MEMORY, 17179869184). - -start_link(ServerId) -> - gen_server:start_link({local, ServerId}, ?MODULE, nil, []). - -send(Dest, Msg) -> - Server = list_to_atom(lists:concat([rexi_governor, "_", get_node(Dest)])), - gen_server:cast(Server, {deliver, Dest, Msg}). - - -init(_) -> - {ok, #state{}}. - -handle_call(get_buffered_count, _From, State) -> - {reply, State#state.count, State, 0}. - -handle_cast({deliver, Dest, Msg}, #state{buffer = Q, count = C} = State) -> - margaret_counter:increment([erlang, rexi, buffered]), - Q2 = queue:in({Dest, Msg}, Q), - case should_drop() of - true -> - {noreply, State#state{buffer = queue:drop(Q2)}, 0}; - false -> - {noreply, State#state{buffer = Q2, count = C+1}, 0} - end. 
- -handle_info(timeout, #state{sender = nil} = State) -> - #state{buffer = Q, count = C} = State, - Sender = case queue:out_r(Q) of - {{value, {Dest, Msg}}, Q2} -> - case erlang:send(Dest, Msg, [noconnect, nosuspend]) of - ok -> - nil; - _Else -> - spawn_monitor(erlang, send, [Dest, Msg]) - end; - {empty, Q2} -> - nil - end, - if Sender =:= nil, C > 1 -> - {noreply, State#state{buffer = Q2, count = C-1}, 0}; - true -> - {noreply, State#state{buffer = Q2, sender = Sender, count = C-1}} - end; -handle_info(timeout, State) -> - % Waiting on a sender to return - {noreply, State}; - -handle_info({'DOWN', Ref, _, Pid, _}, #state{sender = {Pid, Ref}} = State) -> - {noreply, State#state{sender = nil}, 0}. - -terminate(_Reason, _State) -> - ok. - -code_change(_OldVsn, State, _Extra) -> - {ok, State}. - -should_drop() -> - erlang:memory(total) > ?MAX_MEMORY. - -get_node({_, Node}) when is_atom(Node) -> - Node; -get_node(Pid) when is_pid(Pid) -> - node(Pid). http://git-wip-us.apache.org/repos/asf/couchdb-rexi/blob/5b589962/src/rexi_sup.erl ---------------------------------------------------------------------- diff --git a/src/rexi_sup.erl b/src/rexi_sup.erl index f9e4933..55c4829 100644 --- a/src/rexi_sup.erl +++ b/src/rexi_sup.erl @@ -46,16 +46,16 @@ init([]) -> [rexi_server_mon] }, { - rexi_governor_sup, - {rexi_server_sup, start_link, [rexi_governor_sup]}, + rexi_buffer_sup, + {rexi_server_sup, start_link, [rexi_buffer_sup]}, permanent, 100, supervisor, [rexi_server_sup] }, { - rexi_governor_mon, - {rexi_server_mon, start_link, [rexi_governor]}, + rexi_buffer_mon, + {rexi_server_mon, start_link, [rexi_buffer]}, permanent, 100, worker, http://git-wip-us.apache.org/repos/asf/couchdb-rexi/blob/5b589962/src/rexi_utils.erl ---------------------------------------------------------------------- diff --git a/src/rexi_utils.erl b/src/rexi_utils.erl index 3c89ca9..e3eaa6f 100644 --- a/src/rexi_utils.erl +++ b/src/rexi_utils.erl @@ -34,7 +34,7 @@ send(Dest, Msg) -> ok; _ -> % treat 
nosuspend and noconnect the same - rexi_governor:send(Dest, Msg) + rexi_buffer:send(Dest, Msg) end. %% @doc set up the receive loop with an overall timeout
