This is an automated email from the ASF dual-hosted git repository.

vatamane pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/couchdb.git


The following commit(s) were added to refs/heads/main by this push:
     new f9ffcbf3a Add scanner checkpoint and stop. Use it for auto-purge 
plugin.
f9ffcbf3a is described below

commit f9ffcbf3a7ea5634af405855141afe064b38568a
Author: Nick Vatamaniuc <[email protected]>
AuthorDate: Wed Jan 14 01:05:07 2026 -0500

    Add scanner checkpoint and stop. Use it for auto-purge plugin.
    
    Plugins already could return a `skip` for `start/2` and `resume/2` and in 
that
    case the plugin run will be skipped.
    
    Add a similar idea to `checkpoint/2`, it can now return `{stop, St}` and in
    that case we'd checkpoint but then stop the plugin.
    
    Use this new feature with auto-purge plugin: stop processing if we find any
dead
    nodes in the cluster. We don't want to keep accumulating purge infos when we
    know for sure some internal replication purge clients won't be able to "see"
    and checkpoint when they are down.
---
 src/couch/src/couch_auto_purge_plugin.erl          | 37 +++++++++++++----
 src/couch_scanner/src/couch_scanner_plugin.erl     | 46 +++++++++++++++-------
 .../test/eunit/couch_scanner_test.erl              | 46 ++++++++++++++++++++--
 3 files changed, 104 insertions(+), 25 deletions(-)

