This is an automated email from the ASF dual-hosted git repository.

rnewson pushed a commit to branch fabric_teardown
in repository https://gitbox.apache.org/repos/asf/couchdb.git

commit 8ea452bd17fd99af910a449a1ff244a04c9cc455
Author: Robert Newson <[email protected]>
AuthorDate: Tue Aug 22 13:21:40 2023 +0100

    wip
---
 src/fabric/src/fabric_streams.erl | 155 ++++++++++++++++++++++++++++----------
 src/mango/src/mango_httpd.erl     |   2 +
 2 files changed, 116 insertions(+), 41 deletions(-)

diff --git a/src/fabric/src/fabric_streams.erl b/src/fabric/src/fabric_streams.erl
index 318824814..278414fee 100644
--- a/src/fabric/src/fabric_streams.erl
+++ b/src/fabric/src/fabric_streams.erl
@@ -17,13 +17,29 @@
     start/3,
     start/4,
     start/5,
-    cleanup/1
+    cleanup/1,
+    enable_watchdog/0,
+    kick_watchdog/0
 ]).
 
 -include_lib("fabric/include/fabric.hrl").
 -include_lib("mem3/include/mem3.hrl").
 
--define(WORKER_CLEANER, fabric_worker_cleaner).
+-define(WORKER_WATCHDOG, fabric_worker_watchdog).
+-define(ADD_WORKER, add_worker).
+-define(WATCHDOG_ENABLE, watchdog_enable).
+-define(WATCHDOG_TIMEOUT, watchdog_timeout).
+-define(WATCHDOG_KICK, watchdog_kick).
+
+-define(WATCHDOG_IS_IDLE, St#watchdog_state.idle).
+-define(WATCHDOG_IS_ENABLED, St#watchdog_state.timer /= undefined).
+
+-record(watchdog_state, {
+    coordinator,
+    workers,
+    timer,
+    idle = true
+}).
 
 start(Workers, Keypos) ->
     start(Workers, Keypos, undefined, undefined).
@@ -43,7 +59,7 @@ start(Workers0, Keypos, StartFun, Replacements, RingOpts) ->
         replacements = Replacements,
         ring_opts = RingOpts
     },
-    spawn_worker_cleaner(self(), Workers0),
+    spawn_worker_watchdog(self(), Workers0),
     Timeout = fabric_util:request_timeout(),
     case rexi_utils:recv(Workers0, Keypos, Fun, Acc, Timeout, infinity) of
         {ok, #stream_acc{ready = Workers}} ->
@@ -61,13 +77,13 @@ start(Workers0, Keypos, StartFun, Replacements, RingOpts) ->
     end.
 
 cleanup(Workers) ->
-    % Stop the auxiliary cleaner process as we got to the point where cleanup
-    % happesn in the regular fashion so we don't want to send 2x the number kill
+    % Stop the auxiliary watchdog process as we got to the point where cleanup
+    % happens in the regular fashion so we don't want to send 2x the number kill
     % messages
-    case get(?WORKER_CLEANER) of
-        CleanerPid when is_pid(CleanerPid) ->
-            erase(?WORKER_CLEANER),
-            exit(CleanerPid, kill);
+    case get(?WORKER_WATCHDOG) of
+        WatchdogPid when is_pid(WatchdogPid) ->
+            erase(?WORKER_WATCHDOG),
+            exit(WatchdogPid, kill);
         _ ->
             ok
     end,
@@ -99,7 +115,7 @@ handle_stream_start({rexi_EXIT, Reason}, Worker, St) ->
                     FinalWorkers = lists:foldl(
                         fun(Repl, NewWorkers) ->
                             NewWorker = (St#stream_acc.start_fun)(Repl),
-                            add_worker_to_cleaner(self(), NewWorker),
+                            add_worker_to_watchdog(self(), NewWorker),
                             fabric_dict:store(NewWorker, waiting, NewWorkers)
                         end,
                         Workers,
@@ -152,45 +168,102 @@ handle_stream_start({ok, Error}, _, St) when Error == ddoc_updated; Error == ins
 handle_stream_start(Else, _, _) ->
     exit({invalid_stream_start, Else}).
 
-% Spawn an auxiliary rexi worker cleaner. This will be used in cases
-% when the coordinator (request) process is forceably killed and doesn't
+% Spawn an auxiliary rexi worker watchdog which triggers cleanup if:
+% * nothing has been streamed for $timeout duration
+% * the coordinator (request) process is forcibly killed and doesn't
 % get a chance to process its `after` fabric:clean/1 clause.
-spawn_worker_cleaner(Coordinator, Workers) ->
-    case get(?WORKER_CLEANER) of
+
+% The watchdog is initially disabled. Clients can enable it by calling
+% enable_watchdog/0 from the fabric_streams process. That client is responsible
+% for calling kick_watchdog/0 on activity to prevent the watchdog from acting.
+% If kick_watchdog/0 is not called at least once in each watchdog interval,
+% the stream coordinator is killed and the workers are cleaned up.
+spawn_worker_watchdog(Coordinator, Workers) ->
+    case get(?WORKER_WATCHDOG) of
         undefined ->
+            State0 = #watchdog_state{
+                coordinator = Coordinator,
+                workers = Workers
+            },
+            Enabled = get(?WATCHDOG_ENABLE),
             Pid = spawn(fun() ->
                 erlang:monitor(process, Coordinator),
-                cleaner_loop(Coordinator, Workers)
+                State1 =
+                    case Enabled of
+                        true ->
+                            reset_watchdog(State0);
+                        undefined ->
+                            State0
+                    end,
+                watchdog_loop(State1)
             end),
-            put(?WORKER_CLEANER, Pid),
+            couch_log:notice("Spawned watchdog ~p for ~p", [Pid, self()]),
+            put(?WORKER_WATCHDOG, Pid),
             Pid;
-        ExistingCleaner ->
-            ExistingCleaner
+        ExistingWatchdog ->
+            ExistingWatchdog
     end.
 
-cleaner_loop(Pid, Workers) ->
+watchdog_loop(#watchdog_state{} = St) ->
     receive
-        {add_worker, Pid, Worker} ->
-            cleaner_loop(Pid, [Worker | Workers]);
-        {'DOWN', _, _, Pid, _} ->
-            fabric_util:cleanup(Workers)
+        {?ADD_WORKER, Pid, Worker} when Pid == St#watchdog_state.coordinator ->
+            Workers = St#watchdog_state.workers,
+            watchdog_loop(St#watchdog_state{workers = [Worker | Workers]});
+        ?WATCHDOG_KICK when ?WATCHDOG_IS_ENABLED ->
+            watchdog_loop(St#watchdog_state{idle = false});
+        ?WATCHDOG_KICK ->
+            watchdog_loop(St);
+        ?WATCHDOG_TIMEOUT when ?WATCHDOG_IS_ENABLED, ?WATCHDOG_IS_IDLE ->
+            couch_log:warning("watchdog ~p detected idle interval, killing ~p",
+                [self(), St#watchdog_state.coordinator]),
+            exit(St#watchdog_state.coordinator, kill);
+        ?WATCHDOG_TIMEOUT when ?WATCHDOG_IS_ENABLED ->
+            watchdog_loop(reset_watchdog(St));
+        ?WATCHDOG_TIMEOUT ->
+            St;
+        {'DOWN', _, _, Pid, _} when Pid == St#watchdog_state.coordinator ->
+            fabric_util:cleanup(St#watchdog_state.workers)
     end.
 
-add_worker_to_cleaner(CoordinatorPid, Worker) ->
-    case get(?WORKER_CLEANER) of
-        CleanerPid when is_pid(CleanerPid) ->
-            CleanerPid ! {add_worker, CoordinatorPid, Worker};
+add_worker_to_watchdog(CoordinatorPid, Worker) ->
+    send_to_watchdog({?ADD_WORKER, CoordinatorPid, Worker}).
+
+enable_watchdog() ->
+    couch_log:notice("watchdog enabled on ~p", [self()]),
+    put(?WATCHDOG_ENABLE, true).
+
+reset_watchdog(#watchdog_state{} = St) ->
+    case watchdog_timeout() of
+        infinity ->
+            St;
+        Timeout ->
+            TRef = erlang:send_after(Timeout, self(), ?WATCHDOG_TIMEOUT),
+            St#watchdog_state{timer = TRef, idle = true}
+    end.
+
+kick_watchdog() ->
+    send_to_watchdog(?WATCHDOG_KICK).
+
+send_to_watchdog(Msg) ->
+    case get(?WORKER_WATCHDOG) of
+        WatchdogPid when is_pid(WatchdogPid) ->
+            couch_log:notice("Sending ~p to watchdog ~p", [Msg, WatchdogPid]),
+            WatchdogPid ! Msg;
         _ ->
+            couch_log:warning("Failed to send ~p as no watchdog for ~p", [Msg, self()]),
             ok
     end.
 
+watchdog_timeout() ->
+    fabric_util:timeout("idle_stream", "60000").
+
 -ifdef(TEST).
 
 -include_lib("eunit/include/eunit.hrl").
 
-worker_cleaner_test_() ->
+worker_watchdog_test_() ->
     {
-        "Fabric spawn_worker_cleaner test",
+        "Fabric spawn_worker_watchdog test",
         {
             setup,
             fun setup/0,
@@ -208,7 +281,7 @@ worker_cleaner_test_() ->
 should_clean_workers() ->
     ?_test(begin
         meck:reset(rexi),
-        erase(?WORKER_CLEANER),
+        erase(?WORKER_WATCHDOG),
         Workers = [
             #shard{node = 'n1', ref = make_ref()},
             #shard{node = 'n2', ref = make_ref()}
@@ -218,11 +291,11 @@ should_clean_workers() ->
                 die -> ok
             end
         end),
-        Cleaner = spawn_worker_cleaner(Coord, Workers),
-        Ref = erlang:monitor(process, Cleaner),
+        Watchdog = spawn_worker_watchdog(Coord, Workers),
+        Ref = erlang:monitor(process, Watchdog),
         Coord ! die,
         receive
-            {'DOWN', Ref, _, Cleaner, _} -> ok
+            {'DOWN', Ref, _, Watchdog, _} -> ok
         end,
         ?assertEqual(1, meck:num_calls(rexi, kill_all, 1))
     end).
@@ -230,7 +303,7 @@ should_clean_workers() ->
 does_not_fire_if_cleanup_called() ->
     ?_test(begin
         meck:reset(rexi),
-        erase(?WORKER_CLEANER),
+        erase(?WORKER_WATCHDOG),
         Workers = [
             #shard{node = 'n1', ref = make_ref()},
             #shard{node = 'n2', ref = make_ref()}
@@ -240,8 +313,8 @@ does_not_fire_if_cleanup_called() ->
                 die -> ok
             end
         end),
-        Cleaner = spawn_worker_cleaner(Coord, Workers),
-        Ref = erlang:monitor(process, Cleaner),
+        Watchdog = spawn_worker_watchdog(Coord, Workers),
+        Ref = erlang:monitor(process, Watchdog),
         cleanup(Workers),
         Coord ! die,
         receive
@@ -255,7 +328,7 @@ does_not_fire_if_cleanup_called() ->
 should_clean_additional_worker_too() ->
     ?_test(begin
         meck:reset(rexi),
-        erase(?WORKER_CLEANER),
+        erase(?WORKER_WATCHDOG),
         Workers = [
             #shard{node = 'n1', ref = make_ref()}
         ],
@@ -264,12 +337,12 @@ should_clean_additional_worker_too() ->
                 die -> ok
             end
         end),
-        Cleaner = spawn_worker_cleaner(Coord, Workers),
-        add_worker_to_cleaner(Coord, #shard{node = 'n2', ref = make_ref()}),
-        Ref = erlang:monitor(process, Cleaner),
+        Watchdog = spawn_worker_watchdog(Coord, Workers),
+        add_worker_to_watchdog(Coord, #shard{node = 'n2', ref = make_ref()}),
+        Ref = erlang:monitor(process, Watchdog),
         Coord ! die,
         receive
-            {'DOWN', Ref, _, Cleaner, _} -> ok
+            {'DOWN', Ref, _, Watchdog, _} -> ok
         end,
         ?assertEqual(1, meck:num_calls(rexi, kill_all, 1))
     end).
diff --git a/src/mango/src/mango_httpd.erl b/src/mango/src/mango_httpd.erl
index bd91652da..0f19f54a8 100644
--- a/src/mango/src/mango_httpd.erl
+++ b/src/mango/src/mango_httpd.erl
@@ -278,6 +278,7 @@ convert_to_design_id(DDocId) ->
     end.
 
 start_find_resp(Req) ->
+    fabric_streams:enable_watchdog(),
     chttpd:start_delayed_json_response(Req, 200, [], "{\"docs\":[").
 
 end_find_resp(Acc0) ->
@@ -310,6 +311,7 @@ handle_doc({add_key, Key, Value}, Acc0) ->
     NewKVs = lists:keystore(Key, 1, KVs, {Key, Value}),
     {ok, Acc0#vacc{kvs = NewKVs}};
 handle_doc({row, Doc}, Acc0) ->
+    fabric_streams:kick_watchdog(),
     #vacc{prepend = Prepend} = Acc0,
     Chunk = [Prepend, ?JSON_ENCODE(Doc)],
     maybe_flush_response(Acc0, Chunk, iolist_size(Chunk)).

Reply via email to