This is an automated email from the ASF dual-hosted git repository. rnewson pushed a commit to branch fabric_teardown in repository https://gitbox.apache.org/repos/asf/couchdb.git
commit 0c12522c93f4ffdc47f37dbd063b713e9ed637fd Author: Robert Newson <[email protected]> AuthorDate: Mon Aug 14 15:48:43 2023 +0100 Fabric workers should exit if the client exits. I enhanced the dev haproxy.cfg so that we can see haproxy log output too (I also capture the x-couch-request-id so we can match it to couch.log output) --- rel/haproxy.cfg | 9 +++++---- src/rexi/src/rexi_server.erl | 47 +++++++++++++++++++++++++++++++++++++------- 2 files changed, 45 insertions(+), 11 deletions(-) diff --git a/rel/haproxy.cfg b/rel/haproxy.cfg index 540075761..c374a79e7 100644 --- a/rel/haproxy.cfg +++ b/rel/haproxy.cfg @@ -16,7 +16,7 @@ global defaults mode http - log global + log stdout format rfc5424 daemon monitor-uri /_haproxy_health_check option log-health-checks option httplog @@ -25,9 +25,9 @@ defaults option redispatch retries 4 option http-server-close - timeout client 150000 - timeout server 3600000 - timeout connect 500 + timeout client 60s + timeout server 60s + timeout connect 500ms stats enable stats uri /_haproxy_stats @@ -37,6 +37,7 @@ frontend http-in # This requires HAProxy 1.5.x # bind *:$HAPROXY_PORT bind *:5984 + capture response header X-Couch-Request-Id len 10 default_backend couchdbs backend couchdbs diff --git a/src/rexi/src/rexi_server.erl b/src/rexi/src/rexi_server.erl index 52489a9c5..e38c87830 100644 --- a/src/rexi/src/rexi_server.erl +++ b/src/rexi/src/rexi_server.erl @@ -28,6 +28,7 @@ -record(job, { client :: reference(), + client_monitor :: reference(), worker :: reference(), client_pid :: pid(), worker_pid :: pid() @@ -36,6 +37,7 @@ -record(st, { workers = ets:new(workers, [private, {keypos, #job.worker}]), clients = ets:new(clients, [private, {keypos, #job.client}]), + client_monitors = ets:new(client_monitors, [private, {keypos, #job.client_monitor}]), errors = queue:new(), error_limit = 0, error_count = 0 @@ -72,9 +74,11 @@ handle_call(_Request, _From, St) -> handle_cast({doit, From, MFA}, St) -> handle_cast({doit, From, undefined, 
MFA}, St); handle_cast({doit, {ClientPid, ClientRef} = From, Nonce, MFA}, State) -> + ClientMonitor = monitor(process, ClientPid), {LocalPid, Ref} = spawn_monitor(?MODULE, init_p, [From, MFA, Nonce]), Job = #job{ client = ClientRef, + client_monitor = ClientMonitor, worker = Ref, client_pid = ClientPid, worker_pid = LocalPid @@ -90,14 +94,29 @@ handle_cast(_, St) -> couch_log:notice("rexi_server ignored_cast", []), {noreply, St}. -handle_info({'DOWN', Ref, process, _, normal}, #st{workers = Workers} = St) -> +handle_info( + {'DOWN', Ref, process, _, normal}, #st{workers = Workers, client_monitors = ClientMonitors} = St +) -> case find_worker(Ref, Workers) of - #job{} = Job -> + #job{worker = Ref} = Job -> {noreply, remove_job(Job, St)}; false -> - {noreply, St} + case find_worker(Ref, ClientMonitors) of + #job{client_monitor = Ref} = Job -> + couch_log:notice( + "Removing orphaned fabric worker ~p as client ~p died", + [Job#job.worker_pid, Job#job.client_pid] + ), + kill_worker(Job#job.worker, St), + {noreply, St}; + false -> + {noreply, St} + end end; -handle_info({'DOWN', Ref, process, Pid, Error}, #st{workers = Workers} = St) -> +handle_info( + {'DOWN', Ref, process, Pid, Error}, + #st{workers = Workers, client_monitors = ClientMonitors} = St +) -> case find_worker(Ref, Workers) of #job{worker_pid = Pid, worker = Ref, client_pid = CPid, client = CRef} = Job -> case Error of @@ -110,7 +129,17 @@ handle_info({'DOWN', Ref, process, Pid, Error}, #st{workers = Workers} = St) -> {noreply, remove_job(Job, St)} end; false -> - {noreply, St} + case find_worker(Ref, ClientMonitors) of + #job{client_pid = Pid, client_monitor = Ref} = Job -> + couch_log:notice( + "Removing orphaned fabric worker ~p as client ~p died", + [Job#job.worker_pid, Job#job.client_pid] + ), + kill_worker(Job#job.worker, St), + {noreply, St}; + false -> + {noreply, St} + end end; handle_info(_Info, St) -> {noreply, St}. @@ -186,14 +215,18 @@ clean_stack(S) -> S ). 
-add_job(Job, #st{workers = Workers, clients = Clients} = State) -> +add_job(Job, #st{workers = Workers, clients = Clients, client_monitors = ClientMonitors} = State) -> ets:insert(Workers, Job), ets:insert(Clients, Job), + ets:insert(ClientMonitors, Job), State. -remove_job(Job, #st{workers = Workers, clients = Clients} = State) -> +remove_job( + Job, #st{workers = Workers, clients = Clients, client_monitors = ClientMonitors} = State +) -> ets:delete_object(Workers, Job), ets:delete_object(Clients, Job), + ets:delete_object(ClientMonitors, Job), State. find_worker(Ref, Tab) ->
