This is an automated email from the ASF dual-hosted git repository.
vatamane pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/couchdb.git
The following commit(s) were added to refs/heads/main by this push:
new 747e9deb8 Use W=N for auto-purge plugin
747e9deb8 is described below
commit 747e9deb80cdfe992278353017f298219712a3d9
Author: Nick Vatamaniuc <[email protected]>
AuthorDate: Mon Oct 27 16:31:19 2025 -0400
Use W=N for auto-purge plugin
The idea is we would like to do as much work in the auto-purge request
itself
and leave as little as possible for the internal replicator.
While at it, strengthen some state map matches by asserting that we expect
the
fields to always by there. Also, ensure to always reset queue to empty on
flush, even on error and assert it as such as when we're opening closing db
shard.
The assert also revealed a bug in db_closing where the updated stated from
the final
flush wasn't retain and instead we returned the state before the flush.
After
the fix we needed to update the purge test since it's really 3 purge that
are
needed not 4 with 11 docs and a batch of 5 (5 + 5 + 1).
---
src/couch/src/couch_auto_purge_plugin.erl | 30 ++++++++++++----------
.../test/eunit/couch_auto_purge_plugin_tests.erl | 2 +-
2 files changed, 18 insertions(+), 14 deletions(-)
diff --git a/src/couch/src/couch_auto_purge_plugin.erl
b/src/couch/src/couch_auto_purge_plugin.erl
index 16faa1e89..34234377e 100644
--- a/src/couch/src/couch_auto_purge_plugin.erl
+++ b/src/couch/src/couch_auto_purge_plugin.erl
@@ -53,7 +53,8 @@ db(St, DbName) ->
end.
db_opened(#{} = St, Db) ->
- #{ttl := TTL} = St,
+ #{ttl := TTL, queue := Queue} = St,
+ ?assert(Queue == [], "Queue is not empty from previous operations"),
EndSeq = couch_time_seq:since(couch_db:get_time_seq(Db),
couch_time_seq:timestamp() - TTL),
ChangeOpts =
if
@@ -64,9 +65,9 @@ db_opened(#{} = St, Db) ->
{0, ChangeOpts, St#{count => 0, end_seq => EndSeq}}.
db_closing(#{} = St, Db) ->
- #{count := Count} = flush_queue(St, Db),
- ?INFO("purged ~B deleted documents from ~s", [Count, couch_db:name(Db)],
meta(St)),
- {ok, St}.
+ St1 = #{count := Count} = flush_queue(St, Db),
+ ?INFO("purged ~B deleted documents from ~s", [Count, couch_db:name(Db)],
meta(St1)),
+ {ok, St1}.
doc_fdi(#{} = St, #full_doc_info{deleted = true} = FDI, Db) ->
#{end_seq := EndSeq} = St,
@@ -89,19 +90,20 @@ enqueue(#{queue := Queue} = St0, Id, Revs, Db) ->
if
NewQueueSize > MaxBatchSize ->
{RevBatch, RevRest} = lists:split(MaxBatchSize - CurrentQueueSize,
Revs),
- St1 = flush_queue(St0#{queue => [{Id, RevBatch} | Queue]}, Db),
+ St1 = flush_queue(St0#{queue := [{Id, RevBatch} | Queue]}, Db),
enqueue(St1, Id, RevRest, Db);
NewQueueSize >= MinBatchSize ->
- flush_queue(St0#{queue => [{Id, Revs} | Queue]}, Db);
+ flush_queue(St0#{queue := [{Id, Revs} | Queue]}, Db);
NewQueueSize < MinBatchSize ->
- St0#{queue => [{Id, Revs} | Queue]}
+ St0#{queue := [{Id, Revs} | Queue]}
end.
flush_queue(#{queue := []} = St, _Db) ->
St;
flush_queue(#{queue := IdRevs} = St, Db) ->
DbName = mem3:dbname(couch_db:name(Db)),
- PurgeFun = fun() -> fabric:purge_docs(DbName, IdRevs, [?ADMIN_CTX]) end,
+ N = mem3:n(DbName),
+ PurgeFun = fun() -> fabric:purge_docs(DbName, IdRevs, [?ADMIN_CTX, {w,
N}]) end,
Timeout = fabric_util:request_timeout(),
try fabric_util:isolate(PurgeFun, Timeout) of
{Health, Results} when Health == ok; Health == accepted ->
@@ -116,9 +118,9 @@ flush_queue(#{queue := IdRevs} = St, Db) ->
),
timer:sleep(Wait),
St#{
- count => Count + length(Results),
- limiter => Limiter1,
- queue => []
+ count := Count + length(Results),
+ limiter := Limiter1,
+ queue := []
};
Else ->
?WARN(
@@ -126,7 +128,8 @@ flush_queue(#{queue := IdRevs} = St, Db) ->
[DbName, Else],
meta(St)
),
- St
+ % Reset the queue. We'll catch these on the next run.
+ St#{queue := []}
catch
Class:Reason ->
?WARN(
@@ -134,7 +137,8 @@ flush_queue(#{queue := IdRevs} = St, Db) ->
[DbName, Class, Reason],
meta(St)
),
- St
+ % Reset the queue. We'll catch these on the next run.
+ St#{queue := []}
end.
queue_size(Queue) when is_list(Queue) ->
diff --git a/src/couch/test/eunit/couch_auto_purge_plugin_tests.erl
b/src/couch/test/eunit/couch_auto_purge_plugin_tests.erl
index 63bdfd91e..a42ef52c5 100644
--- a/src/couch/test/eunit/couch_auto_purge_plugin_tests.erl
+++ b/src/couch/test/eunit/couch_auto_purge_plugin_tests.erl
@@ -117,7 +117,7 @@ t_min_batch_size_2({_, DbName}) ->
meck:reset(?PLUGIN),
config:set("couch_scanner_plugins", atom_to_list(?PLUGIN), "true", false),
wait_exit(10000),
- ?assertEqual(4, meck:num_calls(fabric, purge_docs, '_')),
+ ?assertEqual(3, meck:num_calls(fabric, purge_docs, '_')),
?assertEqual(0, doc_del_count(DbName)),
ok.