This is an automated email from the ASF dual-hosted git repository.

vatamane pushed a commit to branch use-dreyfus-checkpoint-for-purge-seq
in repository https://gitbox.apache.org/repos/asf/couchdb.git

commit a446cdc3f7732ac9395c27904b9ef839f8154209
Author: Nick Vatamaniuc <[email protected]>
AuthorDate: Thu Mar 12 17:24:15 2026 -0400

    Use dreyfus checkpoint for purge_seq
    
    Previously, in the index updater we used the purge_seq value from clouseau. In
    some cases that can return an older value (0 on a new index) than what is in
    the purge checkpoint doc created in `maybe_create_local_purge_doc/2`. (In that
    function we initialize the checkpoint with the db purge sequence and call
    `clouseau_rpc:set_purge_seq/1` to also set the clouseau purge seq value). An
    older purge sequence than the current minimum db purge sequence would result in
    an `invalid_start_purge_seq` being thrown during purged infos folding.
    
    In general, if a client updates a purge checkpoint, then it should not query
    purged infos with a sequence value below that, since if that is the lowest
    current purge checkpoint value and compaction runs, it could have removed all
    the purged infos below that.
---
 src/dreyfus/src/dreyfus_index_updater.erl | 26 +++++++++++++++++---------
 1 file changed, 17 insertions(+), 9 deletions(-)

diff --git a/src/dreyfus/src/dreyfus_index_updater.erl b/src/dreyfus/src/dreyfus_index_updater.erl
index 278d42b54..476595c02 100644
--- a/src/dreyfus/src/dreyfus_index_updater.erl
+++ b/src/dreyfus/src/dreyfus_index_updater.erl
@@ -30,8 +30,10 @@ update(IndexPid, Index) ->
     erlang:put(io_priority, {search, DbName, IndexName}),
     {ok, Db} = couch_db:open_int(DbName, []),
     try
+        IdxPurgeSeq = get_local_doc_purge_seq(Db, Index),
+        DbPurgeSeq = couch_db:get_purge_seq(Db),
+        TotalPurgeChanges = DbPurgeSeq - IdxPurgeSeq,
         TotalUpdateChanges = couch_db:count_changes_since(Db, CurSeq),
-        TotalPurgeChanges = count_pending_purged_docs_since(Db, IndexPid),
         TotalChanges = TotalUpdateChanges + TotalPurgeChanges,
 
         couch_task_status:add_task([
@@ -49,7 +51,7 @@ update(IndexPid, Index) ->
 
         %ExcludeIdRevs is [{Id1, Rev1}, {Id2, Rev2}, ...]
         %The Rev is the final Rev, not purged Rev.
-        {ok, ExcludeIdRevs} = purge_index(Db, IndexPid, Index),
+        {ok, ExcludeIdRevs} = purge_index(Db, IndexPid, Index, IdxPurgeSeq),
         %% compute on all docs modified since we last computed.
 
         NewCurSeq = couch_db:get_update_seq(Db),
@@ -87,8 +89,12 @@ load_docs(FDI, {I, IndexPid, Db, Proc, Total, LastCommitTime, ExcludeIdRevs} = A
             {ok, setelement(1, Acc, I + 1)}
     end.
 
-purge_index(Db, IndexPid, Index) ->
-    {ok, IdxPurgeSeq} = clouseau_rpc:get_purge_seq(IndexPid),
+purge_index(Db, IndexPid, Index, IdxPurgeSeq) ->
+    % Note: we're not using IdxPurgeSeq = clouseau_rpc:get_purge_seq/1 as that
+    % might return the stale (committed) value and not the newly updated purge
+    % seq we just set in maybe_create_local_purge_doc/3 on new doc creation.
+    % Using an old value could result in the invalid_start_purge_seq exception
+    % being raised when folding over purged infos.
     Proc = get_os_process(Index#index.def_lang),
     try
         true = proc_prompt(Proc, [<<"add_fun">>, Index#index.def]),
@@ -120,11 +126,6 @@ purge_index(Db, IndexPid, Index) ->
         ret_os_process(Proc)
     end.
 
-count_pending_purged_docs_since(Db, IndexPid) ->
-    DbPurgeSeq = couch_db:get_purge_seq(Db),
-    {ok, IdxPurgeSeq} = clouseau_rpc:get_purge_seq(IndexPid),
-    DbPurgeSeq - IdxPurgeSeq.
-
 update_or_delete_index(IndexPid, Db, DI, Proc) ->
     #doc_info{id = Id, revs = [#rev_info{deleted = Del} | _]} = DI,
     case Del of
@@ -152,6 +153,13 @@ update_local_doc(Db, Index, PurgeSeq) ->
     DocContent = dreyfus_util:get_local_purge_doc_body(Db, DocId, PurgeSeq, Index),
     couch_db:update_doc(Db, DocContent, []).
 
+get_local_doc_purge_seq(Db, Index) ->
+    DocId = dreyfus_util:get_local_purge_doc_id(Index#index.sig),
+    % We're implicitly asserting this purge checkpoint doc should exist. This is
+    % created either on open or during compaction in on_compact handler
+    {ok, #doc{body = {[_ | _] = Props}}} = couch_db:open_doc(Db, DocId),
+    couch_util:get_value(<<"purge_seq">>, Props).
+
 update_task(NumChanges) ->
     [Changes, Total] = couch_task_status:get([changes_done, total_changes]),
     Changes2 = Changes + NumChanges,

Reply via email to