This is an automated email from the ASF dual-hosted git repository.

rnewson pushed a commit to branch auto-purge-batching
in repository https://gitbox.apache.org/repos/asf/couchdb.git

commit 646a0e9c98c41da19b9042b2f5219b6eaffc5723
Author: Robert Newson <[email protected]>
AuthorDate: Thu Oct 23 17:37:02 2025 +0100

    Introduce a minimum batch size for auto purge
---
 src/couch/src/couch_auto_purge_plugin.erl | 66 ++++++++++++++++++++++---------
 1 file changed, 48 insertions(+), 18 deletions(-)

diff --git a/src/couch/src/couch_auto_purge_plugin.erl b/src/couch/src/couch_auto_purge_plugin.erl
index 64ad945a5..99e69015e 100644
--- a/src/couch/src/couch_auto_purge_plugin.erl
+++ b/src/couch/src/couch_auto_purge_plugin.erl
@@ -64,7 +64,8 @@ db_opened(#{} = St, Db) ->
     {0, ChangeOpts, St#{count => 0, end_seq => EndSeq}}.
 
 db_closing(#{} = St, Db) ->
-    #{count := Count} = St,
+    #{count := Count} = St1 = flush_queue(St, Db),
     ?INFO("purged ~B deleted documents from ~s", [Count, couch_db:name(Db)], meta(St)),
-    {ok, St}.
+    %% Return the flushed state: St still holds the unflushed queue and old limiter.
+    {ok, St1}.
 
@@ -73,47 +73,87 @@ doc_fdi(#{} = St, #full_doc_info{deleted = true} = FDI, Db) ->
     ?assert(
         FDI#full_doc_info.update_seq =< EndSeq, "FDI update_seq should not be greater than end seq"
     ),
-    {ok, purge(St, FDI, Db)};
+    {ok, enqueue(St, FDI, Db)};
 doc_fdi(#{} = St, #full_doc_info{}, _Db) ->
     {ok, St}.
 
-purge(#{} = St, #full_doc_info{} = FDI, Db) ->
+%% Queue the revisions returned by fdi_to_idrevs/1 for purging.
+enqueue(#{} = St, #full_doc_info{} = FDI, Db) ->
     {Id, Revs} = fdi_to_idrevs(FDI),
-    MaxBatchSize = config:get_integer(atom_to_list(?MODULE), "max_batch_size", 500),
-    purge(St, Id, Revs, MaxBatchSize, Db).
+    enqueue(St, Id, Revs, Db).
 
-purge(#{} = St, Id, Revs, MaxBatchSize, Db) when length(Revs) =< MaxBatchSize ->
+%% Add {Id, Revs} to the purge queue. The queue is flushed as soon as it holds
+%% at least min_batch_size() revisions, and no flush carries more than
+%% max_batch_size() revisions: revisions that do not fit are split off and
+%% queued after the flush.
+enqueue(#{} = St, _Id, [], _Db) ->
+    St;
+enqueue(#{queue := Queue} = St0, Id, Revs, Db) ->
+    CurrentQueueSize = queue_size(Queue),
+    NewQueueSize = CurrentQueueSize + length(Revs),
+    MinBatchSize = min_batch_size(),
+    MaxBatchSize = max_batch_size(),
+    if
+        CurrentQueueSize >= MaxBatchSize ->
+            %% Only reachable if the batch size config was lowered mid-scan.
+            %% Drain the queue first so the split below always has room.
+            enqueue(flush_queue(St0, Db), Id, Revs, Db);
+        NewQueueSize > MaxBatchSize ->
+            %% Top the queue up to exactly MaxBatchSize revisions (at least one
+            %% and fewer than length(Revs)), flush it, then queue the rest.
+            {RevBatch, RevRest} = lists:split(MaxBatchSize - CurrentQueueSize, Revs),
+            St1 = flush_queue(St0#{queue => [{Id, RevBatch} | Queue]}, Db),
+            enqueue(St1, Id, RevRest, Db);
+        NewQueueSize >= MinBatchSize ->
+            flush_queue(St0#{queue => [{Id, Revs} | Queue]}, Db);
+        true ->
+            St0#{queue => [{Id, Revs} | Queue]}
+    end.
+
+%% Purge every queued {Id, Revs} pair with a single fabric:purge_docs/3 call.
+%% The queue is emptied whether or not the purge succeeds: purging is best
+%% effort, and keeping a failed batch would let the queue exceed
+%% max_batch_size() and break the size arithmetic in enqueue/4.
+flush_queue(#{queue := []} = St, _Db) ->
+    St;
+flush_queue(#{queue := IdRevs} = St0, Db) ->
+    St = St0#{queue => []},
     DbName = mem3:dbname(couch_db:name(Db)),
-    PurgeFun = fun() -> fabric:purge_docs(DbName, [{Id, Revs}], [?ADMIN_CTX]) end,
+    PurgeFun = fun() -> fabric:purge_docs(DbName, IdRevs, [?ADMIN_CTX]) end,
     Timeout = fabric_util:request_timeout(),
     try fabric_util:isolate(PurgeFun, Timeout) of
         {Health, Results} when Health == ok; Health == accepted ->
+            ?DEBUG(
+                "flushed batch of ~B idrevs from ~s",
+                [queue_size(IdRevs), couch_db:name(Db)],
+                meta(St)
+            ),
             #{count := Count, limiter := Limiter0} = St,
             {Wait, Limiter1} = couch_scanner_rate_limiter:update(
                 Limiter0, doc_write, length(Results)
             ),
             timer:sleep(Wait),
             St#{count => Count + length(Results), limiter => Limiter1};
         Else ->
             ?WARN(
-                "Failed to purge deleted documents in ~s/~s for reason ~p",
-                [DbName, Id, Else],
+                "Failed to purge deleted documents in ~s for reason ~p",
+                [DbName, Else],
                 meta(St)
             ),
             St
     catch
         Class:Reason ->
             ?WARN(
-                "Failed to purge deleted documents in ~s/~s for reason ~p:~p",
-                [DbName, Id, Class, Reason],
+                "Failed to purge deleted documents in ~s for reason ~p:~p",
+                [DbName, Class, Reason],
                 meta(St)
             ),
             St
-    end;
-purge(#{} = St0, Id, Revs, MaxBatchSize, Db) ->
-    {RevBatch, RevRest} = lists:split(MaxBatchSize, Revs),
-    St1 = purge(St0, Id, RevBatch, MaxBatchSize, Db),
-    purge(St1, Id, RevRest, MaxBatchSize, Db).
+    end.
+
+%% Total number of revisions held in a purge queue.
+queue_size(Queue) when is_list(Queue) ->
+    lists:sum([length(Revs) || {_Id, Revs} <- Queue]).
 
 fdi_to_idrevs(#full_doc_info{} = FDI) ->
     Revs = [
@@ -123,7 +143,12 @@ fdi_to_idrevs(#full_doc_info{} = FDI) ->
     {FDI#full_doc_info.id, Revs}.
 
 init_config(ScanId) ->
-    #{sid => ScanId, limiter => couch_scanner_rate_limiter:get()}.
+    #{
+        sid => ScanId,
+        limiter => couch_scanner_rate_limiter:get(),
+        %% {Id, Revs} pairs awaiting a batched purge; see enqueue/4.
+        queue => []
+    }.
 
 meta(#{sid := ScanId}) ->
     #{sid => ScanId}.
@@ -174,3 +198,16 @@ parse_ttl([$- | TTL]) ->
     -(parse_ttl(TTL));
 parse_ttl(TTL) ->
     couch_scanner_util:parse_non_weekday_period(TTL).
+
+%% Queue size (in revisions) at which enqueue/4 flushes the purge queue.
+%% Read from config key "min_batch_size" (default 100); never below 1.
+min_batch_size() ->
+    erlang:max(1, config:get_integer(atom_to_list(?MODULE), "min_batch_size", 100)).
+
+%% Cap on the number of revisions per purge batch. Read from config key
+%% "max_batch_size" (default 500); never below min_batch_size().
+max_batch_size() ->
+    erlang:max(
+        min_batch_size(),
+        config:get_integer(atom_to_list(?MODULE), "max_batch_size", 500)
+    ).

Reply via email to