This is an automated email from the ASF dual-hosted git repository. vatamane pushed a commit to branch use-dreyfus-checkpoint-for-purge-seq in repository https://gitbox.apache.org/repos/asf/couchdb.git
commit a446cdc3f7732ac9395c27904b9ef839f8154209 Author: Nick Vatamaniuc <[email protected]> AuthorDate: Thu Mar 12 17:24:15 2026 -0400 Use dreyfus checkpoint for purge_seq Previously, in the index updater we used the purge_seq value from clouseau. In some cases that can return an older value (0 on a new index) than what is in the purge checkpoint doc created in `maybe_create_local_purge_doc/2`. (In that function we initialize the checkpoint with the db purge sequence and call `clouseau_rpc:set_purge_seq/1` to also set the clouseau purge seq value). An older purge sequence than the current minimum db purge sequence would result in an `invalid_start_purge_seq` being thrown during purged infos folding. In general, if a client updates a purge checkpoint, then it should not query purged infos with a sequence value below that, since if that is the lowest current purge checkpoint value and compaction runs, it could have removed all the purged infos below that. --- src/dreyfus/src/dreyfus_index_updater.erl | 26 +++++++++++++++++--------- 1 file changed, 17 insertions(+), 9 deletions(-) diff --git a/src/dreyfus/src/dreyfus_index_updater.erl b/src/dreyfus/src/dreyfus_index_updater.erl index 278d42b54..476595c02 100644 --- a/src/dreyfus/src/dreyfus_index_updater.erl +++ b/src/dreyfus/src/dreyfus_index_updater.erl @@ -30,8 +30,10 @@ update(IndexPid, Index) -> erlang:put(io_priority, {search, DbName, IndexName}), {ok, Db} = couch_db:open_int(DbName, []), try + IdxPurgeSeq = get_local_doc_purge_seq(Db, Index), + DbPurgeSeq = couch_db:get_purge_seq(Db), + TotalPurgeChanges = DbPurgeSeq - IdxPurgeSeq, TotalUpdateChanges = couch_db:count_changes_since(Db, CurSeq), - TotalPurgeChanges = count_pending_purged_docs_since(Db, IndexPid), TotalChanges = TotalUpdateChanges + TotalPurgeChanges, couch_task_status:add_task([ @@ -49,7 +51,7 @@ update(IndexPid, Index) -> %ExcludeIdRevs is [{Id1, Rev1}, {Id2, Rev2}, ...] %The Rev is the final Rev, not purged Rev. - {ok, ExcludeIdRevs} = purge_index(Db, IndexPid, Index), + {ok, ExcludeIdRevs} = purge_index(Db, IndexPid, Index, IdxPurgeSeq), %% compute on all docs modified since we last computed. NewCurSeq = couch_db:get_update_seq(Db), @@ -87,8 +89,12 @@ load_docs(FDI, {I, IndexPid, Db, Proc, Total, LastCommitTime, ExcludeIdRevs} = A {ok, setelement(1, Acc, I + 1)} end. -purge_index(Db, IndexPid, Index) -> - {ok, IdxPurgeSeq} = clouseau_rpc:get_purge_seq(IndexPid), +purge_index(Db, IndexPid, Index, IdxPurgeSeq) -> + % Note: we're not using IdxPurgeSeq = clouseau_rpc:get_purge_seq/1 as that + % might return the stale (committed) value and not the newly updated purge + % seq we just set in maybe_create_local_purge_doc/3 on new doc creation. + % Using an old value could result in the invalid_start_purge_seq exception + % being raised when folding over purged infos. Proc = get_os_process(Index#index.def_lang), try true = proc_prompt(Proc, [<<"add_fun">>, Index#index.def]), @@ -120,11 +126,6 @@ purge_index(Db, IndexPid, Index) -> ret_os_process(Proc) end. -count_pending_purged_docs_since(Db, IndexPid) -> - DbPurgeSeq = couch_db:get_purge_seq(Db), - {ok, IdxPurgeSeq} = clouseau_rpc:get_purge_seq(IndexPid), - DbPurgeSeq - IdxPurgeSeq. - update_or_delete_index(IndexPid, Db, DI, Proc) -> #doc_info{id = Id, revs = [#rev_info{deleted = Del} | _]} = DI, case Del of @@ -152,6 +153,13 @@ update_local_doc(Db, Index, PurgeSeq) -> DocContent = dreyfus_util:get_local_purge_doc_body(Db, DocId, PurgeSeq, Index), couch_db:update_doc(Db, DocContent, []). +get_local_doc_purge_seq(Db, Index) -> + DocId = dreyfus_util:get_local_purge_doc_id(Index#index.sig), + % We're implicitly asserting this purge checkpoint doc should exist. This is + % created either on open or during compaction in on_compact handler + {ok, #doc{body = {[_ | _] = Props}}} = couch_db:open_doc(Db, DocId), + couch_util:get_value(<<"purge_seq">>, Props). + update_task(NumChanges) -> [Changes, Total] = couch_task_status:get([changes_done, total_changes]), Changes2 = Changes + NumChanges,
