Repository: couchdb-couch Updated Branches: refs/heads/master 7ee0b8161 -> 8bd756e60
Simplify proc manager assignment logic This changes how proc manager handles proc assignment. Instead of doing this in three different places: get_proc call handler, return_proc/2 with maybe_assign_proc and flush_waiters/2, proc manager now just places all the incoming requests in the waiting queue and then flushes it. As a result all the logic kept in one place which makes it more obvious that we are treating proc management as a processing of a single FIFO queue with "soft" and "hard" upper limits. Consequently this is fixing a bug in maybe_assign_proc where it was possible to assign a client a process that wasn't aware of it. COUCHDB-3095 Project: http://git-wip-us.apache.org/repos/asf/couchdb-couch/repo Commit: http://git-wip-us.apache.org/repos/asf/couchdb-couch/commit/f0b84513 Tree: http://git-wip-us.apache.org/repos/asf/couchdb-couch/tree/f0b84513 Diff: http://git-wip-us.apache.org/repos/asf/couchdb-couch/diff/f0b84513 Branch: refs/heads/master Commit: f0b84513752cad3d7f6f944ff912244be4c08e32 Parents: 7ee0b81 Author: Eric Avdey <[email protected]> Authored: Thu Aug 11 17:45:32 2016 -0300 Committer: Eric Avdey <[email protected]> Committed: Tue Aug 16 12:06:06 2016 -0300 ---------------------------------------------------------------------- src/couch_proc_manager.erl | 151 ++++++++++++++++++++-------------------- 1 file changed, 75 insertions(+), 76 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/couchdb-couch/blob/f0b84513/src/couch_proc_manager.erl ---------------------------------------------------------------------- diff --git a/src/couch_proc_manager.erl b/src/couch_proc_manager.erl index 33cc1e5..a6790b4 100644 --- a/src/couch_proc_manager.erl +++ b/src/couch_proc_manager.erl @@ -131,35 +131,17 @@ handle_call(get_stale_proc_count, _From, State) -> {reply, ets:select_count(?PROCS, MatchSpec), State}; handle_call({get_proc, #doc{body={Props}}=DDoc, DDocKey}, From, State) -> - {ClientPid, _} = From, LangStr = couch_util:get_value(<<"language">>, Props, <<"javascript">>), Lang = couch_util:to_binary(LangStr), - IterFun = fun(Proc, Acc) -> - case lists:member(DDocKey, Proc#proc_int.ddoc_keys) of - true -> - {stop, assign_proc(ClientPid, Proc)}; - false -> - {ok, Acc} - end - end, - TeachFun = fun(Proc0, Acc) -> - try - {ok, Proc1} = teach_ddoc(DDoc, DDocKey, Proc0), - {stop, assign_proc(ClientPid, Proc1)} - catch _:_ -> - {ok, Acc} - end - end, Client = #client{from=From, lang=Lang, ddoc=DDoc, ddoc_key=DDocKey}, - find_proc(State, Client, [IterFun, TeachFun]); + add_waiting_client(Client), + {noreply, flush_waiters(State, Lang)}; -handle_call({get_proc, Lang}, From, State) -> - {ClientPid, _} = From, - IterFun = fun(Proc, _Acc) -> - {stop, assign_proc(ClientPid, Proc)} - end, - Client = #client{from=From, lang=couch_util:to_binary(Lang)}, - find_proc(State, Client, [IterFun]); +handle_call({get_proc, LangStr}, From, State) -> + Lang = couch_util:to_binary(LangStr), + Client = #client{from=From, lang=Lang}, + add_waiting_client(Client), + {noreply, flush_waiters(State, Lang)}; handle_call({ret_proc, #proc{client=Ref} = Proc}, _From, State) -> erlang:demonitor(Ref, [flush]), @@ -290,54 +272,60 @@ handle_config_change(_, _, _, _, _) -> {ok, undefined}. -find_proc(State, Client, [Fun | FindFuns]) -> - try iter_procs(Client#client.lang, Fun, undefined) of - {not_found, _} -> - find_proc(State, Client, FindFuns); - {ok, Proc} -> - {reply, {ok, Proc, State#state.config}, State} +find_proc(#client{lang = Lang, ddoc_key = undefined}) -> + Pred = fun(_) -> + true + end, + find_proc(Lang, Pred); +find_proc(#client{lang = Lang, ddoc = DDoc, ddoc_key = DDocKey} = Client) -> + Pred = fun(#proc_int{ddoc_keys = DDocKeys}) -> + lists:member(DDocKey, DDocKeys) + end, + case find_proc(Lang, Pred) of + not_found -> + case find_proc(Client#client{ddoc_key=undefined}) of + {ok, Proc} -> + teach_ddoc(DDoc, DDocKey, Proc); + Else -> + Else + end; + Else -> + Else + end. + +find_proc(Lang, Fun) -> + try iter_procs(Lang, Fun) catch error:Reason -> - couch_log:error("~p ~p ~p", [?MODULE, Reason, erlang:get_stacktrace()]), - {reply, {error, Reason}, State} - end; -find_proc(State, Client, []) -> - {noreply, maybe_spawn_proc(State, Client)}. + StackTrace = erlang:get_stacktrace(), + couch_log:error("~p ~p ~p", [?MODULE, Reason, StackTrace]), + {error, Reason} + end. -iter_procs(Lang, Fun, Acc) when is_binary(Lang) -> +iter_procs(Lang, Fun) when is_binary(Lang) -> Pattern = #proc_int{lang=Lang, client=undefined, _='_'}, MSpec = [{Pattern, [], ['$_']}], case ets:select_reverse(?PROCS, MSpec, 25) of '$end_of_table' -> - {not_found, Acc}; + not_found; Continuation -> - iter_procs_int(Continuation, Fun, Acc) + iter_procs_int(Continuation, Fun) end. -iter_procs_int({[], Continuation0}, Fun, Acc) -> +iter_procs_int({[], Continuation0}, Fun) -> case ets:select_reverse(Continuation0) of '$end_of_table' -> - {not_found, Acc}; + not_found; Continuation1 -> - iter_procs_int(Continuation1, Fun, Acc) + iter_procs_int(Continuation1, Fun) end; -iter_procs_int({[Proc | Rest], Continuation}, Fun, Acc0) -> - case Fun(Proc, Acc0) of - {ok, Acc1} -> - iter_procs_int({Rest, Continuation}, Fun, Acc1); - {stop, Acc1} -> - {ok, Acc1} - end. - - -maybe_spawn_proc(State, Client) -> - case dict:find(Client#client.lang, State#state.counts) of - {ok, Count} when Count >= State#state.hard_limit -> - add_waiting_client(Client), - State; - _ -> - spawn_proc(State, Client) +iter_procs_int({[Proc | Rest], Continuation}, Fun) -> + case Fun(Proc) of + true -> + {ok, Proc}; + false -> + iter_procs_int({Rest, Continuation}, Fun) end. @@ -447,23 +435,13 @@ return_proc(#state{} = State, #proc_int{} = ProcInt) -> true = ets:update_element(?PROCS, Pid, [ {#proc_int.client, undefined} ]), - maybe_assign_proc(State, ProcInt) + State end; false -> remove_proc(State, ProcInt) end, flush_waiters(NewState, Lang). -maybe_assign_proc(#state{} = State, ProcInt) -> - #proc_int{lang = Lang} = ProcInt, - case get_waiting_client(Lang) of - #client{from = From} = Client -> - Proc = assign_proc(Client, ProcInt#proc_int{client=undefined}), - gen_server:reply(From, {ok, Proc, State#state.config}), - State; - undefined -> - State - end. remove_proc(State, #proc_int{}=Proc) -> ets:delete(?PROCS, Proc#proc_int.pid), @@ -500,16 +478,27 @@ flush_waiters(State) -> flush_waiters(State, Lang) -> - case dict:fetch(Lang, State#state.counts) of - Count when Count < State#state.hard_limit -> - case get_waiting_client(Lang) of - #client{} = Client -> + CanSpawn = can_spawn(State, Lang), + case get_waiting_client(Lang) of + #client{from = From} = Client -> + case find_proc(Client) of + {ok, ProcInt} -> + Proc = assign_proc(Client, ProcInt), + gen_server:reply(From, {ok, Proc, State#state.config}), + remove_waiting_client(Client), + flush_waiters(State, Lang); + {error, Error} -> + gen_server:reply(From, {error, Error}), + remove_waiting_client(Client), + flush_waiters(State, Lang); + not_found when CanSpawn -> NewState = spawn_proc(State, Client), + remove_waiting_client(Client), flush_waiters(NewState, Lang); - undefined -> + not_found -> State end; - _ -> + undefined -> State end. @@ -523,11 +512,21 @@ get_waiting_client(Lang) -> '$end_of_table' -> undefined; {[#client{}=Client], _} -> - ets:delete(?WAITERS, Client#client.timestamp), Client end. +remove_waiting_client(#client{timestamp = Timestamp}) -> + ets:delete(?WAITERS, Timestamp). + + +can_spawn(#state{hard_limit = HardLimit, counts = Counts}, Lang) -> + case dict:find(Lang, Counts) of + {ok, Count} -> Count < HardLimit; + error -> true + end. + + get_proc_config() -> Limit = config:get("query_server_config", "reduce_limit", "true"), Timeout = config:get("couchdb", "os_process_timeout", "5000"),
