This is an automated email from the ASF dual-hosted git repository.

rnewson pushed a commit to branch auto-delete-3
in repository https://gitbox.apache.org/repos/asf/couchdb.git


The following commit(s) were added to refs/heads/auto-delete-3 by this push:
     new c7af6527c teach replicator to make peer checkpoint
c7af6527c is described below

commit c7af6527c8fd25277cca782e9f36d492177a07cd
Author: Robert Newson <rnew...@apache.org>
AuthorDate: Thu Mar 20 17:08:55 2025 +0000

    teach replicator to make peer checkpoint
---
 .../src/couch_replicator_scheduler_job.erl         | 36 ++++++++++++++++------
 1 file changed, 26 insertions(+), 10 deletions(-)

diff --git a/src/couch_replicator/src/couch_replicator_scheduler_job.erl 
b/src/couch_replicator/src/couch_replicator_scheduler_job.erl
index 62e604b5e..f92019cda 100644
--- a/src/couch_replicator/src/couch_replicator_scheduler_job.erl
+++ b/src/couch_replicator/src/couch_replicator_scheduler_job.erl
@@ -678,6 +678,7 @@ init_state(Rep) ->
     StartSeq = {0, StartSeq1},
 
     SourceSeq = get_value(<<"update_seq">>, SourceInfo, ?LOWEST_SEQ),
+    create_peer_checkpoint_doc_if_missing(Source, BaseId, SourceSeq),
 
     #doc{body = {CheckpointHistory}} = SourceLog,
     State = #rep_state{
@@ -809,7 +810,7 @@ do_checkpoint(State) ->
         src_starttime = SrcInstanceStartTime,
         tgt_starttime = TgtInstanceStartTime,
         stats = Stats,
-        rep_details = #rep{options = Options},
+        rep_details = #rep{id = {BaseId, _}, options = Options},
         session_id = SessionId
     } = State,
     case commit_to_both(Source, Target) of
@@ -886,7 +887,7 @@ do_checkpoint(State) ->
                 {TgtRevPos, TgtRevId} = update_checkpoint(
                     Target, TargetLog#doc{body = NewRepHistory}, target
                 ),
-                %% TODO update_checkpoint(Source, peer_checkpoint_doc(State), 
source),
+                update_checkpoint(Source, peer_checkpoint_doc(BaseId, NewSeq), 
source),
                 NewState = State#rep_state{
                     checkpoint_history = NewRepHistory,
                     committed_seq = NewTsSeq,
@@ -913,14 +914,29 @@ do_checkpoint(State) ->
             >>}
     end.
 
-peer_checkpoint_doc(#rep_state{} = State) ->
-    #rep_state{
-        session_id = SessionId
-    } = State,
-    #doc{
-        id = <<"peer-checkpoint-", SessionId/binary>>,
-        body = {[{<<"update_seq">>, State#rep_state.committed_seq}]}
-    }.
+create_peer_checkpoint_doc_if_missing(#httpdb{} = Db, BaseId, SourceSeq) when
+    is_list(BaseId), is_binary(SourceSeq)
+->
+    case couch_replicator_api_wrap:open_doc(Db, peer_checkpoint_id(BaseId), 
[]) of
+        {ok, _} ->
+            ok;
+        {error, <<"not_found">>} ->
+            Doc = peer_checkpoint_doc(BaseId, SourceSeq),
+            case couch_replicator_api_wrap:update_doc(Db, Doc, []) of
+                {ok, _} ->
+                    ok;
+                {error, Reason} ->
+                    throw({checkpoint_commit_failure, Reason})
+            end;
+        {error, Reason} ->
+            throw({checkpoint_commit_failure, Reason})
+    end.
+
+peer_checkpoint_doc(BaseId, Seq) ->
+    #doc{id = peer_checkpoint_id(BaseId), body = {[{<<"update_seq">>, Seq}]}}.
+
+peer_checkpoint_id(BaseId) when is_list(BaseId) ->
+    ?l2b(?LOCAL_DOC_PREFIX ++ "peer-checkpoint-repl-" ++ BaseId).
 
 update_checkpoint(Db, Doc, DbType) ->
     try

Reply via email to