This is an automated email from the ASF dual-hosted git repository.

rnewson pushed a commit to branch fabric_teardown
in repository https://gitbox.apache.org/repos/asf/couchdb.git

commit 501b4382f29a50a43fe13eeac23a85faf3ec8ebc
Author: Robert Newson <[email protected]>
AuthorDate: Tue Aug 22 13:21:40 2023 +0100

    A _find request can run for a very long time (on large databases when the
    selector matches no index) and this continues even if the client
    disconnects.
    
    We want to stop the fabric work when there is no client to receive the
    result. fabric_streams already kills the workers if the coordinating process
    dies, but in this circumstance the coordinating process does not die.
    
    This commit enhances (and renames) the existing cleanup process to be a
    watchdog. If enabled, the watchdog needs to be kicked regularly (by whatever
    activity we think indicates it's worth continuing) or it will terminate the
    process it is watching, and kill the worker processes also.
    
    Currently only mango_httpd:handle_find_req enables the watchdog and it only
    kicks the watchdog when it enqueues a row to be returned (i.e., only on
    selector matches).
---
 src/fabric/src/fabric_streams.erl | 182 +++++++++++++++++++++++++++++---------
 src/mango/src/mango_httpd.erl     |   2 +
 2 files changed, 143 insertions(+), 41 deletions(-)

diff --git a/src/fabric/src/fabric_streams.erl b/src/fabric/src/fabric_streams.erl
index 318824814..a81c80982 100644
--- a/src/fabric/src/fabric_streams.erl
+++ b/src/fabric/src/fabric_streams.erl
@@ -17,13 +17,30 @@
     start/3,
     start/4,
     start/5,
-    cleanup/1
+    cleanup/1,
+    enable_watchdog/0,
+    kick_watchdog/0
 ]).
 
 -include_lib("fabric/include/fabric.hrl").
 -include_lib("mem3/include/mem3.hrl").
 
