Rearrange couch_proc_manager for readability. No functional changes.
Project: http://git-wip-us.apache.org/repos/asf/couchdb-couch/repo Commit: http://git-wip-us.apache.org/repos/asf/couchdb-couch/commit/a19a8a1f Tree: http://git-wip-us.apache.org/repos/asf/couchdb-couch/tree/a19a8a1f Diff: http://git-wip-us.apache.org/repos/asf/couchdb-couch/diff/a19a8a1f Branch: refs/heads/1843-feature-bigcouch Commit: a19a8a1f2b54669ea1482905883ce35db84e6572 Parents: 135dd70 Author: Benjamin Bastian <[email protected]> Authored: Sat Feb 15 23:31:29 2014 -0800 Committer: Benjamin Bastian <[email protected]> Committed: Mon Feb 17 10:45:53 2014 -0800 ---------------------------------------------------------------------- src/couch_proc_manager.erl | 124 ++++++++++++++++++++-------------------- 1 file changed, 62 insertions(+), 62 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/couchdb-couch/blob/a19a8a1f/src/couch_proc_manager.erl ---------------------------------------------------------------------- diff --git a/src/couch_proc_manager.erl b/src/couch_proc_manager.erl index 5dd000b..c8a5b77 100644 --- a/src/couch_proc_manager.erl +++ b/src/couch_proc_manager.erl @@ -201,6 +201,19 @@ handle_config_change("query_server_config", _, _, _, _) -> handle_config_change(_, _, _, _, _) -> {ok, nil}. +find_proc(State, Client, [Fun|FindFuns]) -> + try iter_procs(State#state.tab, Client#client.lang, Fun, nil) of + {not_found, _} -> + find_proc(State, Client, FindFuns); + {ok, Proc} -> + {reply, {ok, Proc, State#state.config}, State} + catch error:Reason -> + ?LOG_ERROR("~p ~p ~p", [?MODULE, Reason, erlang:get_stacktrace()]), + {reply, {error, Reason}, State} + end; +find_proc(State, Client, []) -> + {noreply, maybe_spawn_proc(State, Client)}. + iter_procs(Tab, Lang, Fun, Acc) when is_list(Lang) -> iter_procs(Tab, list_to_binary(Lang), Fun, Acc); iter_procs(Tab, Lang, Fun, Acc) -> @@ -269,6 +282,36 @@ new_proc_int(From, Lang) when is_list(Lang) -> make_proc(Pid, Lang, couch_os_process) end. +proc_with_ddoc(DDoc, DDocKey, Procs) -> + Filter = fun(#proc{ddoc_keys=Keys}) -> not lists:member(DDocKey, Keys) end, + case lists:dropwhile(Filter, Procs) of + [DDocProc|_] -> + {ok, DDocProc}; + [] -> + teach_any_proc(DDoc, DDocKey, Procs) + end. + +teach_any_proc(DDoc, DDocKey, [Proc|Rest]) -> + try + teach_ddoc(DDoc, DDocKey, Proc) + catch _:_ -> + teach_any_proc(DDoc, DDocKey, Rest) + end; +teach_any_proc(_, _, []) -> + {error, noproc}. + +teach_ddoc(DDoc, {DDocId, _Rev}=DDocKey, #proc{ddoc_keys=Keys}=Proc) -> + % send ddoc over the wire + % we only share the rev with the client we know to update code + % but it only keeps the latest copy, per each ddoc, around. + true = couch_query_servers:proc_prompt(Proc, [<<"ddoc">>, <<"new">>, + DDocId, couch_doc:to_json_obj(DDoc, [])]), + % we should remove any other ddocs keys for this docid + % because the query server overwrites without the rev + Keys2 = [{D,R} || {D,R} <- Keys, D /= DDocId], + % add ddoc to the proc + {ok, Proc#proc{ddoc_keys=[DDocKey|Keys2]}}. + make_proc(Pid, Lang, Mod) -> Proc = #proc{ lang = Lang, @@ -313,6 +356,25 @@ return_proc(State, #proc{pid=Pid, lang=Lang} = Proc) -> end end. +maybe_spawn_proc(State, Client) -> + #state{proc_counts=Counts, waiting=Waiting} = State, + #client{lang=Lang} = Client, + Limit = list_to_integer(config:get( + "query_server_config", "os_process_limit", "100")), + case dict:find(Lang, Counts) of + {ok, Limit} -> + add_waiting_client(Waiting, Client), + State; + _ -> + spawn_link(?MODULE, new_proc, [Client]), + State#state{ + proc_counts=dict:update_counter(Lang, 1, Counts) + } + end. + +add_waiting_client(Tab, Client) -> + ets:insert(Tab, Client#client{timestamp=now()}). + get_waiting_client(Tab, Lang) when is_list(Lang) -> get_waiting_client(Tab, couch_util:to_binary(Lang)); get_waiting_client(Tab, Lang) -> @@ -324,9 +386,6 @@ get_waiting_client(Tab, Lang) -> Client end. -add_waiting_client(Tab, Client) -> - ets:insert(Tab, Client#client{timestamp=now()}). - get_proc_config() -> Limit = config:get("query_server_config", "reduce_limit", "true"), Timeout = config:get("couchdb", "os_process_timeout", "5000"), @@ -334,62 +393,3 @@ get_proc_config() -> {<<"reduce_limit">>, list_to_atom(Limit)}, {<<"timeout">>, list_to_integer(Timeout)} ]}. - -proc_with_ddoc(DDoc, DDocKey, Procs) -> - Filter = fun(#proc{ddoc_keys=Keys}) -> not lists:member(DDocKey, Keys) end, - case lists:dropwhile(Filter, Procs) of - [DDocProc|_] -> - {ok, DDocProc}; - [] -> - teach_any_proc(DDoc, DDocKey, Procs) - end. - -teach_any_proc(DDoc, DDocKey, [Proc|Rest]) -> - try - teach_ddoc(DDoc, DDocKey, Proc) - catch _:_ -> - teach_any_proc(DDoc, DDocKey, Rest) - end; -teach_any_proc(_, _, []) -> - {error, noproc}. - -teach_ddoc(DDoc, {DDocId, _Rev}=DDocKey, #proc{ddoc_keys=Keys}=Proc) -> - % send ddoc over the wire - % we only share the rev with the client we know to update code - % but it only keeps the latest copy, per each ddoc, around. - true = couch_query_servers:proc_prompt(Proc, [<<"ddoc">>, <<"new">>, - DDocId, couch_doc:to_json_obj(DDoc, [])]), - % we should remove any other ddocs keys for this docid - % because the query server overwrites without the rev - Keys2 = [{D,R} || {D,R} <- Keys, D /= DDocId], - % add ddoc to the proc - {ok, Proc#proc{ddoc_keys=[DDocKey|Keys2]}}. - -maybe_spawn_proc(State, Client) -> - #state{proc_counts=Counts, waiting=Waiting} = State, - #client{lang=Lang} = Client, - Limit = list_to_integer(config:get( - "query_server_config", "os_process_limit", "100")), - case dict:find(Lang, Counts) of - {ok, Limit} -> - add_waiting_client(Waiting, Client), - State; - _ -> - spawn_link(?MODULE, new_proc, [Client]), - State#state{ - proc_counts=dict:update_counter(Lang, 1, Counts) - } - end. - -find_proc(State, Client, [Fun|FindFuns]) -> - try iter_procs(State#state.tab, Client#client.lang, Fun, nil) of - {not_found, _} -> - find_proc(State, Client, FindFuns); - {ok, Proc} -> - {reply, {ok, Proc, State#state.config}, State} - catch error:Reason -> - ?LOG_ERROR("~p ~p ~p", [?MODULE, Reason, erlang:get_stacktrace()]), - {reply, {error, Reason}, State} - end; -find_proc(State, Client, []) -> - {noreply, maybe_spawn_proc(State, Client)}.
