Add an index keyed on client reference. This prevents table scans triggered by the kill handler from causing the server to fall over under very high throughput.
I also took the opportunity to refactor a bit and use #job records throughout the server instead of raw tuples. BugzID: 12344 Project: http://git-wip-us.apache.org/repos/asf/couchdb-rexi/repo Commit: http://git-wip-us.apache.org/repos/asf/couchdb-rexi/commit/9b6a5cf1 Tree: http://git-wip-us.apache.org/repos/asf/couchdb-rexi/tree/9b6a5cf1 Diff: http://git-wip-us.apache.org/repos/asf/couchdb-rexi/diff/9b6a5cf1 Branch: refs/heads/import Commit: 9b6a5cf123faaedbc6b5e3d756a933bc7ee36a74 Parents: 9fd3722 Author: Adam Kocoloski <[email protected]> Authored: Mon Jun 27 15:16:50 2011 -0400 Committer: Adam Kocoloski <[email protected]> Committed: Mon Jun 27 15:16:50 2011 -0400 ---------------------------------------------------------------------- src/rexi_server.erl | 66 ++++++++++++++++++++++++++++++------------------ 1 file changed, 42 insertions(+), 24 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/couchdb-rexi/blob/9b6a5cf1/src/rexi_server.erl ---------------------------------------------------------------------- diff --git a/src/rexi_server.erl b/src/rexi_server.erl index d230533..931e340 100644 --- a/src/rexi_server.erl +++ b/src/rexi_server.erl @@ -22,8 +22,15 @@ -include("rexi.hrl"). -include_lib("eunit/include/eunit.hrl"). +-record(job, { + client::reference(), + worker::reference(), + pid::pid() +}). 
+ -record(st, { - workers = ets:new(workers, [private, {keypos,2}]), + workers = ets:new(workers, [private, {keypos, #job.worker}]), + clients = ets:new(clients, [private, {keypos, #job.client}]), errors = queue:new(), error_limit = 20, error_count = 0 @@ -61,17 +68,18 @@ handle_call(_Request, _From, St) -> handle_cast({doit, From, MFA}, St) -> handle_cast({doit, From, undefined, MFA}, St); -handle_cast({doit, From, Nonce, MFA}, #st{workers=Workers} = St) -> +handle_cast({doit, From, Nonce, MFA}, State) -> {LocalPid, Ref} = spawn_monitor(?MODULE, init_p, [From, MFA, Nonce]), - {noreply, St#st{workers = add_worker({LocalPid, Ref, From}, Workers)}}; + Job = #job{client = element(2, From), worker = Ref, pid = LocalPid}, + {noreply, add_job(Job, State)}; -handle_cast({kill, FromRef}, #st{workers=Workers} = St) -> - case find_worker_from(FromRef, Workers) of - {Pid, KeyRef, {_, FromRef}} -> +handle_cast({kill, FromRef}, #st{clients = Clients} = St) -> + case find_worker(FromRef, Clients) of + #job{worker = KeyRef, pid = Pid} = Job -> erlang:demonitor(KeyRef), exit(Pid, kill), - {noreply, St#st{workers = remove_worker(KeyRef, Workers)}}; + {noreply, remove_job(Job, St)}; false -> {noreply, St} end; @@ -81,18 +89,23 @@ handle_cast(_, St) -> {noreply, St}. 
handle_info({'DOWN', Ref, process, _, normal}, #st{workers=Workers} = St) -> - {noreply, St#st{workers = remove_worker(Ref, Workers)}}; + case find_worker(Ref, Workers) of + #job{} = Job -> + {noreply, remove_job(Job, St)}; + false -> + {noreply, St} + end; handle_info({'DOWN', Ref, process, Pid, Error}, #st{workers=Workers} = St) -> case find_worker(Ref, Workers) of - {Pid, Ref, From} -> + #job{pid=Pid, worker=Ref, client=From} = Job -> case Error of #error{reason = {_Class, Reason}, stack = Stack} -> notify_caller(From, {Reason, Stack}), St1 = save_error(Error, St), - {noreply, St1#st{workers = remove_worker(Ref, Workers)}}; + {noreply, remove_job(Job, St1)}; _ -> notify_caller(From, Error), - {noreply, St#st{workers = remove_worker(Ref, Workers)}} + {noreply, remove_job(Job, St)} end; false -> {noreply, St} @@ -102,12 +115,21 @@ handle_info(_Info, St) -> {noreply, St}. terminate(_Reason, St) -> - ets:foldl(fun({Pid, _, _}, _) -> exit(Pid,kill) end, nil, St#st.workers), + ets:foldl(fun(#job{pid=Pid},_) -> exit(Pid,kill) end, nil, St#st.workers), ok. code_change(_OldVsn, {st, Workers}, _Extra) -> {ok, #st{workers = Workers}}; +code_change(_OldVsn, {st, Workers0, Errors, Limit, Count}, _Extra) -> + Jobs = [#job{pid=A, worker=B, client=C} || {A, B, {_, C}} + <- ets:tab2list(Workers0)], + ets:delete(Workers0), + State = #st{errors = Errors, error_limit = Limit, error_count = Count}, + ets:insert(State#st.workers, Jobs), + ets:insert(State#st.clients, Jobs), + {ok, State}; + code_change(_OldVsn, St, _Extra) -> {ok, St}. @@ -144,22 +166,18 @@ clean_stack() -> lists:map(fun({M,F,A}) when is_list(A) -> {M,F,length(A)}; (X) -> X end, erlang:get_stacktrace()). -add_worker(Worker, Tab) -> - ets:insert(Tab, Worker), Tab. +add_job(Job, #st{workers = Workers, clients = Clients} = State) -> + ets:insert(Workers, Job), + ets:insert(Clients, Job), + State. -remove_worker(Ref, Tab) -> - ets:delete(Tab, Ref), Tab. 
+remove_job(Job, #st{workers = Workers, clients = Clients} = State) -> + ets:delete_object(Workers, Job), + ets:delete_object(Clients, Job), + State. find_worker(Ref, Tab) -> case ets:lookup(Tab, Ref) of [] -> false; [Worker] -> Worker end. -find_worker_from(Ref, Tab) -> - case ets:match_object(Tab, {'_', '_', {'_', Ref}}) of - [] -> - false; - [Worker] -> - Worker - end. - notify_caller({Caller, Ref}, Reason) -> Caller ! {Ref, {rexi_EXIT, Reason}}.
