This is an automated email from the ASF dual-hosted git repository.

vatamane pushed a commit to branch fix-purge-internal-replicator-client-verify
in repository https://gitbox.apache.org/repos/asf/couchdb.git

commit 578c09e229e1f46d0f14a5a90b8eb615ac9f3fa2
Author: Nick Vatamaniuc <[email protected]>
AuthorDate: Tue Nov 25 00:41:23 2025 -0500

    Optimize and clean up internal replicator purge checkpoints
    
    Previously, the internal replicator created twice the number of checkpoints
    needed to replicate purges between nodes. An internal replication job first
    pulls the purges from the target to the source, then pushes the purges from 
the
    source to the target, then finally pushes the document updates to the 
target.
    
    During the pull operation, for example, from node `B` (the target) to node
    `A` (the source), it creates an `A->B` checkpoint on node `B` (the target).
    Then, during the push from `A` to `B` it creates an `A->B` checkpoint on 
node
    A (the source). As a result, after the job finishes there are two 
checkpoints:
    an A->B one on A, and an `A->B` one on B. It may look something like this:
    
    ```
      [node A]                  [node B]
    
             <-------pull------ (A->B)
      (A->B) --------push------>
    ```
    
    When the internal replication job runs on node B and _pushes_ purges to 
node A,
    it will create a `B->A` checkpoint on B. After this instant, there will be 
two
    checkpoints on B for replicating purges from B to A: one is `A->B`, from the
    first job, and another `B->A`, from the second job. Both of the checkpoints
    essentially checkpoint the same thing. It may look like this after both
    replication jobs finish:
    
    ```
      [node A]                  [node B]
    
             <-------pull------ (A->B)           JOB1
      (A->B) --------push------>
    
      (B->A) --------pull------>
             <-------push------ (B->A)           JOB2
    ```
    
    On B, the checkpoints `A->B` and `B->A` could have a different purge 
sequence:
    one higher than the other, and so the lower one could delay the compactor 
from
    cleaning up purge infos. This also makes it harder to reason about the
    replication process, since we have an `A->B` checkpoint on `B`, but it's for
    sending changes _from_ B _to_ A, not like one might expect `A->B` based on 
its
    name.
    
    To fix this, make sure to use a single checkpoint per direction of 
replication.
    So, when changes are pulled from B to A, the checkpoint is now B->A, and when
    changes are pushed from B to A the checkpoint is also B->A.
    
    It should look something like this:
    
    ```
      [node A]                  [node B]
    
             <-------pull------                JOB1
      (A->B) --------push------>
    
              --------pull------>
             <-------push------ (B->A)           JOB2
    ```
    
    Since after this change we'll have some deprecated purge checkpoints to 
clean
    up, it's probably also a good time to introduce purge checkpoint cleanup. We
    have this for indexes but we didn't have it for the internal replicator. 
That
    meant that on shard map reconfigurations, or node membership changes, users
    would have to manually hunt down local (un-clustered) stale purge 
checkpoints
    and remove them. Now this happens automatically when we compact, and before 
we
    replicate between nodes.
---
 src/couch/src/couch_bt_engine_compactor.erl |   4 +
 src/mem3/src/mem3_rep.erl                   | 145 +++++++++++++++++++---------
 src/mem3/src/mem3_rpc.erl                   |  42 ++++++--
 src/mem3/test/eunit/mem3_rep_test.erl       | 119 ++++++++++++++++++++++-
 4 files changed, 256 insertions(+), 54 deletions(-)

diff --git a/src/couch/src/couch_bt_engine_compactor.erl 
b/src/couch/src/couch_bt_engine_compactor.erl
index d412c891f..85d33cf95 100644
--- a/src/couch/src/couch_bt_engine_compactor.erl
+++ b/src/couch/src/couch_bt_engine_compactor.erl
@@ -158,6 +158,10 @@ copy_purge_info(#comp_st{} = CompSt) ->
         retry = Retry
     } = CompSt,
     ?COMP_EVENT(purge_init),
