This is an automated email from the ASF dual-hosted git repository. rnewson pushed a commit to branch auto-purge-batching in repository https://gitbox.apache.org/repos/asf/couchdb.git
commit 646a0e9c98c41da19b9042b2f5219b6eaffc5723
Author: Robert Newson <[email protected]>
AuthorDate: Thu Oct 23 17:37:02 2025 +0100

    Introduce a minimum batch size for auto purge
---
 src/couch/src/couch_auto_purge_plugin.erl | 90 ++++++++++++++++++++++++-------
 1 file changed, 69 insertions(+), 21 deletions(-)

diff --git a/src/couch/src/couch_auto_purge_plugin.erl b/src/couch/src/couch_auto_purge_plugin.erl
index 64ad945a5..99e69015e 100644
--- a/src/couch/src/couch_auto_purge_plugin.erl
+++ b/src/couch/src/couch_auto_purge_plugin.erl
@@ -64,7 +64,9 @@ db_opened(#{} = St, Db) ->
     {0, ChangeOpts, St#{count => 0, end_seq => EndSeq}}.
 
 db_closing(#{} = St, Db) ->
-    #{count := Count} = St,
+    % Return the flushed state: db_opened does not reset the queue, so handing
+    % back the pre-flush St would carry this db's queued revs into the next db.
+    #{count := Count} = St1 = flush_queue(St, Db),
     ?INFO("purged ~B deleted documents from ~s", [Count, couch_db:name(Db)], meta(St)),
-    {ok, St}.
+    {ok, St1}.
 
@@ -73,47 +75,83 @@ doc_fdi(#{} = St, #full_doc_info{deleted = true} = FDI, Db) ->
     ?assert(
         FDI#full_doc_info.update_seq =< EndSeq, "FDI update_seq should not be greater than end seq"
     ),
-    {ok, purge(St, FDI, Db)};
+    {ok, enqueue(St, FDI, Db)};
 doc_fdi(#{} = St, #full_doc_info{}, _Db) ->
     {ok, St}.
 
-purge(#{} = St, #full_doc_info{} = FDI, Db) ->
+enqueue(#{} = St, FDI, Db) ->
     {Id, Revs} = fdi_to_idrevs(FDI),
-    MaxBatchSize = config:get_integer(atom_to_list(?MODULE), "max_batch_size", 500),
-    purge(St, Id, Revs, MaxBatchSize, Db).
+    enqueue(St, Id, Revs, Db).
 
-purge(#{} = St, Id, Revs, MaxBatchSize, Db) when length(Revs) =< MaxBatchSize ->
+% Add Id's Revs to the pending purge queue, flushing once the queue holds
+% at least min_batch_size revs. A doc whose revs do not all fit is split so
+% that the current batch is flushed with exactly max_batch_size revs.
+enqueue(#{} = St, _Id, [], _Db) ->
+    St;
+enqueue(#{queue := Queue} = St0, Id, Revs, Db) ->
+    CurrentQueueSize = queue_size(Queue),
+    NewQueueSize = CurrentQueueSize + length(Revs),
+    MinBatchSize = min_batch_size(),
+    MaxBatchSize = max_batch_size(),
+
+    if
+        CurrentQueueSize >= MaxBatchSize ->
+            % Only reachable if max_batch_size was lowered mid-scan; flush
+            % what is already queued to make room again.
+            enqueue(flush_queue(St0, Db), Id, Revs, Db);
+        NewQueueSize > MaxBatchSize ->
+            % Top the batch up to exactly MaxBatchSize revs, flush it, then
+            % queue the rest of this doc's revs.
+            {RevBatch, RevRest} = lists:split(MaxBatchSize - CurrentQueueSize, Revs),
+            St1 = flush_queue(St0#{queue => [{Id, RevBatch} | Queue]}, Db),
+            enqueue(St1, Id, RevRest, Db);
+        NewQueueSize >= MinBatchSize ->
+            flush_queue(St0#{queue => [{Id, Revs} | Queue]}, Db);
+        true ->
+            St0#{queue => [{Id, Revs} | Queue]}
+    end.
+
+% Purge every queued {Id, Revs} pair in one fabric request. The queue is
+% emptied whatever the outcome: a failed batch is logged and dropped rather
+% than retried by every later enqueue or carried over into the next db.
+flush_queue(#{queue := []} = St, _Db) ->
+    St;
+flush_queue(#{queue := IdRevs} = St, Db) ->
     DbName = mem3:dbname(couch_db:name(Db)),
-    PurgeFun = fun() -> fabric:purge_docs(DbName, [{Id, Revs}], [?ADMIN_CTX]) end,
+    PurgeFun = fun() -> fabric:purge_docs(DbName, IdRevs, [?ADMIN_CTX]) end,
     Timeout = fabric_util:request_timeout(),
     try fabric_util:isolate(PurgeFun, Timeout) of
         {Health, Results} when Health == ok; Health == accepted ->
+            ?DEBUG("flushed batch of ~B idrevs from ~s", [queue_size(IdRevs), couch_db:name(Db)], meta(St)),
             #{count := Count, limiter := Limiter0} = St,
             {Wait, Limiter1} = couch_scanner_rate_limiter:update(
                 Limiter0, doc_write, length(Results)
             ),
             timer:sleep(Wait),
-            St#{count => Count + length(Results), limiter => Limiter1};
+            St#{
+                count => Count + length(Results),
+                limiter => Limiter1,
+                queue => []
+            };
         Else ->
             ?WARN(
-                "Failed to purge deleted documents in ~s/~s for reason ~p",
-                [DbName, Id, Else],
+                "Failed to purge deleted documents in ~s for reason ~p",
+                [DbName, Else],
                 meta(St)
             ),
-            St
+            St#{queue => []}
     catch
         Class:Reason ->
             ?WARN(
-                "Failed to purge deleted documents in ~s/~s for reason ~p:~p",
-                [DbName, Id, Class, Reason],
+                "Failed to purge deleted documents in ~s for reason ~p:~p",
+                [DbName, Class, Reason],
                 meta(St)
             ),
-            St
-    end;
-purge(#{} = St0, Id, Revs, MaxBatchSize, Db) ->
-    {RevBatch, RevRest} = lists:split(MaxBatchSize, Revs),
-    St1 = purge(St0, Id, RevBatch, MaxBatchSize, Db),
-    purge(St1, Id, RevRest, MaxBatchSize, Db).
+            St#{queue => []}
+    end.
+
+queue_size(Queue) when is_list(Queue) ->
+    lists:sum([length(Revs) || {_Id, Revs} <- Queue]).
 
 fdi_to_idrevs(#full_doc_info{} = FDI) ->
     Revs = [
@@ -123,7 +161,11 @@ fdi_to_idrevs(#full_doc_info{} = FDI) ->
     {FDI#full_doc_info.id, Revs}.
 
 init_config(ScanId) ->
-    #{sid => ScanId, limiter => couch_scanner_rate_limiter:get()}.
+    #{
+        sid => ScanId,
+        limiter => couch_scanner_rate_limiter:get(),
+        queue => []
+    }.
 
 meta(#{sid := ScanId}) ->
     #{sid => ScanId}.
@@ -174,3 +216,9 @@ parse_ttl([$- | TTL]) ->
     -(parse_ttl(TTL));
 parse_ttl(TTL) ->
     couch_scanner_util:parse_non_weekday_period(TTL).
+
+min_batch_size() ->
+    erlang:max(1, config:get_integer(atom_to_list(?MODULE), "min_batch_size", 100)).
+
+max_batch_size() ->
+    erlang:max(min_batch_size(), config:get_integer(atom_to_list(?MODULE), "max_batch_size", 500)).
