This is an automated email from the ASF dual-hosted git repository.

vatamane pushed a commit to branch w-3-for-auto-purge-plugin
in repository https://gitbox.apache.org/repos/asf/couchdb.git

commit 8931d62a84ada377d1f8123a3d939830ef9a6cb2
Author: Nick Vatamaniuc <[email protected]>
AuthorDate: Mon Oct 27 16:31:19 2025 -0400

    Use W=N for auto-purge plugin
    
    The idea is that we would like to do as much work as possible in the
    auto-purge request itself and leave as little as possible for the internal
    replicator.
    
    While at it, strengthen some state map matches by asserting that we expect
    the fields to always be there. Also, ensure that we always reset the queue
    to empty when we're opening a new db shard.
---
 src/couch/src/couch_auto_purge_plugin.erl | 21 ++++++++++-----------
 1 file changed, 10 insertions(+), 11 deletions(-)

diff --git a/src/couch/src/couch_auto_purge_plugin.erl 
b/src/couch/src/couch_auto_purge_plugin.erl
index 16faa1e89..2b9969d02 100644
--- a/src/couch/src/couch_auto_purge_plugin.erl
+++ b/src/couch/src/couch_auto_purge_plugin.erl
@@ -47,7 +47,7 @@ checkpoint(_St) ->
 db(St, DbName) ->
     case ttl(St, DbName) of
         TTL when is_integer(TTL) ->
-            {ok, St#{ttl => TTL}};
+            {ok, St#{ttl => TTL, db_name => DbName, db_n => mem3:n(DbName)}};
         undefined ->
             {skip, St}
     end.
@@ -61,7 +61,7 @@ db_opened(#{} = St, Db) ->
             true -> [{end_key, EndSeq}]
         end,
     ?INFO("scanning for deleted documents in ~s up to ~p", [couch_db:name(Db), 
EndSeq], meta(St)),
-    {0, ChangeOpts, St#{count => 0, end_seq => EndSeq}}.
+    {0, ChangeOpts, St#{count => 0, end_seq => EndSeq, queue := []}}.
 
 db_closing(#{} = St, Db) ->
     #{count := Count} = flush_queue(St, Db),
@@ -89,19 +89,18 @@ enqueue(#{queue := Queue} = St0, Id, Revs, Db) ->
     if
         NewQueueSize > MaxBatchSize ->
             {RevBatch, RevRest} = lists:split(MaxBatchSize - CurrentQueueSize, 
Revs),
-            St1 = flush_queue(St0#{queue => [{Id, RevBatch} | Queue]}, Db),
+            St1 = flush_queue(St0#{queue := [{Id, RevBatch} | Queue]}, Db),
             enqueue(St1, Id, RevRest, Db);
         NewQueueSize >= MinBatchSize ->
-            flush_queue(St0#{queue => [{Id, Revs} | Queue]}, Db);
+            flush_queue(St0#{queue := [{Id, Revs} | Queue]}, Db);
         NewQueueSize < MinBatchSize ->
-            St0#{queue => [{Id, Revs} | Queue]}
+            St0#{queue := [{Id, Revs} | Queue]}
     end.
 
 flush_queue(#{queue := []} = St, _Db) ->
     St;
-flush_queue(#{queue := IdRevs} = St, Db) ->
-    DbName = mem3:dbname(couch_db:name(Db)),
-    PurgeFun = fun() -> fabric:purge_docs(DbName, IdRevs, [?ADMIN_CTX]) end,
+flush_queue(#{queue := IdRevs, db_name := DbName, db_n := N} = St, Db) ->
+    PurgeFun = fun() -> fabric:purge_docs(DbName, IdRevs, [?ADMIN_CTX, {w, 
N}]) end,
     Timeout = fabric_util:request_timeout(),
     try fabric_util:isolate(PurgeFun, Timeout) of
         {Health, Results} when Health == ok; Health == accepted ->
@@ -116,9 +115,9 @@ flush_queue(#{queue := IdRevs} = St, Db) ->
             ),
             timer:sleep(Wait),
             St#{
-                count => Count + length(Results),
-                limiter => Limiter1,
-                queue => []
+                count := Count + length(Results),
+                limiter := Limiter1,
+                queue := []
             };
         Else ->
             ?WARN(

Reply via email to