+    % The minimum purge sequence calculation involves finding the lowest
+    % reported purge sequence across all checkpoints. Make sure to clean up any
+    % stale or deprecated internal replicator checkpoints beforehand.
+    ok = mem3_rep:cleanup_purge_checkpoints(DbName),
     MinPurgeSeq = couch_util:with_db(DbName, fun(Db) ->
         couch_db:get_minimum_purge_seq(Db)
     end),
diff --git a/src/mem3/src/mem3_rep.erl b/src/mem3/src/mem3_rep.erl
index c5157f52c..e602d0bd1 100644
--- a/src/mem3/src/mem3_rep.erl
+++ b/src/mem3/src/mem3_rep.erl
@@ -19,6 +19,7 @@
     make_local_id/3,
     make_purge_id/2,
     verify_purge_checkpoint/2,
+    cleanup_purge_checkpoints/1,
     find_source_seq/4,
     find_split_target_seq/4,
     local_id_hash/1
@@ -56,6 +57,10 @@
 }).
 
 -define(DEFAULT_REXI_TIMEOUT, 600000).
+-define(CHECKPOINT_PREFIX, "_local/shard-sync-").
+-define(PURGE_PREFIX, "_local/purge-mem3-").
+-define(UUID_SIZE, 32).
+-define(PURGE_TYPE, <<"internal_replication">>).
 
 go(Source, Target) ->
     go(Source, Target, []).
@@ -148,12 +153,12 @@ make_local_id(#shard{node = SourceNode}, #shard{node = 
TargetNode}, Filter) ->
 make_local_id(SourceThing, TargetThing, F) when is_binary(F) ->
     S = local_id_hash(SourceThing),
     T = local_id_hash(TargetThing),
-    <<"_local/shard-sync-", S/binary, "-", T/binary, F/binary>>;
+    <<?CHECKPOINT_PREFIX, S/binary, "-", T/binary, F/binary>>;
 make_local_id(SourceThing, TargetThing, Filter) ->
     S = local_id_hash(SourceThing),
     T = local_id_hash(TargetThing),
     F = filter_hash(Filter),
-    <<"_local/shard-sync-", S/binary, "-", T/binary, F/binary>>.
+    <<?CHECKPOINT_PREFIX, S/binary, "-", T/binary, F/binary>>.
 
 filter_hash(Filter) when is_function(Filter) ->
     {new_uniq, Hash} = erlang:fun_info(Filter, new_uniq),
@@ -166,44 +171,98 @@ local_id_hash(Thing) ->
     couch_util:encodeBase64Url(couch_hash:md5_hash(?term_to_bin(Thing))).
 
 make_purge_id(SourceUUID, TargetUUID) ->
-    <<"_local/purge-mem3-", SourceUUID/binary, "-", TargetUUID/binary>>.
+    <<?PURGE_PREFIX, SourceUUID/binary, "-", TargetUUID/binary>>.
 
-verify_purge_checkpoint(DbName, Props) ->
-    try
-        Type = couch_util:get_value(<<"type">>, Props),
-        if
-            Type =/= <<"internal_replication">> ->
-                false;
-            true ->
-                SourceBin = couch_util:get_value(<<"source">>, Props),
-                TargetBin = couch_util:get_value(<<"target">>, Props),
-                Range = couch_util:get_value(<<"range">>, Props),
+remote_id_to_local(<<?PURGE_PREFIX, Remote:?UUID_SIZE/binary, "-", 
Local:?UUID_SIZE/binary>>) ->
+    <<?PURGE_PREFIX, Local/binary, "-", Remote/binary>>.
 
