This is an automated email from the ASF dual-hosted git repository.

vatamane pushed a commit to branch 
users-db-and-previous-nodes-in-purge-checkpoints
in repository https://gitbox.apache.org/repos/asf/couchdb.git

commit 1aefb4d3c18fdfe8967ef0aa6b8068dd4565c2b8
Author: Nick Vatamaniuc <[email protected]>
AuthorDate: Wed Dec 24 01:25:42 2025 -0500

    Consider previous node replications for _dbs purge checkpoints
    
    In the previous PR [1] we added special handling for the shards db since it
    has a custom ring replication topology. In PR [1] we considered only the
    checkpoint for pushes from the current node to the "next" node in the ring.
    However, we should also consider the checkpoints created by the "previous"
    node when it pulls purges from the current node, so that is what we fix in
    this PR.
    
    As a reminder, a replication job from node A to node B will:
    
      1) pull purges from B to A (checkpoint on B with a B->A purge checkpoint
          doc)
      2) push purges from A to B (checkpoint on A with an A->B purge checkpoint
          doc)
      3) push doc updates from A to B (checkpoint on A with an A->B sync
          checkpoint doc, and on B also with an A->B sync checkpoint doc)
    
    [1] https://github.com/apache/couchdb/pull/5832
---
 src/mem3/src/mem3_rep.erl  | 49 ++++++++++++++++++++++++++++++++++++++--------
 src/mem3/src/mem3_sync.erl | 43 ++++++++++++++++++++++++++++++++++------
 2 files changed, 78 insertions(+), 14 deletions(-)

diff --git a/src/mem3/src/mem3_rep.erl b/src/mem3/src/mem3_rep.erl
index 153eb28d9..6413c72a4 100644
--- a/src/mem3/src/mem3_rep.erl
+++ b/src/mem3/src/mem3_rep.erl
@@ -250,11 +250,20 @@ have_all_purge_checkpoints(true, Db, [_ | _] = Shards) ->
                 % mem3:shards/1 returns a single #shard{} record with node =
                 % node(), name = _dbs, range = [0, ?RING_END] and it should
                 % replicate in a ring to the dbs copy on "next" node in a ring.
+                % We push purges to the next node and the previous node pulls
+                % purges from us, so we expect to have only two purge
+                % checkpoints for the next and previous nodes.
                 Next = mem3_sync:find_next_node(),
-                % If we're the only node, then next == node()
+                Prev = mem3_sync:find_previous_node(),
+                % If we're the only node, then next == node() and prev == 
node()
                 case Next == config:node_name() of
-                    true -> couch_util:new_set();
-                    false -> couch_util:set_from_list([{Next, [0, ?RING_END]}])
+                    true ->
+                        couch_util:new_set();
+                    false ->
+                        couch_util:set_from_list([
+                            {Next, [0, ?RING_END]},
+                            {Prev, [0, ?RING_END]}
+                        ])
                 end;
             false ->
                 % Keep only shard copies. These are not necessarily ones with 
a matching
@@ -325,12 +334,15 @@ verify_checkpoint_shard(Shards, Props) when 
is_list(Shards), is_list(Props) ->
     case Shards of
         [#shard{dbname = ShardsDb}] ->
             % This is shards db itself. It's a special case since replications
-            % copies are other shard db copies replicated in a ring
+            % copies are other shard db copies replicated in a ring. We push
+            % purges to the next node and the previous node pulls purges
+            % from us. So we expect to have two purge replication checkpoints.
             Next = mem3_sync:find_next_node(),
+            Prev = mem3_sync:find_previous_node(),
             % If we're the only node, the next == node()
             case Next == config:node_name() of
                 true -> false;
-                false -> TNode == Next
+                false -> TNode == Next orelse TNode == Prev
             end;
         _ ->
             Range = couch_util:get_value(<<"range">>, Props),
@@ -1437,13 +1449,16 @@ t_have_all_shards_db(_) ->
 
     Src1 = #shard{name = Dbs, node = node(), range = Range},
     Tgt1 = #shard{name = Dbs, node = 'n2', range = Range},
+    Tgt2 = #shard{name = Dbs, node = 'n3', range = Range},
 
     % We're the only node: don't expect any other checkpoints
     meck:expect(mem3_sync, find_next_node, 0, node()),
+    meck:expect(mem3_sync, find_previous_node, 0, node()),
     ?assert(have_all_purge_checkpoints(Dbs)),
 
     % There is another node and we don't have a checkpoint for it
     meck:expect(mem3_sync, find_next_node, 0, 'n2'),
+    meck:expect(mem3_sync, find_previous_node, 0, 'n3'),
     ?assert(not have_all_purge_checkpoints(Dbs)),
 
     Body1 = purge_cp_body(Src1, Tgt1, 42),
@@ -1451,11 +1466,21 @@ t_have_all_shards_db(_) ->
     DocId1 = make_purge_id(SrcUuid, TgtUuid1),
     Doc1 = #doc{id = DocId1, body = Body1},
     {ok, _} = couch_db:update_doc(Db, Doc1, [?ADMIN_CTX]),
-    couch_db:close(Db),
 
-    % After adding the checkpoint for n2, we should get true again
+    % After adding the checkpoint for n2, we should still get false because
+    % there is no checkpoint yet for the previous node n3 pulling purges from us
+    ?assert(not have_all_purge_checkpoints(Dbs)),
+
+    Body2 = purge_cp_body(Src1, Tgt2, 43),
+    TgtUuid2 = couch_uuids:random(),
+    DocId2 = make_purge_id(SrcUuid, TgtUuid2),
+    Doc2 = #doc{id = DocId2, body = Body2},
+    {ok, _} = couch_db:update_doc(Db, Doc2, [?ADMIN_CTX]),
+
+    % After adding the checkpoint for n3, we should get true
     ?assert(have_all_purge_checkpoints(Dbs)),
 
+    couch_db:close(Db),
     ok = couch_server:delete(Dbs, [?ADMIN_CTX]).
 
 t_verify_checkpoint_shards_db(_) ->
@@ -1470,6 +1495,14 @@ t_verify_checkpoint_shards_db(_) ->
     ],
     ?assert(not verify_checkpoint_shard(Shards, Props1)),
     meck:expect(mem3_sync, find_next_node, 0, 'n2'),