diff --git a/src/couch/src/couch_auto_purge_plugin.erl 
b/src/couch/src/couch_auto_purge_plugin.erl
index 34234377e..e63516d30 100644
--- a/src/couch/src/couch_auto_purge_plugin.erl
+++ b/src/couch/src/couch_auto_purge_plugin.erl
@@ -28,21 +28,39 @@
 -include_lib("stdlib/include/assert.hrl").
 
 start(ScanId, #{}) ->
-    St = init_config(ScanId),
-    ?INFO("Starting.", [], St),
-    {ok, St}.
+    case dead_nodes() of
+        true ->
+            ?INFO("Not starting. Found dead nodes", [], #{sid => ScanId}),
+            skip;
+        false ->
+            St = init_config(ScanId),
+            ?INFO("Starting.", [], St),
+            {ok, St}
+    end.
 
 resume(ScanId, #{}) ->
-    St = init_config(ScanId),
-    ?INFO("Resuming.", [], St),
-    {ok, St}.
+    case dead_nodes() of
+        true ->
+            ?INFO("Not resuming. Found dead nodes", [], #{sid => ScanId}),
+            skip;
+        false ->
+            St = init_config(ScanId),
+            ?INFO("Resuming.", [], St),
+            {ok, St}
+    end.
 
 complete(St) ->
     ?INFO("Completed", [], St),
     {ok, #{}}.
 
-checkpoint(_St) ->
-    {ok, #{}}.
+checkpoint(St) ->
+    case dead_nodes() of
+        true ->
+            ?WARN("Stopping. Found dead nodes", [], meta(St)),
+            {stop, #{}};
+        false ->
+            {ok, #{}}
+    end.
 
 db(St, DbName) ->
     case ttl(St, DbName) of
@@ -213,3 +231,6 @@ min_batch_size() ->
 
 max_batch_size() ->
     erlang:max(min_batch_size(), config:get_integer(atom_to_list(?MODULE), 
"max_batch_size", 500)).
+
+dead_nodes() ->
+    [] =/= (mem3:nodes() -- mem3_util:live_nodes()).
diff --git a/src/couch_scanner/src/couch_scanner_plugin.erl 
b/src/couch_scanner/src/couch_scanner_plugin.erl
index 301f5e6ff..b31ed949f 100644
--- a/src/couch_scanner/src/couch_scanner_plugin.erl
+++ b/src/couch_scanner/src/couch_scanner_plugin.erl
@@ -41,20 +41,27 @@
 % scan starts from the beginning (first db, first shard, ...), and resume/2 is
 % called when the scanning hasn't finished and has to continue.
 %
-% If start/2 or resume/2 returns `reset` then the checkpoint will be reset and
-% the plugin will be restarted. This may be useful in cases when the plugin
-% detects configuration changes since last scanning session had already
-% started, or when the plugin module was updated and the checkpoint version is
-% stale.
-%
 % The checkpoint/1 callback is periodically called to checkpoint the scanning
 % progress. start/2 and resume/2 function will be called with the last saved
 % checkpoint map value.
 %
+% If start/2, resume/2 or checkpoint/1 returns `reset` then the checkpoint will
+% be reset and the plugin will be restarted. This may be useful in cases when
+% the plugin detects configuration changes since last scanning session had
+% already started, or when the plugin module was updated and the checkpoint
+% version is stale.
+%
+% To stop or pause execution of the plugin, start/2 and resume/2 may return
+% `skip` and checkpoint/1 can return `{stop, State}`. Skipping on start or
+% resume counts as a "run" at that time and the plugin is scheduled to run next
+% time according to its schedule. A `{stop, State}` return from a checkpoint
+% will persist the `State` in the checkpoint and next time the plugin will
+% resume from that checkpoint.
+%
 % The complete/1 callback is called when the scan has finished. The complete
-% callback should return final checkpoint map object. The last checkpoint will
-% be written and then it will be passed to the start/2 callback if the plugin
-% runs again.
+% callback should return a final checkpoint map object. The last checkpoint
+% will be written, and then it will be passed to the start/2 callback if the
+% plugin runs again.
 %
 % As the cluster dbs, shards, ddocs and individual docs are discovered during
 % scanning, the appropriate callbacks will be called. Most callbacks, besides
@@ -100,7 +107,7 @@
 
 % Optional
 -callback checkpoint(St :: term()) ->
-    {ok, EJson :: #{}}.
+    {ok | stop, EJson :: #{}} | reset.
 
 -callback db(St :: term(), DbName :: binary()) ->
     {ok | skip | stop, St1 :: term()}.
@@ -444,13 +451,14 @@ rate_limit(#st{rlimiter = RLimiter} = St, Type) ->
 checkpoint(#st{} = St) ->
     #st{
         id = Id,
+        mod = Mod,
         callbacks = Cbks,
         pst = PSt,
         cursor = Cursor,
         start_sec = StartSec,
         scan_id = SId
     } = St,
-    EJsonPSt = checkpoint_callback(Cbks, PSt),
+    {Go, EJsonPSt} = checkpoint_callback(Cbks, PSt),
     EJson = #{
         <<"cursor">> => Cursor,
         <<"pst">> => EJsonPSt,
@@ -459,7 +467,16 @@ checkpoint(#st{} = St) ->
         <<"start_sec">> => StartSec
     },
     ok = couch_scanner_checkpoint:write(Id, EJson),
-    St#st{checkpoint_sec = tsec()}.
+    case Go of
+        ok ->
+            St#st{checkpoint_sec = tsec()};
+        stop ->
+            % Plugin wants to stop. Exit and let it reschedule again for
+            % whatever scheduling setup it's configured with. Next time
+            % it runs, it will resume from the checkpoint it just performed.
+            ReschedTSec = schedule_time(Mod, StartSec, tsec()),
+            exit_resched(ReschedTSec)
+    end.
 
 finalize(#st{} = St) ->
     #st{
@@ -551,7 +568,7 @@ shards_callback(#st{pst = PSt, callbacks = Cbks} = St, 
Shards) ->
 start_checkpoint(Id, #{} = Cbks, StartSec, ScanId, Cur, PSt) when
     is_binary(Id), is_binary(ScanId), is_integer(StartSec)
 ->
-    EJsonPSt = checkpoint_callback(Cbks, PSt),
+    {ok, EJsonPSt} = checkpoint_callback(Cbks, PSt),
     EJson = #{
         <<"cursor">> => Cur,
         <<"pst">> => EJsonPSt,
@@ -564,7 +581,8 @@ start_checkpoint(Id, #{} = Cbks, StartSec, ScanId, Cur, 
PSt) when
 checkpoint_callback(#{} = Cbks, PSt) ->
     #{checkpoint := CheckpointCbk} = Cbks,
     case CheckpointCbk(PSt) of
-        {ok, #{} = EJsonPSt} -> couch_scanner_util:ejson_map(EJsonPSt);
+        {ok, #{} = EJsonPSt} -> {ok, couch_scanner_util:ejson_map(EJsonPSt)};
+        {stop, #{} = EJsonPSt} -> {stop, 
couch_scanner_util:ejson_map(EJsonPSt)};
         reset -> exit_resched(reset)
     end.
 
diff --git a/src/couch_scanner/test/eunit/couch_scanner_test.erl 
b/src/couch_scanner/test/eunit/couch_scanner_test.erl
index 5d6a22f38..b4a56e818 100644
--- a/src/couch_scanner/test/eunit/couch_scanner_test.erl
+++ b/src/couch_scanner/test/eunit/couch_scanner_test.erl
@@ -23,6 +23,8 @@ couch_scanner_test_() ->
         [
             ?TDEF_FE(t_top_level_api),
             ?TDEF_FE(t_start_stop),
+            ?TDEF_FE(t_start_stop_mm_mode, 10),
+            ?TDEF_FE(t_stop_auto_purge_on_dead_nodes, 10),
             ?TDEF_FE(t_run_through_all_callbacks_basic, 10),
             ?TDEF_FE(t_find_reporting_works, 10),
             ?TDEF_FE(t_ddoc_features_works, 20),
@@ -46,13 +48,17 @@ couch_scanner_test_() ->
 -define(FIND_PLUGIN, couch_scanner_plugin_find).
 -define(FEATURES_PLUGIN, couch_scanner_plugin_ddoc_features).
 -define(CONFLICTS_PLUGIN, couch_scanner_plugin_conflict_finder).
+-define(AUTO_PURGE_PLUGIN, couch_auto_purge_plugin).
 
 setup() ->
     {module, _} = code:ensure_loaded(?FIND_PLUGIN),
+    {module, _} = code:ensure_loaded(?AUTO_PURGE_PLUGIN),
     meck:new(?FIND_PLUGIN, [passthrough]),
+    meck:new(?AUTO_PURGE_PLUGIN, [passthrough]),
     meck:new(fabric, [passthrough]),
     meck:new(couch_scanner_server, [passthrough]),
     meck:new(couch_scanner_util, [passthrough]),
+    meck:new(mem3, [passthrough]),
     Ctx = test_util:start_couch([fabric, couch_scanner]),
     % Run with the smallest batch size to exercise the batched
     % ddoc iteration
@@ -100,11 +106,12 @@ setup() ->
     {Ctx, {DbName1, DbName2, DbName3}}.
 
 teardown({Ctx, {DbName1, DbName2, DbName3}}) ->
-    config:delete("couch_scanner", "maintenance_mode", false),
+    config:delete("couchdb", "maintenance_mode", false),
     config_delete_section("couch_scanner"),
     config_delete_section("couch_scanner_plugins"),
     config_delete_section(atom_to_list(?FEATURES_PLUGIN)),
     config_delete_section(atom_to_list(?FIND_PLUGIN)),
+    config_delete_section(atom_to_list(?AUTO_PURGE_PLUGIN)),
     config_delete_section(atom_to_list(?CONFLICTS_PLUGIN)),
     lists:foreach(
         fun(Subsection) ->
@@ -139,6 +146,33 @@ t_start_stop(_) ->
     ?assertEqual(ok, couch_scanner_server:resume()),
     ?assertMatch(#{stopped := false}, couch_scanner:status()).
 
+t_start_stop_mm_mode(_) ->
+    ?assertEqual(ok, couch_scanner:stop()),
+    Plugin = atom_to_list(?FIND_PLUGIN),
+    config:set("couch_scanner_plugins", Plugin, "true", false),
+    meck:expect(?FIND_PLUGIN, shards, fun(_, _) -> timer:sleep(10000) end),
+    config:set("couchdb", "maintenance_mode", "true", true),
+    ?assertEqual(ok, couch_scanner:resume()),
+    #{pids := Pids1, stopped := false} = couch_scanner:status(),
+    ?assertEqual(#{}, Pids1),
+    config:set("couchdb", "maintenance_mode", "false", true),
+    ?assertEqual(ok, couch_scanner:stop()),
+    ?assertEqual(ok, couch_scanner:resume()),
+    #{pids := Pids2, stopped := false} = couch_scanner:status(),
+    ?assertMatch(#{<<"couch_scanner_plugin_find">> := Pid} when is_pid(Pid), 
Pids2),
+    ?assertEqual(ok, couch_scanner:stop()).
+
+t_stop_auto_purge_on_dead_nodes(_) ->
+    meck:reset(couch_scanner_server),
+    meck:reset(couch_scanner_util),
+    meck:expect(mem3, nodes, fun() -> ['[email protected]'] end),
+    Plugin = atom_to_list(?AUTO_PURGE_PLUGIN),
+    config:set("couch_scanner_plugins", Plugin, "true", false),
+    wait_exit(10000),
+    ?assertEqual(1, num_calls(?AUTO_PURGE_PLUGIN, start, 2)),
+    ?assertEqual(0, num_calls(?AUTO_PURGE_PLUGIN, complete, 1)),
+    ?assertEqual(1, log_calls(?AUTO_PURGE_PLUGIN, info)).
+
 t_run_through_all_callbacks_basic({_, {DbName1, DbName2, _}}) ->
     % Run the "find" plugin without any regexes
     meck:reset(couch_scanner_server),
@@ -351,10 +385,16 @@ add_docs(DbName, Docs) ->
     {ok, []} = fabric:update_docs(DbName, Docs, [?REPLICATED_CHANGES, 
?ADMIN_CTX]).
 
 num_calls(Fun, Args) ->
-    meck:num_calls(?FIND_PLUGIN, Fun, Args).
+    num_calls(?FIND_PLUGIN, Fun, Args).
+
+num_calls(Mod, Fun, Args) ->
+    meck:num_calls(Mod, Fun, Args).
 
 log_calls(Level) ->
-    meck:num_calls(couch_scanner_util, log, [Level, ?FIND_PLUGIN, '_', '_', 
'_']).
+    log_calls(?FIND_PLUGIN, Level).
+
+log_calls(Mod, Level) ->
+    meck:num_calls(couch_scanner_util, log, [Level, Mod, '_', '_', '_']).
 
 wait_exit(MSec) ->
     meck:wait(couch_scanner_server, handle_info, [{'EXIT', '_', '_'}, '_'], 
MSec).

Reply via email to