This is an automated email from the ASF dual-hosted git repository.

rnewson pushed a commit to branch auto-delete-3
in repository https://gitbox.apache.org/repos/asf/couchdb.git


The following commit(s) were added to refs/heads/auto-delete-3 by this push:
     new 7f8d1b567 uuidmap instead
7f8d1b567 is described below

commit 7f8d1b567f61a094e90571b2253f94eb5c01e670
Author: Robert Newson <[email protected]>
AuthorDate: Mon May 19 12:21:57 2025 +0100

    uuidmap instead
---
 src/fabric/src/fabric_drop_seq.erl | 117 +++++++++++++------------------------
 1 file changed, 39 insertions(+), 78 deletions(-)

diff --git a/src/fabric/src/fabric_drop_seq.erl 
b/src/fabric/src/fabric_drop_seq.erl
index 5d0ea970c..928a1bdb3 100644
--- a/src/fabric/src/fabric_drop_seq.erl
+++ b/src/fabric/src/fabric_drop_seq.erl
@@ -23,7 +23,7 @@
 
 -type seq() :: non_neg_integer().
 
--type uuid_fetcher() :: fun((#shard{}) -> {ok, uuid()} | {error, term()}).
+-type uuid_map() :: #{{Range :: range(), Node :: node()} => uuid()}.
 
 -type peer_checkpoints() :: #{{range(), Node :: node()} => {Uuid :: uuid(), 
Seq :: seq()}}.
 
@@ -37,11 +37,15 @@
 
 go(DbName) ->
     Shards0 = mem3:shards(DbName),
