This is an automated email from the ASF dual-hosted git repository.

vatamane pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/couchdb.git


The following commit(s) were added to refs/heads/main by this push:
     new 747e9deb8 Use W=N for auto-purge plugin
747e9deb8 is described below

commit 747e9deb80cdfe992278353017f298219712a3d9
Author: Nick Vatamaniuc <[email protected]>
AuthorDate: Mon Oct 27 16:31:19 2025 -0400

    Use W=N for auto-purge plugin
    
    The idea is we would like to do as much work in the auto-purge request itself
    and leave as little as possible for the internal replicator.
    
    While at it, strengthen some state map matches by asserting that we expect the
    fields to always be there. Also, make sure to always reset the queue to empty
    on flush, even on error, and assert it as such when we're opening/closing the
    db shard.
    
    The assert also revealed a bug in db_closing where the updated state from the
    final flush wasn't retained and instead we returned the state before the flush.
    After the fix we needed to update the purge test since it's really 3 purges that
    are needed, not 4, with 11 docs and a batch of 5 (5 + 5 + 1).
---
 src/couch/src/couch_auto_purge_plugin.erl          | 30 ++++++++++++----------
 .../test/eunit/couch_auto_purge_plugin_tests.erl   |  2 +-
 2 files changed, 18 insertions(+), 14 deletions(-)

diff --git a/src/couch/src/couch_auto_purge_plugin.erl 
b/src/couch/src/couch_auto_purge_plugin.erl
index 16faa1e89..34234377e 100644
--- a/src/couch/src/couch_auto_purge_plugin.erl
+++ b/src/couch/src/couch_auto_purge_plugin.erl
@@ -53,7 +53,8 @@ db(St, DbName) ->
     end.
 
 db_opened(#{} = St, Db) ->
-    #{ttl := TTL} = St,
+    #{ttl := TTL, queue := Queue} = St,
+    ?assert(Queue == [], "Queue is not empty from previous operations"),
     EndSeq = couch_time_seq:since(couch_db:get_time_seq(Db), 
couch_time_seq:timestamp() - TTL),
     ChangeOpts =
         if
@@ -64,9 +65,9 @@ db_opened(#{} = St, Db) ->
     {0, ChangeOpts, St#{count => 0, end_seq => EndSeq}}.
 
 db_closing(#{} = St, Db) ->
-    #{count := Count} = flush_queue(St, Db),
-    ?INFO("purged ~B deleted documents from ~s", [Count, couch_db:name(Db)], 
meta(St)),
-    {ok, St}.
+    St1 = #{count := Count} = flush_queue(St, Db),
+    ?INFO("purged ~B deleted documents from ~s", [Count, couch_db:name(Db)], 
meta(St1)),
+    {ok, St1}.
 
 doc_fdi(#{} = St, #full_doc_info{deleted = true} = FDI, Db) ->
     #{end_seq := EndSeq} = St,
@@ -89,19 +90,20 @@ enqueue(#{queue := Queue} = St0, Id, Revs, Db) ->
     if
         NewQueueSize > MaxBatchSize ->
             {RevBatch, RevRest} = lists:split(MaxBatchSize - CurrentQueueSize, 
Revs),
-            St1 = flush_queue(St0#{queue => [{Id, RevBatch} | Queue]}, Db),
+            St1 = flush_queue(St0#{queue := [{Id, RevBatch} | Queue]}, Db),
             enqueue(St1, Id, RevRest, Db);
         NewQueueSize >= MinBatchSize ->
-            flush_queue(St0#{queue => [{Id, Revs} | Queue]}, Db);
+            flush_queue(St0#{queue := [{Id, Revs} | Queue]}, Db);
         NewQueueSize < MinBatchSize ->
-            St0#{queue => [{Id, Revs} | Queue]}
+            St0#{queue := [{Id, Revs} | Queue]}
     end.
 
 flush_queue(#{queue := []} = St, _Db) ->
     St;
 flush_queue(#{queue := IdRevs} = St, Db) ->
     DbName = mem3:dbname(couch_db:name(Db)),
-    PurgeFun = fun() -> fabric:purge_docs(DbName, IdRevs, [?ADMIN_CTX]) end,
+    N = mem3:n(DbName),
+    PurgeFun = fun() -> fabric:purge_docs(DbName, IdRevs, [?ADMIN_CTX, {w, 
N}]) end,
     Timeout = fabric_util:request_timeout(),
     try fabric_util:isolate(PurgeFun, Timeout) of
         {Health, Results} when Health == ok; Health == accepted ->
@@ -116,9 +118,9 @@ flush_queue(#{queue := IdRevs} = St, Db) ->
             ),
             timer:sleep(Wait),
             St#{
-                count => Count + length(Results),
-                limiter => Limiter1,
-                queue => []
+                count := Count + length(Results),
+                limiter := Limiter1,
+                queue := []
             };
         Else ->
             ?WARN(
@@ -126,7 +128,8 @@ flush_queue(#{queue := IdRevs} = St, Db) ->
                 [DbName, Else],
                 meta(St)
             ),
-            St
+            % Reset the queue. We'll catch these on the next run.
+            St#{queue := []}
     catch
         Class:Reason ->
             ?WARN(
@@ -134,7 +137,8 @@ flush_queue(#{queue := IdRevs} = St, Db) ->
                 [DbName, Class, Reason],
                 meta(St)
             ),
-            St
+            % Reset the queue. We'll catch these on the next run.
+            St#{queue := []}
     end.
 
 queue_size(Queue) when is_list(Queue) ->
diff --git a/src/couch/test/eunit/couch_auto_purge_plugin_tests.erl 
b/src/couch/test/eunit/couch_auto_purge_plugin_tests.erl
index 63bdfd91e..a42ef52c5 100644
--- a/src/couch/test/eunit/couch_auto_purge_plugin_tests.erl
+++ b/src/couch/test/eunit/couch_auto_purge_plugin_tests.erl
@@ -117,7 +117,7 @@ t_min_batch_size_2({_, DbName}) ->
     meck:reset(?PLUGIN),
     config:set("couch_scanner_plugins", atom_to_list(?PLUGIN), "true", false),
     wait_exit(10000),
-    ?assertEqual(4, meck:num_calls(fabric, purge_docs, '_')),
+    ?assertEqual(3, meck:num_calls(fabric, purge_docs, '_')),
     ?assertEqual(0, doc_del_count(DbName)),
     ok.
 

Reply via email to