This is an automated email from the ASF dual-hosted git repository.

rnewson pushed a commit to branch auto-delete-3
in repository https://gitbox.apache.org/repos/asf/couchdb.git


The following commit(s) were added to refs/heads/auto-delete-3 by this push:
     new 35e35193a push some calculation to the worker side
35e35193a is described below

commit 35e35193a85c355e617362e4d48eca40ac69c5c0
Author: Robert Newson <[email protected]>
AuthorDate: Fri May 16 14:44:52 2025 +0100

    push some calculation to the worker side
---
 src/fabric/src/fabric_drop_seq.erl | 224 ++++++++++++++++++++-----------------
 1 file changed, 119 insertions(+), 105 deletions(-)

diff --git a/src/fabric/src/fabric_drop_seq.erl b/src/fabric/src/fabric_drop_seq.erl
index 1811fbb7d..5d0ea970c 100644
--- a/src/fabric/src/fabric_drop_seq.erl
+++ b/src/fabric/src/fabric_drop_seq.erl
@@ -3,7 +3,6 @@
 -include_lib("mem3/include/mem3.hrl").
 -include_lib("couch/include/couch_db.hrl").
 -include_lib("couch_mrview/include/couch_mrview.hrl").
--include_lib("fabric/include/fabric.hrl").
 
 -export([go/1]).
 
@@ -15,6 +14,9 @@
     peer_id_from_sig/2
 ]).
 
+%% rpc
+-export([gather_drop_seq_info_rpc/1]).
+
 -type range() :: [non_neg_integer()].
 
 -type uuid() :: binary().
@@ -35,8 +37,9 @@
 
 go(DbName) ->
     Shards0 = mem3:shards(DbName),
-    {ok, PeerCheckpoints} = get_peer_checkpoint_docs(DbName),
-    {ok, ShardSyncHistory} = get_all_shard_sync_docs(Shards0),
+    #{peer_checkpoints := PeerCheckpoints, shard_sync_history := ShardSyncHistory} = gather_drop_seq_info(
+        Shards0
+    ),
     {Shards1, DropSeqs} = go_int(
         Shards0, fun uuid_fetcher_rpc/1, PeerCheckpoints, ShardSyncHistory
     ),
@@ -198,56 +201,96 @@ handle_get_uuid_reply({rexi_EXIT, _Reason}, _Worker, _Acc) ->
 handle_get_uuid_reply(Uuid, _, _Acc) when is_binary(Uuid) ->
     {stop, {ok, Uuid}}.
 
--spec get_all_shard_sync_docs(Shards :: [#shard{}]) -> shard_sync_history().
-get_all_shard_sync_docs(Shards) ->
+%% return only the shards that have synced with every other replica
+fully_replicated_shards_only(Shards, ShardSyncHistory) ->
+    lists:filter(
+        fun(#shard{range = Range, node = Node}) ->
+            ExpectedPeers = [
+                S#shard.node
+             || S <- Shards, S#shard.range == Range, S#shard.node /= Node
+            ],
+            ExpectedKeys = [{Range, Peer, Node} || Peer <- ExpectedPeers],
+            lists:all(fun(Key) -> maps:is_key(Key, ShardSyncHistory) end, ExpectedKeys)
+        end,
+        Shards
+    ).
+
+-spec gather_drop_seq_info(Shards :: [#shard{}]) -> #{peer_checkpoints := peer_checkpoints(), shard_sync_history := shard_sync_history()} | {error, any()}.
+gather_drop_seq_info([#shard{} | _] = Shards) ->
     Workers = fabric_util:submit_jobs(
-        Shards, fabric_rpc, all_docs, [[], shard_sync_docs_mrargs()]
+        Shards, ?MODULE, gather_drop_seq_info_rpc, []
     ),
-    Acc0 = {#{}, length(Workers) - 1},
     RexiMon = fabric_util:create_monitors(Workers),
+    Acc0 = #{},
     try
-        rexi_utils:recv(
-            Workers,
-            #shard.ref,
-            fun handle_shard_sync_docs_reply/3,
-            Acc0,
-            fabric_util:request_timeout(),
-            infinity
-        )
+        case
+            rexi_utils:recv(
+                Workers,
+                #shard.ref,
+                fun gather_drop_seq_info_cb/3,
+                {Acc0, length(Workers) - 1},
+                fabric_util:request_timeout(),
+                infinity
+            )
+        of
+            {ok, Result} ->
+                Result;
+            {timeout, _State} ->
+                {error, timeout};
+            {error, Reason} ->
+                {error, Reason}
+        end
     after
         rexi_monitor:stop(RexiMon),
         fabric_streams:cleanup(Workers)
     end.
 
-%% consult every copy of every range for shard sync information but ignore failures (otherwise
-%% this only works when all nodes are up). We'll only update drop seq for a shard if we have
-%% seen all other copies have synced to it.
-handle_shard_sync_docs_reply({rexi_DOWN, _, _, _}, _Worker, {ShardSyncHistory, Count}) ->
-    {ok, {ShardSyncHistory, Count - 1}};
-handle_shard_sync_docs_reply({rexi_EXIT, _Reason}, _Worker, {ShardSyncHistory, Count}) ->
-    {ok, {ShardSyncHistory, Count - 1}};
-handle_shard_sync_docs_reply(rexi_STREAM_INIT, {_Worker, From}, Acc) ->
-    rexi:stream_start(From),
-    {ok, Acc};
-handle_shard_sync_docs_reply({meta, _Meta}, {_Worker, From}, Acc) ->
-    rexi:stream_ack(From),
-    {ok, Acc};
-handle_shard_sync_docs_reply(#view_row{} = Row, {_Worker, From}, {ShardSyncHistory, Count}) ->
-    Doc = couch_doc:from_json_obj(Row#view_row.doc),
-    Result = parse_shard_sync_doc(Doc, ShardSyncHistory),
-    rexi:stream_ack(From),
-    {ok, {Result, Count}};
-handle_shard_sync_docs_reply(complete, _Worker, {ShardSyncHistory, 0}) ->
-    {stop, ShardSyncHistory};
-handle_shard_sync_docs_reply(complete, _Worker, {ShardSyncHistory, Count}) ->
-    {ok, {ShardSyncHistory, Count - 1}}.
-
-parse_shard_sync_doc(#doc{id = <<"_local/shard-sync-", _/binary>>} = Doc, Acc) ->
+gather_drop_seq_info_rpc(DbName) ->
+    case couch_db:open_int(DbName, []) of
+        {ok, Db} ->
+            try
+                Uuid = couch_db:get_uuid(Db),
+                Acc0 = {#{}, #{}},
+                {ok, {PeerCheckpoints, ShardSyncHistory}} = couch_db:fold_local_docs(
+                    Db, fun gather_drop_seq_info_fun/2, Acc0, []
+                ),
+                rexi:reply(
+                    {ok, #{
+                        uuid => Uuid,
+                        peer_checkpoints => PeerCheckpoints,
+                        shard_sync_history => ShardSyncHistory
+                    }}
+                )
+            after
+                couch_db:close(Db)
+            end;
+        Else ->
+            rexi:reply(Else)
+    end.
+
+gather_drop_seq_info_fun(
+    #doc{id = <<?LOCAL_DOC_PREFIX, "peer-checkpoint-", _/binary>>} = Doc,
+    {PeerCheckpoints0, ShardSyncHistory} = Acc
+) ->
+    {Props} = Doc#doc.body,
+    case couch_util:get_value(<<"update_seq">>, Props) of
+        undefined ->
+            {ok, Acc};
+        UpdateSeq ->
+            PeerCheckpoints1 = maps:merge_with(
+                fun merge_peers/3, decode_seq(UpdateSeq), PeerCheckpoints0
+            ),
+            {ok, {PeerCheckpoints1, ShardSyncHistory}}
+    end;
+gather_drop_seq_info_fun(
+    #doc{id = <<?LOCAL_DOC_PREFIX, "shard-sync-", _/binary>>} = Doc,
+    {PeerCheckpoints, ShardSyncHistory0} = Acc
+) ->
     {Props} = Doc#doc.body,
     case couch_util:get_value(<<"dbname">>, Props) of
         undefined ->
             %% not yet upgraded with new property
-            Acc;
+            {ok, Acc};
         DbName ->
             Range = mem3:range(DbName),
            {[{_SrcNode, History}]} = couch_util:get_value(<<"history">>, Props),
@@ -263,52 +306,45 @@ parse_shard_sync_doc(#doc{id = <<"_local/shard-sync-", _/binary>>} = Doc, Acc) -
                     couch_util:get_value(<<"target_seq">>, Item)
                 }
             end,
-            maps:merge(
-                maps:groups_from_list(KeyFun, ValueFun, History), Acc
-            )
-    end.
-
-%% return only the shards that have synced with every other replica
-fully_replicated_shards_only(Shards, ShardSyncHistory) ->
-    lists:filter(
-        fun(#shard{range = Range, node = Node}) ->
-            ExpectedPeers = [
-                S#shard.node
-             || S <- Shards, S#shard.range == Range, S#shard.node /= Node
-            ],
-            ExpectedKeys = [{Range, Peer, Node} || Peer <- ExpectedPeers],
-            lists:all(fun(Key) -> maps:is_key(Key, ShardSyncHistory) end, ExpectedKeys)
-        end,
-        Shards
-    ).
-
--spec get_peer_checkpoint_docs(DbName :: binary()) -> peer_checkpoints().
-get_peer_checkpoint_docs(DbName) ->
-    fabric:all_docs(
-        DbName, fun parse_peer_checkpoint_docs_cb/2, #{}, peer_checkpoint_docs_mrargs()
-    ).
-
-parse_peer_checkpoint_docs_cb({row, Row}, PeerCheckpoints0) ->
-    case lists:keyfind(doc, 1, Row) of
-        false ->
-            {ok, PeerCheckpoints0};
-        {doc, Doc0} ->
-            #doc{id = <<"_local/peer-checkpoint-", _/binary>>} =
-                Doc1 = couch_doc:from_json_obj(Doc0),
-            {Props} = Doc1#doc.body,
-            case couch_util:get_value(<<"update_seq">>, Props) of
-                undefined ->
-                    {ok, PeerCheckpoints0};
-                UpdateSeq ->
-                    {ok,
-                        maps:merge_with(
-                            fun merge_peers/3, decode_seq(UpdateSeq), PeerCheckpoints0
-                        )}
-            end
+            ShardSyncHistory1 = maps:merge(
+                maps:groups_from_list(KeyFun, ValueFun, History), 
ShardSyncHistory0
+            ),
+            {ok, {PeerCheckpoints, ShardSyncHistory1}}
     end;
-parse_peer_checkpoint_docs_cb(_Else, Acc) ->
+gather_drop_seq_info_fun(#doc{}, Acc) ->
+    %% ignored
     {ok, Acc}.
 
+gather_drop_seq_info_cb({rexi_DOWN, _, _, _}, _Worker, {Acc, Count}) ->
+    {ok, {Acc, Count - 1}};
+gather_drop_seq_info_cb({rexi_EXIT, _Reason}, _Worker, {Acc, Count}) ->
+    {ok, {Acc, Count - 1}};
+gather_drop_seq_info_cb({ok, Info}, Worker, {Acc, Count}) ->
+    MergedInfo = merge_info(Worker, Info, Acc),
+    if
+        Count == 0 ->
+            {stop, MergedInfo};
+        true ->
+            {ok, {MergedInfo, Count - 1}}
+    end;
+gather_drop_seq_info_cb(_Error, _Worker, {Acc, Count}) ->
+    {ok, {Acc, Count - 1}}.
+
+merge_info(#shard{} = _Shard, InfoA, InfoB) ->
+    PeerCheckpointsA = maps:get(peer_checkpoints, InfoA, #{}),
+    PeerCheckpointsB = maps:get(peer_checkpoints, InfoB, #{}),
+    MergedPeerCheckpoints = maps:merge_with(
+        fun merge_peers/3, PeerCheckpointsA, PeerCheckpointsB
+    ),
+    ShardSyncHistoryA = maps:get(shard_sync_history, InfoA, #{}),
+    ShardSyncHistoryB = maps:get(shard_sync_history, InfoB, #{}),
+    MergedShardSyncHistory = maps:merge(
+        ShardSyncHistoryA, ShardSyncHistoryB
+    ),
+    #{
+        peer_checkpoints => MergedPeerCheckpoints, shard_sync_history => MergedShardSyncHistory
+    }.
+
 merge_peers(_Key, {Uuid1, Val1}, {Uuid2, Val2}) when
     is_binary(Uuid1), is_binary(Uuid2), is_integer(Val1), is_integer(Val2)
 ->
@@ -341,28 +377,6 @@ decode_seq(OpaqueSeq) ->
         Decoded
     ).
 
-all_docs_mrargs() ->
-    #mrargs{
-        view_type = map,
-        include_docs = true,
-        extra = [
-            {include_system, true},
-            {namespace, <<"_local">>}
-        ]
-    }.
-
-peer_checkpoint_docs_mrargs() ->
-    (all_docs_mrargs())#mrargs{
-        start_key = <<?LOCAL_DOC_PREFIX, "peer-checkpoint-">>,
-        end_key = <<?LOCAL_DOC_PREFIX, "peer-checkpoint.">>
-    }.
-
-shard_sync_docs_mrargs() ->
-    (all_docs_mrargs())#mrargs{
-        start_key = <<?LOCAL_DOC_PREFIX, "shard-sync-">>,
-        end_key = <<?LOCAL_DOC_PREFIX, "shard-sync.">>
-    }.
-
 latest_shard_sync_checkpoints(ShardSyncHistory) ->
     maps:fold(
         fun({R, SN, _TN}, History, Acc) ->

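The shape of the change: each shard worker now folds its own _local docs
(gather_drop_seq_info_rpc/1) and replies with two maps, which the
coordinator reduces pairwise in merge_info/3. Below is a minimal,
self-contained sketch of that reduce step; it drops the #shard{} argument,
uses made-up sample data, and assumes a "same UUID, keep the higher
sequence" rule for merge_peers/3, whose real body lies outside the hunks
above:

    %% Sketch only: mirrors the merge_info/3 pattern, independent of
    %% rexi/fabric. merge_peers/3 here is an assumed rule, not the
    %% actual implementation.
    -module(drop_seq_merge_sketch).
    -export([demo/0]).

    merge_info(InfoA, InfoB) ->
        PeerA = maps:get(peer_checkpoints, InfoA, #{}),
        PeerB = maps:get(peer_checkpoints, InfoB, #{}),
        HistA = maps:get(shard_sync_history, InfoA, #{}),
        HistB = maps:get(shard_sync_history, InfoB, #{}),
        #{
            %% keys present in both maps are resolved by merge_peers/3
            peer_checkpoints => maps:merge_with(fun merge_peers/3, PeerA, PeerB),
            %% sync history keys are {Range, SourceNode, TargetNode}, so a
            %% plain merge suffices
            shard_sync_history => maps:merge(HistA, HistB)
        }.

    %% the repeated Uuid in the pattern asserts both checkpoints refer to
    %% the same replica generation (hypothetical rule)
    merge_peers(_Key, {Uuid, Seq1}, {Uuid, Seq2}) ->
        {Uuid, max(Seq1, Seq2)}.

    demo() ->
        A = #{
            peer_checkpoints => #{n1 => {<<"abc">>, 7}},
            shard_sync_history => #{{[0, 10], n1, n2} => 42}
        },
        B = #{
            peer_checkpoints => #{n1 => {<<"abc">>, 9}},
            shard_sync_history => #{{[0, 10], n2, n1} => 40}
        },
        %% n1 resolves to {<<"abc">>, 9}; both history entries survive
        merge_info(A, B).

Note that maps:merge_with/3 requires OTP 24 or newer, and
maps:groups_from_list/3 (used on the worker side) requires OTP 25.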