This is an automated email from the ASF dual-hosted git repository. davisp pushed a commit to branch prototype/views in repository https://gitbox.apache.org/repos/asf/couchdb.git
commit 5c0b671302d778d870236ce2d6261b8dbf86dad4 Author: Paul J. Davis <[email protected]> AuthorDate: Wed Jul 17 10:41:20 2019 -0500 Simplify couch_views_worker_server This just follows the standard pattern of keeping a pool of workers alive that will accept jobs individually. This avoids all of the oddness in passing jobs around after they've been accepted. I've also renamed it couch_views_server so that the name is a bit less wordy. --- ...ws_worker_server.erl => couch_views_server.erl} | 69 +++++++++++----------- 1 file changed, 36 insertions(+), 33 deletions(-) diff --git a/src/couch_views/src/couch_views_worker_server.erl b/src/couch_views/src/couch_views_server.erl similarity index 60% rename from src/couch_views/src/couch_views_worker_server.erl rename to src/couch_views/src/couch_views_server.erl index 13bd9aa..8ec2425 100644 --- a/src/couch_views/src/couch_views_worker_server.erl +++ b/src/couch_views/src/couch_views_server.erl @@ -10,7 +10,7 @@ % License for the specific language governing permissions and limitations under % the License. --module(couch_views_worker_server). +-module(couch_views_server). -behaviour(gen_server). @@ -39,13 +39,13 @@ start_link() -> init(_) -> + process_flag(trap_exit, true), couch_views_jobs:set_timeout(), State0 = #{ - workers => #{}, - acceptor_pid => undefined + workers => [], + num_workers => num_workers() }, - State = spawn_acceptor(State0), - {ok, State}. + {ok, spawn_workers(State)}. terminate(_, _St) -> @@ -56,53 +56,51 @@ handle_call(Msg, _From, St) -> {stop, {bad_call, Msg}, {bad_call, Msg}, St}. -handle_cast({job, Job, JobData}, State) -> - State1 = start_worker(State, Job, JobData), - State2 = spawn_acceptor(State1), - {noreply, State2}; - handle_cast(Msg, St) -> {stop, {bad_cast, Msg}, St}. -handle_info({'DOWN', _Ref, process, Pid, Reason}, State) -> - LogMsg = "~p : process ~p exited with ~p", - couch_log:error(LogMsg, [?MODULE, Pid, Reason]), - State1 = check_finished_process(State, Pid), - {noreply, State1}; +handle_info({'EXIT', Pid, Reason}, State) -> + #{workers := Workers} = State, + case Workers -- [Pid] of + Workers -> + LogMsg = "~p : unknown process ~p exited with ~p", + couch_log:error(LogMsg, [?MODULE, Pid, Reason]), + {stop, {unknown_pid_exit, Pid}, State}; + NewWorkers -> + if Reason == normal -> ok; true -> + LogMsg = "~p : indexer process ~p exited with ~p", + couch_log:error(LogMsg, [?MODULE, Pid, Reason]), + end, + {noreply, spawn_workers(State#{workers := NewWorkers})} + end; handle_info(Msg, St) -> - couch_log:notice("~s ignoring info ~w", [?MODULE, Msg]), - {noreply, St}. + {stop, {bad_info, Msg}, St}. code_change(_OldVsn, St, _Extra) -> {ok, St}. -start_worker(State, Job, JobData) -> - #{workers := Workers} = State, - {Pid, _Ref} = spawn_monitor(fun () -> couch_views_worker:start(Job, JobData) end), - Workers1 = Workers#{Pid => true}, - State#{workers := Workers1}. - - -spawn_acceptor(State) -> +spawn_workers(State) -> #{ workers := Workers, - acceptor_pid := Pid + num_workers := NumWorkers } = State, - MaxWorkers = config:get_integer("couch_views", "max_workers", ?MAX_WORKERS), - case maps:size(Workers) >= MaxWorkers of - false when not is_pid(Pid) -> - Parent = self(), - {Pid1, _Ref} = spawn_monitor(fun() -> blocking_acceptor(Parent) end), - State#{acceptor_pid := Pid1}; - _ -> + case length(Workers) < NumWorkers of + true -> + Pid = spawn_worker(), + spawn_workers(State#{workers := [Pid | Workers]}); + false -> State end. +spawn_worker() -> + couch_views_indexer:spawn_link(). + + blocking_acceptor(Parent) -> case couch_views_jobs:accept() of not_found -> @@ -120,3 +118,8 @@ check_finished_process(State, Pid) -> #{workers := Workers} = State, Workers1 = maps:remove(Pid, Workers), State#{workers := Workers1}. + + + +num_workers() -> + config:get_integer("couch_views", "max_workers", ?MAX_WORKERS).
