This is an automated email from the ASF dual-hosted git repository. rnewson pushed a commit to branch auto-delete-3 in repository https://gitbox.apache.org/repos/asf/couchdb.git
commit 5943d2cfef6c07035ee03229796180c3ef1bfb05 Author: Robert Newson <[email protected]> AuthorDate: Mon May 12 14:32:43 2025 +0100 fetch uuids and use them for splits --- src/couch/src/couch_bt_engine.erl | 14 ++----- src/fabric/src/fabric_drop_seq.erl | 80 +++++++++++++++++++++++++++----------- 2 files changed, 62 insertions(+), 32 deletions(-) diff --git a/src/couch/src/couch_bt_engine.erl b/src/couch/src/couch_bt_engine.erl index df3d01d30..b831337a1 100644 --- a/src/couch/src/couch_bt_engine.erl +++ b/src/couch/src/couch_bt_engine.erl @@ -814,21 +814,15 @@ set_update_seq(#st{header = Header} = St, UpdateSeq) -> needs_commit = true }}. -set_drop_seq(#st{} = St, undefined, NewDropSeq) -> - set_drop_seq(St, NewDropSeq); -set_drop_seq(#st{} = St, ExpectedUuidPrefix, NewDropSeq) when is_binary(ExpectedUuidPrefix) -> +set_drop_seq(#st{header = Header} = St, ExpectedUuidPrefix, NewDropSeq) when + is_binary(ExpectedUuidPrefix), is_integer(NewDropSeq), NewDropSeq > 0 +-> + CurrentDropSeq = get_drop_seq(St), Uuid = get_uuid(St), ActualUuidPrefix = binary:part(Uuid, 0, byte_size(ExpectedUuidPrefix)), if ExpectedUuidPrefix /= ActualUuidPrefix -> {error, uuid_mismatch}; - true -> - set_drop_seq(St, NewDropSeq) - end. - -set_drop_seq(#st{header = Header} = St, NewDropSeq) when is_integer(NewDropSeq), NewDropSeq > 0 -> - CurrentDropSeq = get_drop_seq(St), - if NewDropSeq < CurrentDropSeq -> {error, {drop_seq_cant_decrease, CurrentDropSeq, NewDropSeq}}; true -> diff --git a/src/fabric/src/fabric_drop_seq.erl b/src/fabric/src/fabric_drop_seq.erl index e6bb24c43..44c5abc7d 100644 --- a/src/fabric/src/fabric_drop_seq.erl +++ b/src/fabric/src/fabric_drop_seq.erl @@ -7,6 +7,8 @@ -export([go/1]). +-compile(export_all). + -export([ create_peer_checkpoint_doc_if_missing/5, update_peer_checkpoint_doc/5, @@ -21,6 +23,8 @@ -type seq() :: non_neg_integer(). +-type uuid_map() :: #{{range(), Node :: node()} => Uuid :: uuid()}. + -type peer_checkpoints() :: #{{range(), Node :: node()} => {Uuid :: uuid(), Seq :: seq()}}. -type history_item() :: { @@ -33,9 +37,10 @@ go(DbName) -> Shards0 = mem3:shards(DbName), + {ok, UuidMap} = get_uuids(Shards0), {ok, PeerCheckpoints} = get_peer_checkpoint_docs(DbName), {ok, ShardSyncHistory} = get_all_shard_sync_docs(Shards0), - {Shards1, DropSeqs} = go_int(Shards0, PeerCheckpoints, ShardSyncHistory), + {Shards1, DropSeqs} = go_int(Shards0, UuidMap, PeerCheckpoints, ShardSyncHistory), Workers = lists:filtermap( fun(Shard) -> #shard{range = Range, node = Node, name = ShardName} = Shard, @@ -81,13 +86,15 @@ go(DbName) -> end end. -go_int(Shards, PeerCheckpoints, ShardSyncHistory) -> +go_int(Shards0, UuidMap, PeerCheckpoints, ShardSyncHistory) -> + Shards1 = fully_replicated_shards_only(Shards0, ShardSyncHistory), { - fully_replicated_shards_only(Shards, ShardSyncHistory), - calculate_drop_seqs(substitute_splits(Shards, PeerCheckpoints), ShardSyncHistory) + Shards1, + calculate_drop_seqs(substitute_splits(Shards1, UuidMap, PeerCheckpoints), ShardSyncHistory) }. --spec calculate_drop_seqs(peer_checkpoints(), shard_sync_history()) -> peer_checkpoints(). +-spec calculate_drop_seqs(peer_checkpoints(), shard_sync_history()) -> + peer_checkpoints(). calculate_drop_seqs(PeerCheckpoints0, ShardSyncHistory) -> ShardSyncCheckpoints = latest_shard_sync_checkpoints(ShardSyncHistory), PeerCheckpoints1 = maps:merge_with(fun merge_peers/3, PeerCheckpoints0, ShardSyncCheckpoints), @@ -153,6 +160,40 @@ crossref(PeerCheckpoints0, ShardSyncHistory) -> crossref(PeerCheckpoints1, ShardSyncHistory) end. +-spec get_uuids(Shards :: [#shard{}]) -> uuid_map(). +get_uuids(Shards) -> + Workers = fabric_util:submit_jobs( + Shards, fabric_rpc, get_uuid, [] + ), + Acc0 = {#{}, length(Workers) - 1}, + RexiMon = fabric_util:create_monitors(Workers), + try + rexi_utils:recv( + Workers, + #shard.ref, + fun handle_get_uuid_reply/3, + Acc0, + fabric_util:request_timeout(), + infinity + ) + after + rexi_monitor:stop(RexiMon), + fabric_streams:cleanup(Workers) + end. + +handle_get_uuid_reply({rexi_DOWN, _, _, _}, _Worker, {Acc, Count}) -> + {ok, {Acc, Count - 1}}; +handle_get_uuid_reply({rexi_EXIT, _Reason}, _Worker, {Acc, Count}) -> + {ok, {Acc, Count - 1}}; +handle_get_uuid_reply(Uuid, Worker, {Acc0, Count}) when is_binary(Uuid) -> + Acc1 = Acc0#{{Worker#shard.range, Worker#shard.node} => Uuid}, + if + Count == 0 -> + {stop, Acc1}; + true -> + {ok, {Acc1, Count - 1}} + end. + -spec get_all_shard_sync_docs(Shards :: [#shard{}]) -> shard_sync_history(). get_all_shard_sync_docs(Shards) -> Workers = fabric_util:submit_jobs( @@ -265,16 +306,8 @@ parse_peer_checkpoint_docs_cb(_Else, Acc) -> {ok, Acc}. merge_peers(_Key, {Uuid1, Val1}, {Uuid2, Val2}) when - Uuid1 == undefined orelse Uuid2 == undefined, is_integer(Val1), is_integer(Val2) + is_binary(Uuid1), is_binary(Uuid2), is_integer(Val1), is_integer(Val2) -> - { - if - is_binary(Uuid1) -> Uuid1; - true -> Uuid2 - end, - min(Val1, Val2) - }; -merge_peers(_Key, {Uuid1, Val1}, {Uuid2, Val2}) when is_integer(Val1), is_integer(Val2) -> PrefixLen = min(byte_size(Uuid1), byte_size(Uuid2)), true = binary:longest_common_prefix([Uuid1, Uuid2]) == PrefixLen, {Uuid1, min(Val1, Val2)}. @@ -335,7 +368,7 @@ latest_shard_sync_checkpoints(ShardSyncHistory) -> %% A peer checkpoint might refer to a range that has been split since %% it last updated. Find these cases and split the peer checkpoints too. -substitute_splits(Shards, PeerCheckpoints) -> +substitute_splits(Shards, UuidMap, PeerCheckpoints) -> maps:fold( fun({[B1, E1], Node}, {Uuid, Seq}, Acc) -> MatchingRanges = [ @@ -344,12 +377,12 @@ substitute_splits(Shards, PeerCheckpoints) -> Node == S#shard.node, B2 >= B1 andalso E2 =< E1 ], - %% set uuid to undefined if ranges don't match as we don't know the uuids of the split shards + %% lookup uuid from map if substituted AsMap = maps:from_list([ {{R, Node}, { if [B1, E1] == R -> Uuid; - true -> undefined + true -> maps:get({R, Node}, UuidMap) end, Seq }} @@ -701,11 +734,12 @@ substitute_splits_test() -> Subrange2 = [6, 10], Node1 = '[email protected]', Shards = [#shard{range = Subrange1, node = Node1}, #shard{range = Subrange2, node = Node1}], + UuidMap = #{{Subrange1, Node1} => <<"uuid2">>, {Subrange2, Node1} => <<"uuid3">>}, PeerCheckpoints = #{{Range, Node1} => {<<"uuid1">>, 12}}, ?assertEqual( - #{{Subrange1, Node1} => {undefined, 12}, {Subrange2, Node1} => {undefined, 12}}, - substitute_splits(Shards, PeerCheckpoints) + #{{Subrange1, Node1} => {<<"uuid2">>, 12}, {Subrange2, Node1} => {<<"uuid3">>, 12}}, + substitute_splits(Shards, UuidMap, PeerCheckpoints) ). go_int_test_() -> @@ -714,19 +748,21 @@ go_int_test_() -> Subrange2 = [6, 10], Node1 = '[email protected]', Shards = [#shard{range = Subrange1, node = Node1}, #shard{range = Subrange2, node = Node1}], + UuidMap = #{{Subrange1, Node1} => <<"uuid2">>, {Subrange2, Node1} => <<"uuid3">>}, [ ?_assertEqual( {Shards, #{ - {Subrange1, Node1} => {undefined, 12}, {Subrange2, Node1} => {undefined, 12} + {Subrange1, Node1} => {<<"uuid2">>, 12}, {Subrange2, Node1} => {<<"uuid3">>, 12} }}, - go_int(Shards, #{{Range, Node1} => {<<"uuid1">>, 12}}, #{}) + go_int(Shards, UuidMap, #{{Range, Node1} => {<<"uuid1">>, 12}}, #{}) ), ?_assertEqual( {Shards, #{ - {Subrange1, Node1} => {<<"uuid2">>, 10}, {Subrange2, Node1} => {undefined, 12} + {Subrange1, Node1} => {<<"uuid2">>, 10}, {Subrange2, Node1} => {<<"uuid3">>, 12} }}, go_int( Shards, + UuidMap, #{{Range, Node1} => {<<"uuid1">>, 12}, {Subrange1, Node1} => {<<"uuid2">>, 10}}, #{} )
