This is an automated email from the ASF dual-hosted git repository.

jan pushed a commit to branch auto-delete-3-plus-shard-move
in repository https://gitbox.apache.org/repos/asf/couchdb.git
commit da49f860f015579561bde7cb53589d2358f7c97f
Author: Robert Newson <rnew...@apache.org>
AuthorDate: Tue Jun 17 22:34:57 2025 +0100

    Bug fixes
---
 src/fabric/src/fabric_drop_seq.erl        | 192 ++++++++++++++++++++----------
 test/elixir/test/drop_seq_statem_test.exs |   2 +-
 2 files changed, 130 insertions(+), 64 deletions(-)

diff --git a/src/fabric/src/fabric_drop_seq.erl b/src/fabric/src/fabric_drop_seq.erl
index ce2b942d9..d2cb21edf 100644
--- a/src/fabric/src/fabric_drop_seq.erl
+++ b/src/fabric/src/fabric_drop_seq.erl
@@ -45,7 +45,8 @@ go(DbName) ->
             peer_checkpoints := PeerCheckpoints,
             shard_sync_history := ShardSyncHistory
         }} ->
-            {Shards1, DropSeqs} = go_int(
+            Shards1 = fully_replicated_shards_only(Shards0, ShardSyncHistory),
+            DropSeqs = calculate_drop_seqs(
                 Shards0, UuidMap, PeerCheckpoints, ShardSyncHistory
             ),
             Workers = lists:filtermap(
@@ -102,21 +103,13 @@ go(DbName) ->
         end
     end.
 
-go_int(
-    Shards0, UuidMap, PeerCheckpoints0, ShardSyncHistory
-) ->
-    Shards1 = fully_replicated_shards_only(Shards0, ShardSyncHistory),
-    PeerCheckpoints1 = crossref(PeerCheckpoints0, ShardSyncHistory),
-    PeerCheckpoints2 = substitute_splits(Shards1, UuidMap, PeerCheckpoints1),
-    DropSeqs = calculate_drop_seqs(PeerCheckpoints2, ShardSyncHistory),
-    {Shards1, DropSeqs}.
-
--spec calculate_drop_seqs(peer_checkpoints(), shard_sync_history()) ->
+-spec calculate_drop_seqs([#shard{}], uuid_map(), peer_checkpoints(), shard_sync_history()) ->
     peer_checkpoints().
-calculate_drop_seqs(PeerCheckpoints0, ShardSyncHistory) ->
+calculate_drop_seqs(Shards, UuidMap, PeerCheckpoints0, ShardSyncHistory) ->
+    PeerCheckpoints1 = substitute_splits(Shards, UuidMap, PeerCheckpoints0),
+    PeerCheckpoints2 = crossref(PeerCheckpoints1, ShardSyncHistory),
     ShardSyncCheckpoints = latest_shard_sync_checkpoints(ShardSyncHistory),
-    PeerCheckpoints1 = maps:merge_with(fun merge_peers/3, PeerCheckpoints0, ShardSyncCheckpoints),
-    crossref(PeerCheckpoints1, ShardSyncHistory).
+    maps:merge_with(fun merge_peers/3, PeerCheckpoints2, ShardSyncCheckpoints).
 
 handle_set_drop_seq_reply(ok, Worker, {Results0, Waiting}) ->
     DropSeq = proplists:get_value(drop_seq, Worker#shard.opts),
@@ -144,7 +137,7 @@ crossref(PeerCheckpoints0, ShardSyncHistory) ->
     PeerCheckpoints1 = maps:fold(
         fun({Range, Node}, {Uuid, Seq}, Acc1) ->
             Others = maps:filter(
-                fun({R, _S, T}, _History) -> R == Range andalso T /= Node end, ShardSyncHistory
+                fun({R, S, _T}, _History) -> R == Range andalso S == Node end, ShardSyncHistory
             ),
             if
                 Seq == 0 ->
@@ -193,7 +186,7 @@ crossref(PeerCheckpoints0, ShardSyncHistory) ->
             crossref(PeerCheckpoints1, ShardSyncHistory)
     end.
 
-%% return only the shards that have synced with every other replica
+%% return only the shards that have been synced to by every other replica
 fully_replicated_shards_only(Shards, ShardSyncHistory) ->
     lists:filter(
         fun(#shard{range = Range, node = Node}) ->
@@ -376,9 +369,9 @@ decode_seq(OpaqueSeq) ->
 
 latest_shard_sync_checkpoints(ShardSyncHistory) ->
     maps:fold(
-        fun({R, _SN, TN}, History, Acc) ->
-            {_SU, _SS, TU, TS} = hd(History),
-            maps:merge_with(fun merge_peers/3, #{{R, TN} => {TU, TS}}, Acc)
+        fun({R, SN, _TN}, History, Acc) ->
+            {SU, SS, _TU, _TS} = hd(History),
+            maps:merge_with(fun merge_peers/3, #{{R, SN} => {SU, SS}}, Acc)
         end,
         #{},
         ShardSyncHistory
@@ -617,14 +610,44 @@ cleanup_peer_checkpoints_cb(_Else, Acc) ->
 empty_sync_history_means_no_change_test() ->
     Range = [0, 10],
     Node1 = 'node1@127.0.0.1',
+    Shards = [#shard{range = Range, node = Node1}],
+    UuidMap = #{},
     PeerCheckpoints = #{{Range, Node1} => {<<"uuid1">>, 12}},
     ShardSyncHistory = #{},
-    ?assertEqual(PeerCheckpoints, calculate_drop_seqs(PeerCheckpoints, ShardSyncHistory)).
+    ?assertEqual(
+        PeerCheckpoints,
+        calculate_drop_seqs(
+            Shards,
+            UuidMap,
+            PeerCheckpoints,
+            ShardSyncHistory
+        )
+    ).
+
+no_peer_checkpoints_mean_latest_shard_checkpoint_wins_test() ->
+    Range = [0, 10],
+    Node1 = 'node1@127.0.0.1',
+    Node2 = 'node2@127.0.0.1',
+    Shards = [#shard{range = Range, node = Node1}, #shard{range = Range, node = Node2}],
+    UuidMap = #{},
+    PeerCheckpoints = #{},
+    ShardSyncHistory = #{{Range, Node1, Node2} => [{<<"uuid1">>, 12, <<"uuid2">>, 5}]},
+    ?assertEqual(
+        #{{Range, Node1} => {<<"uuid1">>, 12}},
+        calculate_drop_seqs(
+            Shards,
+            UuidMap,
+            PeerCheckpoints,
+            ShardSyncHistory
+        )
+    ).
 
 matching_sync_history_expands_result_test() ->
     Range = [0, 10],
     Node1 = 'node1@127.0.0.1',
     Node2 = 'node2@127.0.0.1',
+    Shards = [#shard{range = Range, node = Node1}, #shard{range = Range, node = Node2}],
+    UuidMap = #{},
     PeerCheckpoints = #{{Range, Node1} => {<<"uuid1">>, 12}},
     ShardSyncHistory = #{{Range, Node1, Node2} => [{<<"uuid1">>, 12, <<"uuid2">>, 5}]},
     ?assertEqual(
@@ -632,7 +655,12 @@ matching_sync_history_expands_result_test() ->
             {Range, Node1} => {<<"uuid1">>, 12},
             {Range, Node2} => {<<"uuid2">>, 5}
         },
-        calculate_drop_seqs(PeerCheckpoints, ShardSyncHistory)
+        calculate_drop_seqs(
+            Shards,
+            UuidMap,
+            PeerCheckpoints,
+            ShardSyncHistory
+        )
     ).
 
 transitive_sync_history_expands_result_test() ->
@@ -640,24 +668,37 @@
     Node1 = 'node1@127.0.0.1',
     Node2 = 'node2@127.0.0.1',
     Node3 = 'node3@127.0.0.1',
+    Shards = [
+        #shard{range = Range, node = Node1},
+        #shard{range = Range, node = Node2},
+        #shard{range = Range, node = Node3}
+    ],
+    UuidMap = #{},
     PeerCheckpoints = #{{Range, Node1} => {<<"uuid1">>, 12}},
     ShardSyncHistory = #{
-        {Range, Node1, Node2} => [{<<"uuid1">>, 12, <<"uuid2">>, 5}],
-        {Range, Node2, Node3} => [{<<"uuid2">>, 11, <<"uuid3">>, 11}]
+        {Range, Node1, Node2} => [{<<"uuid1">>, 12, <<"uuid2">>, 11}],
+        {Range, Node2, Node3} => [{<<"uuid2">>, 11, <<"uuid3">>, 10}]
     },
     ?assertEqual(
         #{
            {Range, Node1} => {<<"uuid1">>, 12},
-           {Range, Node2} => {<<"uuid2">>, 5},
-           {Range, Node3} => {<<"uuid3">>, 11}
+           {Range, Node2} => {<<"uuid2">>, 11},
+           {Range, Node3} => {<<"uuid3">>, 10}
        },
-       calculate_drop_seqs(PeerCheckpoints, ShardSyncHistory)
+       calculate_drop_seqs(
+           Shards,
+           UuidMap,
+           PeerCheckpoints,
+           ShardSyncHistory
+       )
    ).
 
 shard_sync_history_caps_peer_checkpoint_test() ->
     Range = [0, 10],
     Node1 = 'node1@127.0.0.1',
     Node2 = 'node2@127.0.0.1',
+    Shards = [#shard{range = Range, node = Node1}, #shard{range = Range, node = Node2}],
+    UuidMap = #{},
     PeerCheckpoints = #{{Range, Node1} => {<<"uuid1">>, 12}},
     ShardSyncHistory = #{{Range, Node1, Node2} => [{<<"uuid1">>, 10, <<"uuid2">>, 5}]},
     ?assertEqual(
@@ -665,7 +706,12 @@ shard_sync_history_caps_peer_checkpoint_test() ->
            {Range, Node1} => {<<"uuid1">>, 10},
            {Range, Node2} => {<<"uuid2">>, 5}
        },
-       calculate_drop_seqs(PeerCheckpoints, ShardSyncHistory)
+       calculate_drop_seqs(
+           Shards,
+           UuidMap,
+           PeerCheckpoints,
+           ShardSyncHistory
+       )
    ).
 
 multiple_range_test() ->
@@ -673,6 +719,13 @@
     Range2 = [11, 20],
     Node1 = 'node1@127.0.0.1',
     Node2 = 'node2@127.0.0.1',
+    Shards = [
+        #shard{range = Range1, node = Node1},
+        #shard{range = Range1, node = Node2},
+        #shard{range = Range2, node = Node1},
+        #shard{range = Range2, node = Node2}
+    ],
+    UuidMap = #{},
     PeerCheckpoints = #{{Range1, Node1} => {<<"r1n1">>, 12}, {Range2, Node2} => {<<"r2n2">>, 20}},
     ShardSyncHistory = #{
         {Range1, Node1, Node2} => [{<<"r1n1">>, 10, <<"r1n2">>, 5}],
@@ -685,13 +738,20 @@
            {Range1, Node1} => {<<"r1n1">>, 10},
            {Range1, Node2} => {<<"r1n2">>, 5},
            {Range2, Node2} => {<<"r2n2">>, 19},
            {Range2, Node1} => {<<"r2n1">>, 17}
        },
-       calculate_drop_seqs(PeerCheckpoints, ShardSyncHistory)
+       calculate_drop_seqs(
+           Shards,
+           UuidMap,
+           PeerCheckpoints,
+           ShardSyncHistory
+       )
    ).
 
 search_history_for_latest_safe_crossover_test() ->
     Range = [0, 10],
     Node1 = 'node1@127.0.0.1',
     Node2 = 'node2@127.0.0.1',
+    Shards = [#shard{range = Range, node = Node1}, #shard{range = Range, node = Node2}],
+    UuidMap = #{},
     PeerCheckpoints = #{{Range, Node1} => {<<"uuid1">>, 50}},
     ShardSyncHistory = #{
         {Range, Node1, Node2} => [
@@ -706,7 +766,7 @@ search_history_for_latest_safe_crossover_test() ->
            {Range, Node1} => {<<"uuid1">>, 50},
            {Range, Node2} => {<<"uuid2">>, 51}
        },
-       calculate_drop_seqs(PeerCheckpoints, ShardSyncHistory)
+       calculate_drop_seqs(Shards, UuidMap, PeerCheckpoints, ShardSyncHistory)
    ).
 
 fully_replicated_shards_only_test_() ->
@@ -773,6 +833,15 @@ substitute_splits_test_() ->
     PeerCheckpoints = #{{Range, Node1} => {<<"uuid1">>, 12}},
     [
+        %% preserve peer checkpoint if no subs
+        ?_assertEqual(
+            #{{[0, 5], Node1} => {<<"uuid2">>, 12}, {[6, 10], Node1} => {<<"uuid3">>, 12}},
+            substitute_splits(
+                [#shard{range = [0, 5], node = Node1}, #shard{range = [6, 10], node = Node1}],
+                UuidMap,
+                #{{[0, 5], Node1} => {<<"uuid2">>, 12}, {[6, 10], Node1} => {<<"uuid3">>, 12}}
+            )
+        ),
         ?_assertEqual(
             #{{[0, 5], Node1} => {<<"uuid2">>, 12}, {[6, 10], Node1} => {<<"uuid3">>, 12}},
             substitute_splits(
@@ -831,28 +900,28 @@ crossref_test_() ->
         ?_assertEqual(
             #{
                 {Range, Node1} => {<<"n1">>, 5},
-                {Range, Node2} => {<<"n2x">>, 4},
-                {Range, Node3} => {<<"n3x">>, 3}
+                {Range, Node2} => {<<"n2">>, 4},
+                {Range, Node3} => {<<"n3">>, 3}
             },
             crossref(
                 #{{Range, Node1} => {<<"n1">>, 5}},
                 #{
                     {Range, Node1, Node2} => [
-                        {<<"n1x">>, 10, <<"n2x">>, 9},
-                        {<<"n1x">>, 5, <<"n2x">>, 4},
-                        {<<"n1x">>, 2, <<"n2x">>, 1}
+                        {<<"n1">>, 10, <<"n2">>, 9},
+                        {<<"n1">>, 5, <<"n2">>, 4},
+                        {<<"n1">>, 2, <<"n2">>, 1}
                     ],
-                    {Range, Node1, Node3} => [
-                        {<<"n1x">>, 9, <<"n3x">>, 8},
-                        {<<"n1x">>, 4, <<"n3x">>, 3},
-                        {<<"n1x">>, 3, <<"n3x">>, 2}
+                    {Range, Node2, Node3} => [
+                        {<<"n2">>, 9, <<"n3">>, 8},
+                        {<<"n2">>, 4, <<"n3">>, 3},
+                        {<<"n2">>, 3, <<"n3">>, 2}
                     ]
                 }
            )
        )
    ].
 
-go_int_test_() ->
+calculate_drop_seqs_test_() ->
     Range = [0, 10],
     Subrange1 = [0, 5],
     Subrange2 = [6, 10],
@@ -864,16 +933,16 @@
     },
     [
         ?_assertEqual(
-            {Shards, #{
+            #{
                 {Subrange1, Node1} => {<<"uuid2">>, 12},
                 {Subrange2, Node1} => {<<"uuid3">>, 12}
-            }},
-            go_int(Shards, UuidMap, #{{Range, Node1} => {<<"uuid1">>, 12}}, #{})
+            },
+            calculate_drop_seqs(Shards, UuidMap, #{{Range, Node1} => {<<"uuid1">>, 12}}, #{})
        ),
        ?_assertEqual(
-            {Shards, #{
+            #{
                {Subrange1, Node1} => {<<"uuid2">>, 10},
                {Subrange2, Node1} => {<<"uuid3">>, 12}
-            }},
-            go_int(
+            },
+            calculate_drop_seqs(
                Shards,
                UuidMap,
                #{{Range, Node1} => {<<"uuid1">>, 12}, {Subrange1, Node1} => {<<"uuid2">>, 10}},
@@ -882,7 +951,7 @@
        )
    ].
 
-go_int2_test_() ->
+calculate_drop_seqs_split_test_() ->
     Range = [0, 10],
     Subrange1 = [0, 5],
     Subrange2 = [6, 10],
@@ -908,20 +977,20 @@
            {<<"s1n1">>, 50, <<"s1n2">>, 51},
            {<<"s1n1">>, 12, <<"s1n2">>, 11}
        ],
-        {Subrange2, Node1, Node2} => [
-            {<<"s2n1">>, 100, <<"s2n2">>, 99},
-            {<<"s2n1">>, 75, <<"s2n2">>, 76},
-            {<<"s2n1">>, 50, <<"s2n2">>, 51},
-            {<<"s2n1">>, 12, <<"s2n2">>, 11}
-        ],
        {Subrange1, Node2, Node1} => [
-            {<<"s1n2">>, 100, <<"s1n1">>, 99},
+            {<<"s1n2">>, 101, <<"s1n1">>, 99},
            {<<"s1n2">>, 75, <<"s1n1">>, 76},
            {<<"s1n2">>, 50, <<"s1n1">>, 51},
            {<<"s1n2">>, 12, <<"s1n1">>, 11}
        ],
+        {Subrange2, Node1, Node2} => [
+            {<<"s2n1">>, 102, <<"s2n2">>, 99},
+            {<<"s2n1">>, 75, <<"s2n2">>, 76},
+            {<<"s2n1">>, 50, <<"s2n2">>, 51},
+            {<<"s2n1">>, 12, <<"s2n2">>, 11}
+        ],
        {Subrange2, Node2, Node1} => [
-            {<<"s2n2">>, 100, <<"s2n1">>, 99},
+            {<<"s2n2">>, 103, <<"s2n1">>, 99},
            {<<"s2n2">>, 75, <<"s2n1">>, 76},
            {<<"s2n2">>, 50, <<"s2n1">>, 51},
            {<<"s2n2">>, 12, <<"s2n1">>, 11}
@@ -931,18 +1000,15 @@
        ?_assertEqual(
            #{
                {Subrange1, Node1} => {<<"s1n1">>, 12},
-                {Subrange2, Node1} => {<<"s2n1">>, 12},
                {Subrange1, Node2} => {<<"s1n2">>, 11},
+                {Subrange2, Node1} => {<<"s2n1">>, 12},
                {Subrange2, Node2} => {<<"s2n2">>, 11}
            },
-            element(
-                2,
-                go_int(
-                    Shards,
-                    UuidMap,
-                    #{{Range, Node1} => {<<"ignored">>, 12}},
-                    ShardSyncHistory
-                )
+            calculate_drop_seqs(
+                Shards,
+                UuidMap,
+                #{{Range, Node1} => {<<"ignored">>, 12}},
+                ShardSyncHistory
            )
        )
    ].
diff --git a/test/elixir/test/drop_seq_statem_test.exs b/test/elixir/test/drop_seq_statem_test.exs
index e49d55994..aab97ac7b 100644
--- a/test/elixir/test/drop_seq_statem_test.exs
+++ b/test/elixir/test/drop_seq_statem_test.exs
@@ -443,7 +443,7 @@ defmodule DropSeqStateM do
            "sync_shards failed #{resp.status_code} #{inspect(resp.body)}"
 
     # mem3_rep configured for 100ms frequency
-    :timer.sleep(1000)
+    :timer.sleep(3000)
   end
 
   def precondition(s, {:call, _, :update_document, [_db_name, doc_id]}) do
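
For context on the crossref/2 fix above: sync-history keys have the
{Range, SourceNode, TargetNode} shape. The old predicate
(R == Range andalso T /= Node) also matched entries between two other
replicas that Node never synced to, while the new predicate (S == Node)
follows only the edges for which Node is the source. The same
directionality fix appears in latest_shard_sync_checkpoints/1, which now
reads the source-side {SU, SS} pair from the newest history entry instead
of the target side. A standalone sketch of the corrected selection, with
a hypothetical function name:

    %% Illustrative only, not part of the patch: collect the history
    %% entries along which a checkpoint held by Node may be propagated,
    %% i.e. those where Node is the replication source for the range.
    outgoing_sync_edges(Range, Node, ShardSyncHistory) ->
        maps:filter(
            fun({R, S, _T}, _History) -> R == Range andalso S == Node end,
            ShardSyncHistory
        ).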
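
The final line of the new calculate_drop_seqs/4 reconciles the
crossref-expanded peer checkpoints with the newest shard-sync checkpoints
via maps:merge_with/3. merge_peers/3 itself is not shown in this diff;
the sketch below assumes it keeps the lower (safer) sequence for a
contested {Range, Node} key, which is the behaviour that
shard_sync_history_caps_peer_checkpoint_test asserts:

    %% Standalone sketch under the stated assumption about merge_peers/3.
    -module(merge_sketch).
    -export([demo/0]).

    %% Assumed resolver: when two checkpoints share a {Range, Node} key,
    %% keep the smaller sequence, since a drop seq must not pass what the
    %% slowest peer has acknowledged.
    merge_peers(_Key, {UuidA, SeqA}, {_UuidB, SeqB}) when SeqA =< SeqB ->
        {UuidA, SeqA};
    merge_peers(_Key, _A, B) ->
        B.

    demo() ->
        Range = [0, 10],
        PeerCheckpoints = #{{Range, node1} => {<<"uuid1">>, 12}},
        ShardSyncCheckpoints = #{
            {Range, node1} => {<<"uuid1">>, 10},
            {Range, node2} => {<<"uuid2">>, 5}
        },
        %% node1 is capped from 12 down to 10; node2 is adopted from the
        %% sync side, mirroring shard_sync_history_caps_peer_checkpoint_test.
        maps:merge_with(fun merge_peers/3, PeerCheckpoints, ShardSyncCheckpoints).

Under that assumption demo/0 returns
#{{[0, 10], node1} => {<<"uuid1">>, 10}, {[0, 10], node2} => {<<"uuid2">>, 5}}.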