-    #{peer_checkpoints := PeerCheckpoints, shard_sync_history := 
ShardSyncHistory} = gather_drop_seq_info(
+    #{
+        uuid_map := UuidMap,
+        peer_checkpoints := PeerCheckpoints,
+        shard_sync_history := ShardSyncHistory
+    } = gather_drop_seq_info(
         Shards0
     ),
     {Shards1, DropSeqs} = go_int(
-        Shards0, fun uuid_fetcher_rpc/1, PeerCheckpoints, ShardSyncHistory
+        Shards0, UuidMap, PeerCheckpoints, ShardSyncHistory
     ),
     Workers = lists:filtermap(
         fun(Shard) ->
@@ -165,42 +169,6 @@ crossref(PeerCheckpoints0, ShardSyncHistory) ->
             crossref(PeerCheckpoints1, ShardSyncHistory)
     end.
 
--spec uuid_fetcher_rpc(#shard{}) -> uuid().
-uuid_fetcher_rpc(#shard{} = Shard) ->
-    Workers = fabric_util:submit_jobs(
-        [Shard], fabric_rpc, get_uuid, []
-    ),
-    RexiMon = fabric_util:create_monitors(Workers),
-    try
-        case
-            rexi_utils:recv(
-                Workers,
-                #shard.ref,
-                fun handle_get_uuid_reply/3,
-                nil,
-                fabric_util:request_timeout(),
-                infinity
-            )
-        of
-            {ok, Result} ->
-                Result;
-            {timeout, _State} ->
-                {error, timeout};
-            {error, Reason} ->
-                {error, Reason}
-        end
-    after
-        rexi_monitor:stop(RexiMon),
-        fabric_streams:cleanup(Workers)
-    end.
-
-handle_get_uuid_reply({rexi_DOWN, _, _, _}, _Worker, _Acc) ->
-    {stop, {error, rexi_DOWN}};
-handle_get_uuid_reply({rexi_EXIT, _Reason}, _Worker, _Acc) ->
-    {stop, {error, rexi_EXIT}};
-handle_get_uuid_reply(Uuid, _, _Acc) when is_binary(Uuid) ->
-    {stop, {ok, Uuid}}.
-
 %% return only the shards that have synced with every other replica
 fully_replicated_shards_only(Shards, ShardSyncHistory) ->
     lists:filter(
@@ -221,7 +189,7 @@ gather_drop_seq_info([#shard{} | _] = Shards) ->
         Shards, ?MODULE, gather_drop_seq_info_rpc, []
     ),
     RexiMon = fabric_util:create_monitors(Workers),
-    Acc0 = #{},
+    Acc0 = #{uuid_map => #{}, peer_checkpoints => #{}, shard_sync_history => 
#{}},
     try
         case
             rexi_utils:recv(
@@ -330,19 +298,20 @@ gather_drop_seq_info_cb({ok, Info}, Worker, {Acc, Count}) 
->
 gather_drop_seq_info_cb(_Error, _Worker, {Acc, Count}) ->
     {ok, {Acc, Count - 1}}.
 
-merge_info(#shard{} = _Shard, InfoA, InfoB) ->
-    PeerCheckpointsA = maps:get(peer_checkpoints, InfoA, #{}),
-    PeerCheckpointsB = maps:get(peer_checkpoints, InfoB, #{}),
-    MergedPeerCheckpoints = maps:merge_with(
-        fun merge_peers/3, PeerCheckpointsA, PeerCheckpointsB
-    ),
-    ShardSyncHistoryA = maps:get(shard_sync_history, InfoA, #{}),
-    ShardSyncHistoryB = maps:get(shard_sync_history, InfoB, #{}),
-    MergedShardSyncHistory = maps:merge(
-        ShardSyncHistoryA, ShardSyncHistoryB
-    ),
+merge_info(#shard{} = Shard, Info, Acc) ->
     #{
-        peer_checkpoints => MergedPeerCheckpoints, shard_sync_history => 
MergedShardSyncHistory
+        uuid_map =>
+            maps:put(
+                {Shard#shard.range, Shard#shard.node}, maps:get(uuid, Info), 
maps:get(uuid_map, Acc)
+            ),
+        peer_checkpoints => maps:merge_with(
+            fun merge_peers/3,
+            maps:get(peer_checkpoints, Info),
+            maps:get(peer_checkpoints, Acc)
+        ),
+        shard_sync_history => maps:merge(
+            maps:get(shard_sync_history, Info), maps:get(shard_sync_history, 
Acc)
+        )
     }.
 
 merge_peers(_Key, {Uuid1, Val1}, {Uuid2, Val2}) when
@@ -388,8 +357,8 @@ latest_shard_sync_checkpoints(ShardSyncHistory) ->
     ).
 
 %% A shard may have been split since a peer saw it.
--spec substitute_splits([#shard{}], uuid_fetcher(), peer_checkpoints()) -> 
peer_checkpoints().
-substitute_splits(Shards, UuidFetcher, PeerCheckpoints) ->
+-spec substitute_splits([#shard{}], uuid_map(), peer_checkpoints()) -> 
peer_checkpoints().
+substitute_splits(Shards, UuidMap, PeerCheckpoints) ->
     maps:fold(
         fun({[B1, E1], Node}, {Uuid, Seq}, Acc) ->
             ShardsInRange = [
@@ -407,10 +376,10 @@ substitute_splits(Shards, UuidFetcher, PeerCheckpoints) ->
                             [B1, E1] == Shard#shard.range ->
                                 {true, {Key, {Uuid, Seq}}};
                             true ->
-                                case UuidFetcher(Shard) of
+                                case maps:find(Key, UuidMap) of
                                     {ok, SubstUuid} ->
                                         {true, {Key, {SubstUuid, Seq}}};
-                                    {error, _Reason} ->
+                                    error ->
                                         false
                                 end
                         end
@@ -764,14 +733,15 @@ substitute_splits_test() ->
     Subrange2 = [6, 10],
     Node1 = '[email protected]',
     Shards = [#shard{range = Subrange1, node = Node1}, #shard{range = 
Subrange2, node = Node1}],
-    UuidFetcher = uuid_fetcher_from_map(#{
-        {Subrange1, Node1} => <<"uuid2">>, {Subrange2, Node1} => <<"uuid3">>
-    }),
+    UuidMap = #{
+        {Subrange1, Node1} => <<"uuid2">>,
+        {Subrange2, Node1} => <<"uuid3">>
+    },
     PeerCheckpoints = #{{Range, Node1} => {<<"uuid1">>, 12}},
 
     ?assertEqual(
         #{{Subrange1, Node1} => {<<"uuid2">>, 12}, {Subrange2, Node1} => 
{<<"uuid3">>, 12}},
-        substitute_splits(Shards, UuidFetcher, PeerCheckpoints)
+        substitute_splits(Shards, UuidMap, PeerCheckpoints)
     ).
 
 crossref_test_() ->
@@ -833,15 +803,16 @@ go_int_test_() ->
     Subrange2 = [6, 10],
     Node1 = '[email protected]',
     Shards = [#shard{range = Subrange1, node = Node1}, #shard{range = 
Subrange2, node = Node1}],
-    UuidFetcher = uuid_fetcher_from_map(#{
-        {Subrange1, Node1} => <<"uuid2">>, {Subrange2, Node1} => <<"uuid3">>
-    }),
+    UuidMap = #{
+        {Subrange1, Node1} => <<"uuid2">>,
+        {Subrange2, Node1} => <<"uuid3">>
+    },
     [
         ?_assertEqual(
             {Shards, #{
                 {Subrange1, Node1} => {<<"uuid2">>, 12}, {Subrange2, Node1} => 
{<<"uuid3">>, 12}
             }},
-            go_int(Shards, UuidFetcher, #{{Range, Node1} => {<<"uuid1">>, 
12}}, #{})
+            go_int(Shards, UuidMap, #{{Range, Node1} => {<<"uuid1">>, 12}}, 
#{})
         ),
         ?_assertEqual(
             {Shards, #{
@@ -849,7 +820,7 @@ go_int_test_() ->
             }},
             go_int(
                 Shards,
-                UuidFetcher,
+                UuidMap,
                 #{{Range, Node1} => {<<"uuid1">>, 12}, {Subrange1, Node1} => 
{<<"uuid2">>, 10}},
                 #{}
             )
@@ -868,12 +839,12 @@ go_int2_test_() ->
         #shard{range = Subrange1, node = Node2},
         #shard{range = Subrange2, node = Node2}
     ],
-    UuidFetcher = uuid_fetcher_from_map(#{
+    UuidMap = #{
         {Subrange1, Node1} => <<"s1n1">>,
         {Subrange2, Node1} => <<"s2n1">>,
         {Subrange1, Node2} => <<"s1n2">>,
         {Subrange2, Node2} => <<"s2n2">>
-    }),
+    },
     ShardSyncHistory =
         #{
             {Subrange1, Node1, Node2} => [
@@ -913,7 +884,7 @@ go_int2_test_() ->
                 2,
                 go_int(
                     Shards,
-                    UuidFetcher,
+                    UuidMap,
                     #{{Range, Node1} => {<<"ignored">>, 12}},
                     ShardSyncHistory
                 )
@@ -921,14 +892,4 @@ go_int2_test_() ->
         )
     ].
 
-uuid_fetcher_from_map(UuidMap) ->
-    fun(Shard) ->
-        case maps:find({Shard#shard.range, Shard#shard.node}, UuidMap) of
-            {ok, Value} ->
-                {ok, Value};
-            error ->
-                {error, not_found}
-        end
-    end.
-
 -endif.

Reply via email to