--define(WORKER_CLEANER, fabric_worker_cleaner).
+-define(WORKER_WATCHDOG, fabric_worker_watchdog).
+-define(ADD_WORKER, add_worker).
+-define(WATCHDOG_ENABLE, watchdog_enable).
+-define(WATCHDOG_TIMEOUT, watchdog_timeout).
+-define(WATCHDOG_KICK, watchdog_kick).
+-define(WATCHDOG_LAST_KICK, watchdog_last_kick).
+
+-define(WATCHDOG_IS_IDLE, St#watchdog_state.idle).
+-define(WATCHDOG_IS_ENABLED, St#watchdog_state.timer /= undefined).
+
+-record(watchdog_state, {
+    coordinator,
+    workers,
+    timer,
+    idle = true
+}).
 
 start(Workers, Keypos) ->
     start(Workers, Keypos, undefined, undefined).
@@ -43,7 +60,7 @@ start(Workers0, Keypos, StartFun, Replacements, RingOpts) ->
         replacements = Replacements,
         ring_opts = RingOpts
     },
-    spawn_worker_cleaner(self(), Workers0),
+    spawn_worker_watchdog(self(), Workers0),
     Timeout = fabric_util:request_timeout(),
     case rexi_utils:recv(Workers0, Keypos, Fun, Acc, Timeout, infinity) of
         {ok, #stream_acc{ready = Workers}} ->
@@ -61,13 +78,13 @@ start(Workers0, Keypos, StartFun, Replacements, RingOpts) ->
     end.
 
 cleanup(Workers) ->
-    % Stop the auxiliary cleaner process as we got to the point where cleanup
-    % happesn in the regular fashion so we don't want to send 2x the number kill
+    % Stop the auxiliary watchdog process as we got to the point where cleanup
+    % happens in the regular fashion so we don't want to send 2x the number kill
     % messages
-    case get(?WORKER_CLEANER) of
-        CleanerPid when is_pid(CleanerPid) ->
-            erase(?WORKER_CLEANER),
-            exit(CleanerPid, kill);
+    case get(?WORKER_WATCHDOG) of
+        WatchdogPid when is_pid(WatchdogPid) ->
+            erase(?WORKER_WATCHDOG),
+            exit(WatchdogPid, kill);
         _ ->
             ok
     end,
@@ -99,7 +116,7 @@ handle_stream_start({rexi_EXIT, Reason}, Worker, St) ->
                     FinalWorkers = lists:foldl(
                         fun(Repl, NewWorkers) ->
                             NewWorker = (St#stream_acc.start_fun)(Repl),
-                            add_worker_to_cleaner(self(), NewWorker),
+                            add_worker_to_watchdog(self(), NewWorker),
                             fabric_dict:store(NewWorker, waiting, NewWorkers)
                         end,
                         Workers,
@@ -152,45 +169,128 @@ handle_stream_start({ok, Error}, _, St) when Error == ddoc_updated; Error == ins
 handle_stream_start(Else, _, _) ->
     exit({invalid_stream_start, Else}).
 
-% Spawn an auxiliary rexi worker cleaner. This will be used in cases
-% when the coordinator (request) process is forceably killed and doesn't
+% Spawn an auxiliary rexi worker watchdog which triggers cleanup if:
+% * nothing has been streamed for $timeout duration
+% * the coordinator (request) process is forcibly killed and doesn't
 % get a chance to process its `after` fabric:clean/1 clause.
-spawn_worker_cleaner(Coordinator, Workers) ->
-    case get(?WORKER_CLEANER) of
+
+% The watchdog is initially disabled. Clients can enable it by calling
+% enable_watchdog/0 before starting a fabric_stream. That client is responsible
+% for calling kick_watchdog/0 on activity to prevent the watchdog from acting.
+% If kick_watchdog/0 is not called at least once in each watchdog interval,
+% the stream coordinator is killed and the workers are cleaned up.
+spawn_worker_watchdog(Coordinator, Workers) ->
+    case get(?WORKER_WATCHDOG) of
         undefined ->
+            State0 = #watchdog_state{
+                coordinator = Coordinator,
+                workers = Workers
+            },
+            Enabled = get(?WATCHDOG_ENABLE),
             Pid = spawn(fun() ->
                 erlang:monitor(process, Coordinator),
-                cleaner_loop(Coordinator, Workers)
+                State1 =
+                    case Enabled of
+                        true ->
+                            reset_watchdog(State0);
+                        undefined ->
+                            State0
+                    end,
+                watchdog_loop(State1)
             end),
-            put(?WORKER_CLEANER, Pid),
+            put(?WORKER_WATCHDOG, Pid),
             Pid;
-        ExistingCleaner ->
-            ExistingCleaner
+        ExistingWatchdog ->
+            ExistingWatchdog
     end.
 
-cleaner_loop(Pid, Workers) ->
+watchdog_loop(#watchdog_state{} = St) ->
     receive
-        {add_worker, Pid, Worker} ->
-            cleaner_loop(Pid, [Worker | Workers]);
-        {'DOWN', _, _, Pid, _} ->
-            fabric_util:cleanup(Workers)
+        {?ADD_WORKER, Pid, Worker} when Pid == St#watchdog_state.coordinator ->
+            Workers = St#watchdog_state.workers,
+            watchdog_loop(St#watchdog_state{workers = [Worker | Workers]});
+        ?WATCHDOG_KICK when ?WATCHDOG_IS_ENABLED ->
+            watchdog_loop(St#watchdog_state{idle = false});
+        ?WATCHDOG_KICK ->
+            watchdog_loop(St);
+        ?WATCHDOG_TIMEOUT when ?WATCHDOG_IS_ENABLED, ?WATCHDOG_IS_IDLE ->
+            couch_log:warning(
+                "watchdog ~p detected idle interval, killing ~p",
+                [self(), St#watchdog_state.coordinator]
+            ),
+            exit(St#watchdog_state.coordinator, kill),
+            fabric_util:cleanup(St#watchdog_state.workers);
+        ?WATCHDOG_TIMEOUT when ?WATCHDOG_IS_ENABLED ->
+            watchdog_loop(reset_watchdog(St));
+        ?WATCHDOG_TIMEOUT ->
+            St;
+        {'DOWN', _, _, Pid, _} when Pid == St#watchdog_state.coordinator ->
+            fabric_util:cleanup(St#watchdog_state.workers)
+    end.
+
+add_worker_to_watchdog(CoordinatorPid, Worker) ->
+    send_to_watchdog({?ADD_WORKER, CoordinatorPid, Worker}).
+
+enable_watchdog() ->
+    put(?WATCHDOG_ENABLE, true).
+
+reset_watchdog(#watchdog_state{} = St) ->
+    case watchdog_timeout() of
+        infinity ->
+            St;
+        Timeout ->
+            TRef = erlang:send_after(Timeout, self(), ?WATCHDOG_TIMEOUT),
+            St#watchdog_state{timer = TRef, idle = true}
+    end.
+
+kick_watchdog() ->
+    case should_kick() of
+        true ->
+            send_to_watchdog(?WATCHDOG_KICK),
+            update_last_kick();
+        false ->
+            ok
     end.
 
-add_worker_to_cleaner(CoordinatorPid, Worker) ->
-    case get(?WORKER_CLEANER) of
-        CleanerPid when is_pid(CleanerPid) ->
-            CleanerPid ! {add_worker, CoordinatorPid, Worker};
+
+should_kick() ->
+    case watchdog_timeout() of
+        infinity ->
+            false;
+        Timeout when is_integer(Timeout) ->
+            case get(?WATCHDOG_LAST_KICK) of
+                undefined ->
+                    true;
+                LastKick when is_integer(LastKick) ->
+                    Now = erlang:monotonic_time(),
+                    Delta = erlang:convert_time_unit(
+                        Now - LastKick, native, millisecond
+                    ),
+                    Delta > Timeout
+            end
+    end.
+
+update_last_kick() ->
+    put(?WATCHDOG_LAST_KICK, erlang:monotonic_time()).
+
+send_to_watchdog(Msg) ->
+    case get(?WORKER_WATCHDOG) of
+        WatchdogPid when is_pid(WatchdogPid) ->
+            WatchdogPid ! Msg;
         _ ->
             ok
     end.
 
+watchdog_timeout() ->
+    fabric_util:timeout("idle_stream", "60000").
+
 -ifdef(TEST).
 
 -include_lib("eunit/include/eunit.hrl").
 
-worker_cleaner_test_() ->
+worker_watchdog_test_() ->
     {
-        "Fabric spawn_worker_cleaner test",
+        "Fabric spawn_worker_watchdog test",
         {
             setup,
             fun setup/0,
@@ -208,7 +308,7 @@ worker_cleaner_test_() ->
 should_clean_workers() ->
     ?_test(begin
         meck:reset(rexi),
-        erase(?WORKER_CLEANER),
+        erase(?WORKER_WATCHDOG),
         Workers = [
             #shard{node = 'n1', ref = make_ref()},
             #shard{node = 'n2', ref = make_ref()}
@@ -218,11 +318,11 @@ should_clean_workers() ->
                 die -> ok
             end
         end),
-        Cleaner = spawn_worker_cleaner(Coord, Workers),
-        Ref = erlang:monitor(process, Cleaner),
+        Watchdog = spawn_worker_watchdog(Coord, Workers),
+        Ref = erlang:monitor(process, Watchdog),
         Coord ! die,
         receive
-            {'DOWN', Ref, _, Cleaner, _} -> ok
+            {'DOWN', Ref, _, Watchdog, _} -> ok
         end,
         ?assertEqual(1, meck:num_calls(rexi, kill_all, 1))
     end).
@@ -230,7 +330,7 @@ should_clean_workers() ->
 does_not_fire_if_cleanup_called() ->
     ?_test(begin
         meck:reset(rexi),
-        erase(?WORKER_CLEANER),
+        erase(?WORKER_WATCHDOG),
         Workers = [
             #shard{node = 'n1', ref = make_ref()},
             #shard{node = 'n2', ref = make_ref()}
@@ -240,8 +340,8 @@ does_not_fire_if_cleanup_called() ->
                 die -> ok
             end
         end),
-        Cleaner = spawn_worker_cleaner(Coord, Workers),
-        Ref = erlang:monitor(process, Cleaner),
+        Watchdog = spawn_worker_watchdog(Coord, Workers),
+        Ref = erlang:monitor(process, Watchdog),
         cleanup(Workers),
         Coord ! die,
         receive
@@ -255,7 +355,7 @@ does_not_fire_if_cleanup_called() ->
 should_clean_additional_worker_too() ->
     ?_test(begin
         meck:reset(rexi),
-        erase(?WORKER_CLEANER),
+        erase(?WORKER_WATCHDOG),
         Workers = [
             #shard{node = 'n1', ref = make_ref()}
         ],
@@ -264,12 +364,12 @@ should_clean_additional_worker_too() ->
                 die -> ok
             end
         end),
-        Cleaner = spawn_worker_cleaner(Coord, Workers),
-        add_worker_to_cleaner(Coord, #shard{node = 'n2', ref = make_ref()}),
-        Ref = erlang:monitor(process, Cleaner),
+        Watchdog = spawn_worker_watchdog(Coord, Workers),
+        add_worker_to_watchdog(Coord, #shard{node = 'n2', ref = make_ref()}),
+        Ref = erlang:monitor(process, Watchdog),
         Coord ! die,
         receive
-            {'DOWN', Ref, _, Cleaner, _} -> ok
+            {'DOWN', Ref, _, Watchdog, _} -> ok
         end,
         ?assertEqual(1, meck:num_calls(rexi, kill_all, 1))
     end).
diff --git a/src/mango/src/mango_httpd.erl b/src/mango/src/mango_httpd.erl
index bd91652da..0f19f54a8 100644
--- a/src/mango/src/mango_httpd.erl
+++ b/src/mango/src/mango_httpd.erl
@@ -278,6 +278,7 @@ convert_to_design_id(DDocId) ->
     end.
 
 start_find_resp(Req) ->
+    fabric_streams:enable_watchdog(),
     chttpd:start_delayed_json_response(Req, 200, [], "{\"docs\":[").
 
 end_find_resp(Acc0) ->
@@ -310,6 +311,7 @@ handle_doc({add_key, Key, Value}, Acc0) ->
     NewKVs = lists:keystore(Key, 1, KVs, {Key, Value}),
     {ok, Acc0#vacc{kvs = NewKVs}};
 handle_doc({row, Doc}, Acc0) ->
+    fabric_streams:kick_watchdog(),
     #vacc{prepend = Prepend} = Acc0,
     Chunk = [Prepend, ?JSON_ENCODE(Doc)],
     maybe_flush_response(Acc0, Chunk, iolist_size(Chunk)).

Reply via email to