-    ?assert(verify_checkpoint_shard(Shards, Props1)).
+    ?assert(verify_checkpoint_shard(Shards, Props1)),
+
+    Props2 = [
+        {<<"target">>, atom_to_binary(n3, latin1)},
+        {<<"range">>, Range}
+    ],
+    ?assert(not verify_checkpoint_shard(Shards, Props2)),
+    meck:expect(mem3_sync, find_previous_node, 0, 'n3'),
+    ?assert(verify_checkpoint_shard(Shards, Props2)).
 
 -endif.
diff --git a/src/mem3/src/mem3_sync.erl b/src/mem3/src/mem3_sync.erl
index 67eb77181..363b3e0b7 100644
--- a/src/mem3/src/mem3_sync.erl
+++ b/src/mem3/src/mem3_sync.erl
@@ -32,7 +32,8 @@
     nodes_db/0,
     shards_db/0,
     users_db/0,
-    find_next_node/0
+    find_next_node/0,
+    find_previous_node/0
 ]).
 -export([
     local_dbs/0
@@ -307,10 +308,19 @@ find_next_node() ->
     Self = node(),
     LiveNodes = [Self | nodes()],
     Mem3Nodes = mem3:nodes(),
-    find_next_node(Self, LiveNodes, Mem3Nodes).
+    find_next_node(Self, LiveNodes, lists:sort(Mem3Nodes)).
 
-find_next_node(Self, LiveNodes, Mem3Nodes) ->
-    SortedMem3Nodes = lists:sort(Mem3Nodes),
+find_previous_node() ->
+    Self = node(),
+    LiveNodes = [Self | nodes()],
+    Mem3Nodes = mem3:nodes(),
+    % Previous node is the "next" node in the reverse sorted list
+    find_previous_node(Self, LiveNodes, lists:sort(Mem3Nodes)).
+
+find_previous_node(Self, LiveNodes, SortedMem3Nodes) ->
+    find_next_node(Self, LiveNodes, lists:reverse(SortedMem3Nodes)).
+
+find_next_node(Self, LiveNodes, SortedMem3Nodes) ->
     LiveMem3Nodes = [N || N <- SortedMem3Nodes, lists:member(N, LiveNodes)],
     case LiveMem3Nodes of
         [] ->
@@ -404,13 +414,34 @@ is_job_current(#job{name = Name, node = Node}, 
ConnectedNodes, Mem3Nodes) ->
 
 find_next_node_test() ->
     ?assertEqual(n, find_next_node(n, [n], [])),
+    ?assertEqual(n, find_previous_node(n, [], [])),
+
     ?assertEqual(n, find_next_node(n, [n], [n])),
+    ?assertEqual(n, find_previous_node(n, [n], [n])),
+
+    % We're in the middle
     ?assertEqual(x, find_next_node(n, [a, n, x], [a, n, x])),
+    ?assertEqual(a, find_previous_node(n, [a, n, x], [a, n, x])),
+
+    % Two nodes, we're at the end (start, for previous)
     ?assertEqual(a, find_next_node(n, [a, n], [a, n])),
+    ?assertEqual(a, find_previous_node(n, [a, n], [a, n])),
+
+    % We're on a node that's not in mem3:nodes() so next/prev is ourselves.
     ?assertEqual(n, find_next_node(n, [a, n], [a])),
-    ?assertEqual(x, find_next_node(n, [n, x], [x, n])),
+    ?assertEqual(n, find_previous_node(n, [a, n], [a])),
+
+    % Two nodes, we're at the start (end, for previous). Live nodes unsorted
+    ?assertEqual(x, find_next_node(n, [x, n], [n, x])),
+    ?assertEqual(x, find_previous_node(n, [x, n], [n, x])),
+
+    % Two nodes, we're at the start (end, for previous). Live nodes are sorted
     ?assertEqual(x, find_next_node(n, [n, x], [n, x])),
-    ?assertEqual(a, find_next_node(n, [a, n, x], [a, n, y])).
+    ?assertEqual(x, find_previous_node(n, [n, x], [n, x])),
+
+    % node x is not in mem3:nodes() and node y is not live
+    ?assertEqual(a, find_next_node(n, [a, n, x], [a, n, y])),
+    ?assertEqual(a, find_previous_node(n, [a, n, x], [a, n, y])).
 
 is_job_current_test_() ->
     {

Reply via email to