This is an automated email from the ASF dual-hosted git repository. rnewson pushed a commit to branch fabric_teardown in repository https://gitbox.apache.org/repos/asf/couchdb.git
commit 8ea452bd17fd99af910a449a1ff244a04c9cc455 Author: Robert Newson <[email protected]> AuthorDate: Tue Aug 22 13:21:40 2023 +0100 wip --- src/fabric/src/fabric_streams.erl | 155 ++++++++++++++++++++++++++++---------- src/mango/src/mango_httpd.erl | 2 + 2 files changed, 116 insertions(+), 41 deletions(-) diff --git a/src/fabric/src/fabric_streams.erl b/src/fabric/src/fabric_streams.erl index 318824814..278414fee 100644 --- a/src/fabric/src/fabric_streams.erl +++ b/src/fabric/src/fabric_streams.erl @@ -17,13 +17,29 @@ start/3, start/4, start/5, - cleanup/1 + cleanup/1, + enable_watchdog/0, + kick_watchdog/0 ]). -include_lib("fabric/include/fabric.hrl"). -include_lib("mem3/include/mem3.hrl"). --define(WORKER_CLEANER, fabric_worker_cleaner). +-define(WORKER_WATCHDOG, fabric_worker_watchdog). +-define(ADD_WORKER, add_worker). +-define(WATCHDOG_ENABLE, watchdog_enable). +-define(WATCHDOG_TIMEOUT, watchdog_timeout). +-define(WATCHDOG_KICK, watchdog_kick). + +-define(WATCHDOG_IS_IDLE, St#watchdog_state.idle). % NB: expands against a variable named St bound at the use site +-define(WATCHDOG_IS_ENABLED, St#watchdog_state.timer /= undefined). % NB: also reads St; timer is only set by reset_watchdog/1 + +-record(watchdog_state, { + coordinator, % monitored process whose death (or idleness) triggers cleanup + workers, % rexi workers passed to fabric_util:cleanup/1 + timer, % erlang:send_after/3 reference; undefined while the watchdog is disabled + idle = true % cleared by a kick, set again each time the timer is re-armed +}). start(Workers, Keypos) -> start(Workers, Keypos, undefined, undefined). @@ -43,7 +59,7 @@ start(Workers0, Keypos, StartFun, Replacements, RingOpts) -> replacements = Replacements, ring_opts = RingOpts }, - spawn_worker_cleaner(self(), Workers0), + spawn_worker_watchdog(self(), Workers0), Timeout = fabric_util:request_timeout(), case rexi_utils:recv(Workers0, Keypos, Fun, Acc, Timeout, infinity) of {ok, #stream_acc{ready = Workers}} -> @@ -61,13 +77,13 @@ start(Workers0, Keypos, StartFun, Replacements, RingOpts) -> end.
cleanup(Workers) -> - % Stop the auxiliary cleaner process as we got to the point where cleanup - % happesn in the regular fashion so we don't want to send 2x the number kill + % Stop the auxiliary watchdog process as we got to the point where cleanup + % happens in the regular fashion so we don't want to send 2x the number of kill % messages - case get(?WORKER_CLEANER) of - CleanerPid when is_pid(CleanerPid) -> - erase(?WORKER_CLEANER), - exit(CleanerPid, kill); + case get(?WORKER_WATCHDOG) of + WatchdogPid when is_pid(WatchdogPid) -> + erase(?WORKER_WATCHDOG), + exit(WatchdogPid, kill); _ -> ok end, @@ -99,7 +115,7 @@ handle_stream_start({rexi_EXIT, Reason}, Worker, St) -> FinalWorkers = lists:foldl( fun(Repl, NewWorkers) -> NewWorker = (St#stream_acc.start_fun)(Repl), - add_worker_to_cleaner(self(), NewWorker), + add_worker_to_watchdog(self(), NewWorker), fabric_dict:store(NewWorker, waiting, NewWorkers) end, Workers, @@ -152,45 +168,102 @@ handle_stream_start({ok, Error}, _, St) when Error == ddoc_updated; Error == ins handle_stream_start(Else, _, _) -> exit({invalid_stream_start, Else}). -% Spawn an auxiliary rexi worker cleaner. This will be used in cases -% when the coordinator (request) process is forceably killed and doesn't +% Spawn an auxiliary rexi worker watchdog which triggers cleanup if: +% * nothing has been streamed for a whole watchdog interval, or +% * the coordinator (request) process is forcibly killed and doesn't % get a chance to process its `after` fabric:clean/1 clause. -spawn_worker_cleaner(Coordinator, Workers) -> - case get(?WORKER_CLEANER) of + +% The watchdog is initially disabled. Clients enable it by calling +% enable_watchdog/0 in the coordinator process before the stream starts. That +% client must then call kick_watchdog/0 on activity to stop the watchdog acting. +% If kick_watchdog/0 is not called at least once in each watchdog interval, +% the stream coordinator is killed and the workers are cleaned up.
+spawn_worker_watchdog(Coordinator, Workers) -> + case get(?WORKER_WATCHDOG) of undefined -> + State0 = #watchdog_state{ + coordinator = Coordinator, + workers = Workers + }, + Enabled = get(?WATCHDOG_ENABLE), Pid = spawn(fun() -> erlang:monitor(process, Coordinator), - cleaner_loop(Coordinator, Workers) + State1 = + case Enabled of + true -> + reset_watchdog(State0); + undefined -> + State0 + end, + watchdog_loop(State1) end), - put(?WORKER_CLEANER, Pid), + couch_log:debug("Spawned watchdog ~p for ~p", [Pid, self()]), + put(?WORKER_WATCHDOG, Pid), Pid; - ExistingCleaner -> - ExistingCleaner + ExistingWatchdog -> + ExistingWatchdog end. -cleaner_loop(Pid, Workers) -> +watchdog_loop(#watchdog_state{} = St) -> receive - {add_worker, Pid, Worker} -> - cleaner_loop(Pid, [Worker | Workers]); - {'DOWN', _, _, Pid, _} -> - fabric_util:cleanup(Workers) + {?ADD_WORKER, Pid, Worker} when Pid == St#watchdog_state.coordinator -> + watchdog_loop(St#watchdog_state{workers = [Worker | St#watchdog_state.workers]}); + ?WATCHDOG_KICK when ?WATCHDOG_IS_ENABLED -> + watchdog_loop(St#watchdog_state{idle = false}); + ?WATCHDOG_KICK -> + watchdog_loop(St); + ?WATCHDOG_TIMEOUT when ?WATCHDOG_IS_ENABLED, ?WATCHDOG_IS_IDLE -> + couch_log:warning("watchdog ~p detected idle interval, killing ~p", + [self(), St#watchdog_state.coordinator]), + exit(St#watchdog_state.coordinator, kill), + watchdog_loop(St); % keep looping: the 'DOWN' clause below cleans up the workers + ?WATCHDOG_TIMEOUT when ?WATCHDOG_IS_ENABLED -> + watchdog_loop(reset_watchdog(St)); + ?WATCHDOG_TIMEOUT -> + watchdog_loop(St); % not enabled: drop the message but keep guarding the workers + {'DOWN', _, _, Pid, _} when Pid == St#watchdog_state.coordinator -> + fabric_util:cleanup(St#watchdog_state.workers) end. -add_worker_to_cleaner(CoordinatorPid, Worker) -> - case get(?WORKER_CLEANER) of - CleanerPid when is_pid(CleanerPid) -> - CleanerPid ! {add_worker, CoordinatorPid, Worker}; +add_worker_to_watchdog(CoordinatorPid, Worker) -> + send_to_watchdog({?ADD_WORKER, CoordinatorPid, Worker}).
+ +enable_watchdog() -> + couch_log:debug("watchdog enabled on ~p", [self()]), + put(?WATCHDOG_ENABLE, true). + +reset_watchdog(#watchdog_state{} = St) -> + case watchdog_timeout() of + infinity -> + St; + Timeout -> + TRef = erlang:send_after(Timeout, self(), ?WATCHDOG_TIMEOUT), + St#watchdog_state{timer = TRef, idle = true} + end. + +kick_watchdog() -> + send_to_watchdog(?WATCHDOG_KICK). + +send_to_watchdog(Msg) -> + case get(?WORKER_WATCHDOG) of + WatchdogPid when is_pid(WatchdogPid) -> + couch_log:debug("Sending ~p to watchdog ~p", [Msg, WatchdogPid]), % debug: callers may kick once per streamed row + WatchdogPid ! Msg; _ -> + couch_log:debug("Failed to send ~p as no watchdog for ~p", [Msg, self()]), ok end. +watchdog_timeout() -> + fabric_util:timeout("idle_stream", "60000"). + -ifdef(TEST). -include_lib("eunit/include/eunit.hrl"). -worker_cleaner_test_() -> +worker_watchdog_test_() -> { - "Fabric spawn_worker_cleaner test", + "Fabric spawn_worker_watchdog test", { setup, fun setup/0, @@ -208,7 +281,7 @@ worker_cleaner_test_() -> should_clean_workers() -> ?_test(begin meck:reset(rexi), - erase(?WORKER_CLEANER), + erase(?WORKER_WATCHDOG), Workers = [ #shard{node = 'n1', ref = make_ref()}, #shard{node = 'n2', ref = make_ref()} @@ -218,11 +291,11 @@ should_clean_workers() -> die -> ok end end), - Cleaner = spawn_worker_cleaner(Coord, Workers), - Ref = erlang:monitor(process, Cleaner), + Watchdog = spawn_worker_watchdog(Coord, Workers), + Ref = erlang:monitor(process, Watchdog), Coord ! die, receive - {'DOWN', Ref, _, Cleaner, _} -> ok + {'DOWN', Ref, _, Watchdog, _} -> ok end, ?assertEqual(1, meck:num_calls(rexi, kill_all, 1)) end).
@@ -230,7 +303,7 @@ should_clean_workers() -> does_not_fire_if_cleanup_called() -> ?_test(begin meck:reset(rexi), - erase(?WORKER_CLEANER), + erase(?WORKER_WATCHDOG), Workers = [ #shard{node = 'n1', ref = make_ref()}, #shard{node = 'n2', ref = make_ref()} @@ -240,8 +313,8 @@ does_not_fire_if_cleanup_called() -> die -> ok end end), - Cleaner = spawn_worker_cleaner(Coord, Workers), - Ref = erlang:monitor(process, Cleaner), + Watchdog = spawn_worker_watchdog(Coord, Workers), + Ref = erlang:monitor(process, Watchdog), cleanup(Workers), Coord ! die, receive @@ -255,7 +328,7 @@ does_not_fire_if_cleanup_called() -> should_clean_additional_worker_too() -> ?_test(begin meck:reset(rexi), - erase(?WORKER_CLEANER), + erase(?WORKER_WATCHDOG), Workers = [ #shard{node = 'n1', ref = make_ref()} ], @@ -264,12 +337,12 @@ should_clean_additional_worker_too() -> die -> ok end end), - Cleaner = spawn_worker_cleaner(Coord, Workers), - add_worker_to_cleaner(Coord, #shard{node = 'n2', ref = make_ref()}), - Ref = erlang:monitor(process, Cleaner), + Watchdog = spawn_worker_watchdog(Coord, Workers), + add_worker_to_watchdog(Coord, #shard{node = 'n2', ref = make_ref()}), + Ref = erlang:monitor(process, Watchdog), Coord ! die, receive - {'DOWN', Ref, _, Cleaner, _} -> ok + {'DOWN', Ref, _, Watchdog, _} -> ok end, ?assertEqual(1, meck:num_calls(rexi, kill_all, 1)) end). diff --git a/src/mango/src/mango_httpd.erl b/src/mango/src/mango_httpd.erl index bd91652da..0f19f54a8 100644 --- a/src/mango/src/mango_httpd.erl +++ b/src/mango/src/mango_httpd.erl @@ -278,6 +278,7 @@ convert_to_design_id(DDocId) -> end. start_find_resp(Req) -> + fabric_streams:enable_watchdog(), % enables the idle-stream watchdog for fabric streams this process starts afterwards chttpd:start_delayed_json_response(Req, 200, [], "{\"docs\":[").
end_find_resp(Acc0) -> @@ -310,6 +311,7 @@ handle_doc({add_key, Key, Value}, Acc0) -> NewKVs = lists:keystore(Key, 1, KVs, {Key, Value}), {ok, Acc0#vacc{kvs = NewKVs}}; handle_doc({row, Doc}, Acc0) -> + fabric_streams:kick_watchdog(), % record stream activity so the idle-stream watchdog does not fire #vacc{prepend = Prepend} = Acc0, Chunk = [Prepend, ?JSON_ENCODE(Doc)], maybe_flush_response(Acc0, Chunk, iolist_size(Chunk)).
