This is an automated email from the ASF dual-hosted git repository.

rnewson pushed a commit to branch fabric_teardown
in repository https://gitbox.apache.org/repos/asf/couchdb.git

commit 0c12522c93f4ffdc47f37dbd063b713e9ed637fd
Author: Robert Newson <[email protected]>
AuthorDate: Mon Aug 14 15:48:43 2023 +0100

    Fabric workers should exit if the client exits
    
    I enhanced the dev haproxy.cfg so that we can see haproxy
    log output too (I also capture the x-couch-request-id so we can
    match it to couch.log output)
---
 rel/haproxy.cfg              |  9 +++++----
 src/rexi/src/rexi_server.erl | 47 +++++++++++++++++++++++++++++++++++++-------
 2 files changed, 45 insertions(+), 11 deletions(-)

diff --git a/rel/haproxy.cfg b/rel/haproxy.cfg
index 540075761..c374a79e7 100644
--- a/rel/haproxy.cfg
+++ b/rel/haproxy.cfg
@@ -16,7 +16,7 @@ global
 
 defaults
         mode http
-        log global
+        log stdout format rfc5424 daemon
         monitor-uri /_haproxy_health_check
         option log-health-checks
         option httplog
@@ -25,9 +25,9 @@ defaults
         option redispatch
         retries 4
         option http-server-close
-        timeout client 150000
-        timeout server 3600000
-        timeout connect 500
+        timeout client 60s
+        timeout server 60s
+        timeout connect 500ms
 
         stats enable
         stats uri /_haproxy_stats
@@ -37,6 +37,7 @@ frontend http-in
          # This requires HAProxy 1.5.x
          # bind *:$HAPROXY_PORT
          bind *:5984
+         capture response header X-Couch-Request-Id len 10
          default_backend couchdbs
 
 backend couchdbs
diff --git a/src/rexi/src/rexi_server.erl b/src/rexi/src/rexi_server.erl
index 52489a9c5..e38c87830 100644
--- a/src/rexi/src/rexi_server.erl
+++ b/src/rexi/src/rexi_server.erl
@@ -28,6 +28,7 @@
 
 -record(job, {
     client :: reference(),
+    client_monitor :: reference(),
     worker :: reference(),
     client_pid :: pid(),
     worker_pid :: pid()
@@ -36,6 +37,7 @@
 -record(st, {
     workers = ets:new(workers, [private, {keypos, #job.worker}]),
     clients = ets:new(clients, [private, {keypos, #job.client}]),
+    client_monitors = ets:new(client_monitors, [private, {keypos, #job.client_monitor}]),
     errors = queue:new(),
     error_limit = 0,
     error_count = 0
@@ -72,9 +74,11 @@ handle_call(_Request, _From, St) ->
 handle_cast({doit, From, MFA}, St) ->
     handle_cast({doit, From, undefined, MFA}, St);
 handle_cast({doit, {ClientPid, ClientRef} = From, Nonce, MFA}, State) ->
+    ClientMonitor = monitor(process, ClientPid),
     {LocalPid, Ref} = spawn_monitor(?MODULE, init_p, [From, MFA, Nonce]),
     Job = #job{
         client = ClientRef,
+        client_monitor = ClientMonitor,
         worker = Ref,
         client_pid = ClientPid,
         worker_pid = LocalPid
@@ -90,14 +94,29 @@ handle_cast(_, St) ->
     couch_log:notice("rexi_server ignored_cast", []),
     {noreply, St}.
 
-handle_info({'DOWN', Ref, process, _, normal}, #st{workers = Workers} = St) ->
+handle_info(
+    {'DOWN', Ref, process, _, normal}, #st{workers = Workers, client_monitors = ClientMonitors} = St
+) ->
     case find_worker(Ref, Workers) of
-        #job{} = Job ->
+        #job{worker = Ref} = Job ->
             {noreply, remove_job(Job, St)};
         false ->
-            {noreply, St}
+            case find_worker(Ref, ClientMonitors) of
+                #job{client_monitor = Ref} = Job ->
+                    couch_log:notice(
+                        "Removing orphaned fabric worker ~p as client ~p died",
+                        [Job#job.worker_pid, Job#job.client_pid]
+                    ),
+                    kill_worker(Job#job.worker, St),
+                    {noreply, St};
+                false ->
+                    {noreply, St}
+            end
     end;
-handle_info({'DOWN', Ref, process, Pid, Error}, #st{workers = Workers} = St) ->
+handle_info(
+    {'DOWN', Ref, process, Pid, Error},
+    #st{workers = Workers, client_monitors = ClientMonitors} = St
+) ->
     case find_worker(Ref, Workers) of
         #job{worker_pid = Pid, worker = Ref, client_pid = CPid, client = CRef} = Job ->
             case Error of
@@ -110,7 +129,17 @@ handle_info({'DOWN', Ref, process, Pid, Error}, #st{workers = Workers} = St) ->
                     {noreply, remove_job(Job, St)}
             end;
         false ->
-            {noreply, St}
+            case find_worker(Ref, ClientMonitors) of
+                #job{client_pid = Pid, client_monitor = Ref} = Job ->
+                    couch_log:notice(
+                        "Removing orphaned fabric worker ~p as client ~p died",
+                        [Job#job.worker_pid, Job#job.client_pid]
+                    ),
+                    kill_worker(Job#job.worker, St),
+                    {noreply, St};
+                false ->
+                    {noreply, St}
+            end
     end;
 handle_info(_Info, St) ->
     {noreply, St}.
@@ -186,14 +215,18 @@ clean_stack(S) ->
         S
     ).
 
-add_job(Job, #st{workers = Workers, clients = Clients} = State) ->
+add_job(Job, #st{workers = Workers, clients = Clients, client_monitors = ClientMonitors} = State) ->
     ets:insert(Workers, Job),
     ets:insert(Clients, Job),
+    ets:insert(ClientMonitors, Job),
     State.
 
-remove_job(Job, #st{workers = Workers, clients = Clients} = State) ->
+remove_job(
+    Job, #st{workers = Workers, clients = Clients, client_monitors = ClientMonitors} = State
+) ->
     ets:delete_object(Workers, Job),
     ets:delete_object(Clients, Job),
+    ets:delete_object(ClientMonitors, Job),
     State.
 
 find_worker(Ref, Tab) ->

Reply via email to