-                Source = binary_to_existing_atom(SourceBin, latin1),
-                Target = binary_to_existing_atom(TargetBin, latin1),
+% If the shard map changed, nodes are decommissioned, or user upgraded from a
+% version before 3.6 we may have some checkpoints to clean up. Call this
+% function before compacting, right before we calculate the minimum purge
+% sequence, and also before we replicate purges to/from other copies.
+%
+cleanup_purge_checkpoints(ShardName) when is_binary(ShardName) ->
+    couch_util:with_db(ShardName, fun(Db) -> cleanup_purge_checkpoints(Db) 
end);
+cleanup_purge_checkpoints(Db) ->
+    Shards = shards(couch_db:name(Db)),
+    UUID = couch_db:get_uuid(Db),
+    FoldFun = fun(#doc{id = Id, body = {Props}}, Acc) ->
+        case Id of
+            <<?PURGE_PREFIX, UUID:?UUID_SIZE/binary, "-", 
_:?UUID_SIZE/binary>> ->
+                case verify_checkpoint_shard(Shards, Props) of
+                    true -> {ok, Acc};
+                    false -> {ok, [Id | Acc]}
+                end;
+            <<?PURGE_PREFIX, _:?UUID_SIZE/binary, "-", _:?UUID_SIZE/binary>> ->
+                % Cleanup checkpoints not originating at the current shard.
+                % Previously, before version 3.6, during a pull from shard B to
+                % shard A we checkpointed on target B with doc ID
+                % mem3-purge-$AUuid-$BUuid. That created a redundant checkpoint
+                % which was the same as target B pushing changes to target A,
+                % which already had a checkpoint: mem3-purge-$BUuid-$AUuid,
+                % with the same direction and same purge sequence ID. So here
+                % we remove those redundant checkpoints.
+                {ok, [Id | Acc]};
+            _ ->
+                {stop, Acc}
+        end
+    end,
+    Opts = [{start_key, list_to_binary(?PURGE_PREFIX)}],
+    {ok, ToDelete} = couch_db:fold_local_docs(Db, FoldFun, [], Opts),
+    DeleteFun = fun(DocId) -> delete_checkpoint(Db, DocId) end,
+    lists:foreach(DeleteFun, ToDelete).
+
+delete_checkpoint(Db, DocId) ->
+    DbName = couch_db:name(Db),
+    LogMsg = "~p : deleting inactive purge checkpoint ~s : ~s",
+    couch_log:warning(LogMsg, [?MODULE, DbName, DocId]),
+    try couch_db:open_doc(Db, DocId, []) of
+        {ok, Doc = #doc{}} ->
+            Deleted = Doc#doc{deleted = true, body = {[]}},
+            couch_db:update_doc(Db, Deleted, [?ADMIN_CTX]);
+        {not_found, _} ->
+            ok
+    catch
+        Tag:Error ->
+            ErrLog = "~p : error deleting checkpoint ~s : ~s error: ~p:~p",
+            couch_log:error(ErrLog, [?MODULE, DbName, DocId, Tag, Error]),
+            ok
+    end.
 
-                try
-                    Nodes = lists:foldl(
-                        fun(Shard, Acc) ->
-                            case Shard#shard.range == Range of
-                                true -> [Shard#shard.node | Acc];
-                                false -> Acc
-                            end
-                        end,
-                        [],
-                        mem3:shards(mem3:dbname(DbName))
-                    ),
-                    lists:member(Source, Nodes) andalso lists:member(Target, 
Nodes)
-                catch
-                    error:database_does_not_exist ->
-                        false
-                end
+verify_purge_checkpoint(DbName, Props) ->
+    try
+        case couch_util:get_value(<<"type">>, Props) of
+            ?PURGE_TYPE -> verify_checkpoint_shard(shards(DbName), Props);
+            _ -> false
         end
     catch
-        _:_ ->
+        Tag:Error ->
+            ErrLog = "~p : invalid checkpoint shard:~s props:~p error: ~p:~p",
+            couch_log:error(ErrLog, [?MODULE, DbName, Props, Tag, Error]),
             false
     end.
 
+shards(DbName) ->
+    try
+        mem3:shards(mem3:dbname(DbName))
+    catch
+        error:database_does_not_exist ->
+            []
+    end.
+
+verify_checkpoint_shard(Shards, Props) when is_list(Shards), is_list(Props) ->
+    Range = couch_util:get_value(<<"range">>, Props),
+    Fun = fun(S, Acc) ->
+        case mem3:range(S) == Range of
+            true -> [mem3:node(S) | Acc];
+            false -> Acc
+        end
+    end,
+    Nodes = lists:foldl(Fun, [], Shards),
+    TBin = couch_util:get_value(<<"target">>, Props),
+    TNode = binary_to_existing_atom(TBin, latin1),
+    lists:member(TNode, Nodes) andalso lists:member(TNode, mem3:nodes()).
+
 %% @doc Find and return the largest update_seq in SourceDb
 %% that the client has seen from TargetNode.
 %%
@@ -335,6 +394,7 @@ pull_purges_multi(#acc{} = Acc0) ->
         hashfun = HashFun
     } = Acc0,
     with_src_db(Acc0, fun(Db) ->
+        cleanup_purge_checkpoints(Db),
         Targets = maps:map(
             fun(_, #tgt{} = T) ->
                 pull_purges(Db, Count, Source, T, HashFun)
@@ -365,9 +425,9 @@ pull_purges(Db, Count, #shard{} = SrcShard, #tgt{} = Tgt0, 
HashFun) ->
     #tgt{shard = TgtShard} = Tgt0,
     SrcUUID = couch_db:get_uuid(Db),
     #shard{node = TgtNode, name = TgtDbName} = TgtShard,
-    {LocalPurgeId, Infos, ThroughSeq, Remaining} =
+    {RemoteId, Infos, ThroughSeq, Remaining} =
         mem3_rpc:load_purge_infos(TgtNode, TgtDbName, SrcUUID, Count),
-    Tgt = Tgt0#tgt{purgeid = LocalPurgeId},
+    Tgt = Tgt0#tgt{purgeid = RemoteId},
     if
         Infos == [] ->
             ok;
@@ -391,7 +451,7 @@ pull_purges(Db, Count, #shard{} = SrcShard, #tgt{} = Tgt0, 
HashFun) ->
             Infos1 = lists:filter(BelongsFun, Infos),
             {ok, _} = couch_db:purge_docs(Db, Infos1, [?REPLICATED_CHANGES]),
             Body = purge_cp_body(SrcShard, TgtShard, ThroughSeq),
-            mem3_rpc:save_purge_checkpoint(TgtNode, TgtDbName, LocalPurgeId, 
Body)
+            mem3_rpc:save_purge_checkpoint(TgtNode, TgtDbName, RemoteId, Body)
     end,
     Tgt#tgt{remaining = max(0, Remaining)}.
 
@@ -427,7 +487,8 @@ push_purges_multi(#acc{} = Acc) ->
     end).
 
 push_purges(Db, BatchSize, SrcShard, Tgt, HashFun) ->
-    #tgt{shard = TgtShard, purgeid = LocalPurgeId} = Tgt,
+    #tgt{shard = TgtShard, purgeid = RemotePurgeId} = Tgt,
+    LocalPurgeId = remote_id_to_local(RemotePurgeId),
     #shard{node = TgtNode, range = TgtRange, name = TgtDbName} = TgtShard,
     StartSeq =
         case couch_db:open_doc(Db, LocalPurgeId, []) of
@@ -741,21 +802,19 @@ update_locals(Target, Db, Seq) ->
     {ok, _} = couch_db:update_doc(Db, #doc{id = Id, body = NewBody}, []).
 
 purge_cp_body(#shard{} = Source, #shard{} = Target, PurgeSeq) ->
-    {Mega, Secs, _} = os:timestamp(),
-    NowSecs = Mega * 1000000 + Secs,
     {[
-        {<<"type">>, <<"internal_replication">>},
-        {<<"updated_on">>, NowSecs},
+        {<<"type">>, ?PURGE_TYPE},
+        {<<"updated_on">>, os:system_time(second)},
         {<<"purge_seq">>, PurgeSeq},
         {<<"source">>, atom_to_binary(Source#shard.node, latin1)},
         {<<"target">>, atom_to_binary(Target#shard.node, latin1)},
-        {<<"range">>, Source#shard.range}
+        {<<"range">>, Target#shard.range}
     ]}.
 
 find_repl_doc(SrcDb, TgtUUIDPrefix) ->
     SrcUUID = couch_db:get_uuid(SrcDb),
     S = local_id_hash(SrcUUID),
-    DocIdPrefix = <<"_local/shard-sync-", S/binary, "-">>,
+    DocIdPrefix = <<?CHECKPOINT_PREFIX, S/binary, "-">>,
     FoldFun = fun(#doc{id = DocId, body = {BodyProps}} = Doc, _) ->
         TgtUUID = couch_util:get_value(<<"target_uuid">>, BodyProps, <<>>),
         case is_prefix(DocIdPrefix, DocId) of
@@ -802,7 +861,7 @@ find_split_target_seq_int(TgtDb, Node, SrcUUIDPrefix) ->
                 {ok, not_found}
         end
     end,
-    Options = [{start_key, <<"_local/shard-sync-">>}],
+    Options = [{start_key, <<?CHECKPOINT_PREFIX>>}],
     case couch_db:fold_local_docs(TgtDb, FoldFun, not_found, Options) of
         {ok, Seqs} when is_list(Seqs) ->
             {ok, Seqs};
diff --git a/src/mem3/src/mem3_rpc.erl b/src/mem3/src/mem3_rpc.erl
index c62954fc4..51c18d175 100644
--- a/src/mem3/src/mem3_rpc.erl
+++ b/src/mem3/src/mem3_rpc.erl
@@ -189,7 +189,12 @@ load_purge_infos_rpc(DbName, SrcUUID, BatchSize) ->
     case get_or_create_db(DbName, [?ADMIN_CTX]) of
         {ok, Db} ->
             TgtUUID = couch_db:get_uuid(Db),
-            PurgeDocId = mem3_rep:make_purge_id(SrcUUID, TgtUUID),
+            % This is the remote checkpoint running on the target to pull
+            % purges to the source. The changes are flowing from the target to
+            % the source, that's why checkpoint is from tgt to src here. This
+            % is also the same checkpoint used when the target pushed changes
+            % to the source.
+            PurgeDocId = mem3_rep:make_purge_id(TgtUUID, SrcUUID),
             StartSeq =
                 case couch_db:open_doc(Db, PurgeDocId, []) of
                     {ok, #doc{body = {Props}}} ->
@@ -222,19 +227,36 @@ save_purge_checkpoint_rpc(DbName, PurgeDocId, Body) ->
     erlang:put(io_priority, {internal_repl, DbName}),
     case get_or_create_db(DbName, [?ADMIN_CTX]) of
         {ok, Db} ->
-            Doc = #doc{id = PurgeDocId, body = Body},
-            Resp =
-                try couch_db:update_doc(Db, Doc, []) of
-                    Resp0 -> Resp0
-                catch
-                    T:R ->
-                        {T, R}
-                end,
-            rexi:reply(Resp);
+            case purge_checkpoint_updated(Db, PurgeDocId, Body) of
+                true ->
+                    % Checkpoint on the target updated while source pulled the
+                    % changes. Do not update the doc then to avoid rewinding
+                    % back.
+                    rexi:reply({ok, stale});
+                false ->
+                    Doc = #doc{id = PurgeDocId, body = Body},
+                    rexi:reply(
+                        try
+                            couch_db:update_doc(Db, Doc, [])
+                        catch
+                            T:R -> {T, R}
+                        end
+                    )
+            end;
         Error ->
             rexi:reply(Error)
     end.
 
+purge_checkpoint_updated(Db, DocId, {Props}) when is_binary(DocId), 
is_list(Props) ->
+    Seq = couch_util:get_value(<<"purge_seq">>, Props),
+    case couch_db:open_doc(Db, DocId, []) of
+        {ok, #doc{body = {DocProps}}} ->
+            DocSeq = couch_util:get_value(<<"purge_seq">>, DocProps),
+            is_integer(Seq) andalso is_integer(DocSeq) andalso DocSeq > Seq;
+        {not_found, _} ->
+            false
+    end.
+
 replicate_rpc(DbName, Target) ->
     rexi:reply(
         try
diff --git a/src/mem3/test/eunit/mem3_rep_test.erl 
b/src/mem3/test/eunit/mem3_rep_test.erl
index 814fd11b2..0d081a014 100644
--- a/src/mem3/test/eunit/mem3_rep_test.erl
+++ b/src/mem3/test/eunit/mem3_rep_test.erl
@@ -31,6 +31,7 @@ setup() ->
     create_db(PartSrc, [{q, 1}, {n, 1}, {props, PartProps}]),
     create_db(PartTgt, [{q, 2}, {n, 1}, {props, PartProps}]),
     create_local_db(Localdb),
+    meck:new(mem3, [passthrough]),
     #{
         allsrc => AllSrc,
         alltgt => AllTgt,
@@ -40,6 +41,7 @@ setup() ->
     }.
 
 teardown(#{} = Dbs) ->
+    meck:unload(),
     maps:map(
         fun
             (localdb, Db) -> delete_local_db(Db);
@@ -71,7 +73,8 @@ mem3_reshard_db_test_() ->
                     ?TDEF_FE(replicate_low_batch_count, ?TIMEOUT),
                     ?TDEF_FE(replicate_with_partitions, ?TIMEOUT),
                     ?TDEF_FE(replicate_to_and_from_local, ?TIMEOUT),
-                    ?TDEF_FE(replicate_with_purges, ?TIMEOUT)
+                    ?TDEF_FE(replicate_with_purges, ?TIMEOUT),
+                    ?TDEF_FE(clean_purge_checkpoints, ?TIMEOUT)
                 ]
             }
         }
@@ -173,6 +176,110 @@ replicate_with_purges(#{allsrc := AllSrc, alltgt := 
AllTgt}) ->
     ?assertEqual(#{}, SDocs),
     ?assertEqual(#{}, get_all_docs(AllTgt)).
 
+clean_purge_checkpoints(#{allsrc := AllSrc, alltgt := AllTgt}) ->
+    DocSpec = #{docs => 10, delete => [5, 9], purge => [2, 4]},
+    add_test_docs(AllSrc, DocSpec),
+    % Add and purge some docs on target to exercise the pull_purges code path
+    add_test_docs(AllTgt, #{docs => 3, purge => [0, 2]}),
+    [Src] = lists:sort(mem3:local_shards(AllSrc)),
+    [Tgt1, Tgt2] = lists:sort(mem3:local_shards(AllTgt)),
+    #shard{name = SrcName} = Src,
+
+    % Since we don't have multiple nodes running and are just replicating
+    % from one clustered db to another, we need to patch up the shard map
+    % during the replication so it looks like targets are part of the shard maps
+    meck:expect(mem3, shards, fun(DbName) ->
+        case DbName == Src#shard.dbname of
+            true -> [Src, Tgt1, Tgt2];
+            false -> meck:passthrough([DbName])
+        end
+    end),
+
+    FakeTarget = '[email protected]',
+
+    % Add a mix of stale, invalid or deprecated purge checkpoints
+    [Uuid1, Uuid2, Uuid3] = [couch_uuids:random() || _ <- lists:seq(1, 3)],
+
+    CheckpointIds = couch_util:with_db(SrcName, fun(Db) ->
+        Uuid = couch_db:get_uuid(Db),
+        Docs = [
+            % This one is ok and should not be cleaned up
+            #doc{
+                id = <<"_local/purge-mem3-", Uuid/binary, "-", Uuid1/binary>>,
+                body =
+                    {[
+                        {<<"type">>, <<"internal_replicator">>},
+                        {<<"updated_on">>, os:system_time(second)},
+                        {<<"purge_seq">>, 10042},
+                        {<<"source">>, atom_to_binary(Src#shard.node, latin1)},
+                        {<<"target">>, atom_to_binary(Tgt1#shard.node, 
latin1)},
+                        {<<"range">>, Tgt1#shard.range}
+                    ]}
+            },
+            % Non-existent range. Should be cleaned up.
+            #doc{
+                id = <<"_local/purge-mem3-", Uuid/binary, "-", Uuid2/binary>>,
+                body =
+                    {[
+                        {<<"type">>, <<"internal_replicator">>},
+                        {<<"updated_on">>, os:system_time(second)},
+                        {<<"purge_seq">>, 10043},
+                        {<<"source">>, atom_to_binary(Src#shard.node, latin1)},
+                        {<<"target">>, atom_to_binary(Tgt1#shard.node, 
latin1)},
+                        {<<"range">>, [0, 1]}
+                    ]}
+            },
+            % Non-existent target. Should be cleaned up.
+            #doc{
+                id = <<"_local/purge-mem3-", Uuid/binary, "-", Uuid3/binary>>,
+                body =
+                    {[
+                        {<<"type">>, <<"internal_replicator">>},
+                        {<<"updated_on">>, os:system_time(second)},
+                        {<<"purge_seq">>, 10044},
+                        {<<"source">>, atom_to_binary(Src#shard.node, latin1)},
+                        {<<"target">>, atom_to_binary(FakeTarget, latin1)},
+                        {<<"range">>, Tgt1#shard.range}
+                    ]}
+            },
+            % Deprecated checkpoint format. Should be cleaned up.
+            #doc{
+                id = <<"_local/purge-mem3-", Uuid1/binary, "-", Uuid/binary>>,
+                body =
+                    {[
+                        {<<"type">>, <<"internal_replicator">>},
+                        {<<"updated_on">>, os:system_time(second)},
+                        {<<"purge_seq">>, 10045},
+                        {<<"source">>, atom_to_binary(Src#shard.node, latin1)},
+                        {<<"target">>, atom_to_binary(Tgt1#shard.node, 
latin1)},
+                        {<<"range">>, Tgt1#shard.range}
+                    ]}
+            }
+        ],
+        {ok, _} = couch_db:update_docs(Db, Docs, []),
+        [Id || #doc{id = Id} <- Docs]
+    end),
+
+    #shard{range = R1} = Tgt1,
+    #shard{range = R2} = Tgt2,
+    TMap = #{R1 => Tgt1, R2 => Tgt2},
+    Opts = [{batch_size, 1000}, {batch_count, all}],
+    ?assertMatch({ok, 0}, mem3_rep:go(Src, TMap, Opts)),
+
+    SDocs = get_all_docs(AllSrc),
+    % Purges from the target should have been pulled and removed docs 0,1,2.
+    % Source should have no live docs.
+    ?assertEqual(#{}, SDocs),
+    ?assertEqual(#{}, get_all_docs(AllTgt)),
+
+    % From the purge checkpoint doc ids we only expect the first one to survive
+    [Id1, Id2, Id3, Id4] = CheckpointIds,
+    LocalDocs = local_docs(SrcName),
+    ?assert(is_map_key(Id1, LocalDocs)),
+    ?assertNot(is_map_key(Id2, LocalDocs)),
+    ?assertNot(is_map_key(Id3, LocalDocs)),
+    ?assertNot(is_map_key(Id4, LocalDocs)).
+
 replicate_to_and_from_local(#{localdb := LocalDb, allsrc := ClusteredDb}) ->
     % We'll just tests that we can pull purges from the target
     add_test_docs(ClusteredDb, #{docs => 6, purge => [0, 2]}),
@@ -381,3 +488,13 @@ atts(Size) when is_integer(Size), Size >= 1 ->
             {data, Data}
         ])
     ].
+
+local_docs(DbName) ->
+    {ok, Db} = couch_db:open(DbName, [?ADMIN_CTX]),
+    FoldFun = fun(#doc{id = DocId, body = Body}, Acc) ->
+        Map = ?JSON_DECODE(?JSON_ENCODE(Body), [return_maps]),
+        {ok, Acc#{DocId => Map}}
+    end,
+    {ok, Res} = couch_db:fold_local_docs(Db, FoldFun, #{}, []),
+    couch_db:close(Db),
+    Res.

Reply via email to