This is an automated email from the ASF dual-hosted git repository. jan pushed a commit to branch auto-delete-3-plus-shard-move in repository https://gitbox.apache.org/repos/asf/couchdb.git
commit 9d90b7f0c6bc5d72ad74991a5d324a7c09fe2931 Author: Robert Newson <[email protected]> AuthorDate: Thu Jun 12 15:06:02 2025 +0100 WIP --- src/fabric/src/fabric.erl | 2 +- src/fabric/src/fabric_drop_seq.erl | 219 ++++++++++++++++-------------- test/elixir/test/drop_seq_statem_test.exs | 167 +++++++++++++++-------- 3 files changed, 229 insertions(+), 159 deletions(-) diff --git a/src/fabric/src/fabric.erl b/src/fabric/src/fabric.erl index 6cc182945..1136d647b 100644 --- a/src/fabric/src/fabric.erl +++ b/src/fabric/src/fabric.erl @@ -121,7 +121,7 @@ get_partition_info(DbName, Partition) -> fabric_db_partition_info:go(dbname(DbName), Partition). -spec update_drop_seq(dbname()) -> - {ok, [{node(), binary(), non_neg_integer()}]}. + {ok, [{node(), binary(), non_neg_integer()}]} | {error, term()}. update_drop_seq(DbName) -> fabric_drop_seq:go(dbname(DbName)). diff --git a/src/fabric/src/fabric_drop_seq.erl b/src/fabric/src/fabric_drop_seq.erl index c22cd909b..d6112a957 100644 --- a/src/fabric/src/fabric_drop_seq.erl +++ b/src/fabric/src/fabric_drop_seq.erl @@ -37,67 +37,77 @@ go(DbName) -> Shards0 = mem3:shards(DbName), - #{ - uuid_map := UuidMap, - peer_checkpoints := PeerCheckpoints, - shard_sync_history := ShardSyncHistory - } = gather_drop_seq_info( - Shards0 - ), - {Shards1, DropSeqs} = go_int( - Shards0, UuidMap, PeerCheckpoints, ShardSyncHistory - ), - Workers = lists:filtermap( - fun(Shard) -> - #shard{range = Range, node = Node, name = ShardName} = Shard, - case maps:find({Range, Node}, DropSeqs) of - {ok, {_UuidPrefix, 0}} -> - false; - {ok, {UuidPrefix, DropSeq}} -> - Ref = rexi:cast( - Node, - {fabric_rpc, set_drop_seq, [ShardName, UuidPrefix, DropSeq, [?ADMIN_CTX]]} - ), - {true, Shard#shard{ref = Ref, opts = [{drop_seq, DropSeq}]}}; - error -> - false - end - end, - Shards1 - ), - if - Workers == [] -> - %% nothing to do - {ok, #{}}; - true -> - RexiMon = fabric_util:create_monitors(Shards1), - Acc0 = {#{}, length(Workers) - 1}, - try - case 
fabric_util:recv(Workers, #shard.ref, fun handle_set_drop_seq_reply/3, Acc0) of - {ok, Results} -> - {ok, Results}; - {timeout, {WorkersDict, _}} -> - DefunctWorkers = fabric_util:remove_done_workers( - WorkersDict, - nil - ), - fabric_util:log_timeout( - DefunctWorkers, - "set_drop_seq" - ), - {error, timeout}; - {error, Reason} -> - {error, Reason} - end - after - rexi_monitor:stop(RexiMon) + case gather_drop_seq_info(Shards0) of + {error, Reason} -> + {error, Reason}; + {ok, #{ + uuid_map := UuidMap, + peer_checkpoints := PeerCheckpoints, + shard_sync_history := ShardSyncHistory + }} -> + {Shards1, DropSeqs} = go_int( + Shards0, UuidMap, PeerCheckpoints, ShardSyncHistory + ), + Workers = lists:filtermap( + fun(Shard) -> + #shard{range = Range, node = Node, name = ShardName} = Shard, + case maps:find({Range, Node}, DropSeqs) of + {ok, {_UuidPrefix, 0}} -> + false; + {ok, {UuidPrefix, DropSeq}} -> + Ref = rexi:cast( + Node, + {fabric_rpc, set_drop_seq, [ + ShardName, UuidPrefix, DropSeq, [?ADMIN_CTX] + ]} + ), + {true, Shard#shard{ref = Ref, opts = [{drop_seq, DropSeq}]}}; + error -> + false + end + end, + Shards1 + ), + if + Workers == [] -> + %% nothing to do + {ok, #{}}; + true -> + RexiMon = fabric_util:create_monitors(Shards1), + Acc0 = {#{}, length(Workers) - 1}, + try + case + fabric_util:recv( + Workers, #shard.ref, fun handle_set_drop_seq_reply/3, Acc0 + ) + of + {ok, Results} -> + {ok, Results}; + {timeout, {WorkersDict, _}} -> + DefunctWorkers = fabric_util:remove_done_workers( + WorkersDict, + nil + ), + fabric_util:log_timeout( + DefunctWorkers, + "set_drop_seq" + ), + {error, timeout}; + {error, Reason} -> + {error, Reason} + end + after + rexi_monitor:stop(RexiMon) + end end end. 
-go_int(Shards0, UuidFetcher, PeerCheckpoints0, ShardSyncHistory) -> +go_int( +    Shards0, UuidMap, PeerCheckpoints0, ShardSyncHistory +) ->     Shards1 = fully_replicated_shards_only(Shards0, ShardSyncHistory),     PeerCheckpoints1 = crossref(PeerCheckpoints0, ShardSyncHistory), -    PeerCheckpoints2 = substitute_splits(Shards1, UuidFetcher, PeerCheckpoints1), +    PeerCheckpoints2 = substitute_splits(Shards1, UuidMap, PeerCheckpoints1),     DropSeqs = calculate_drop_seqs(PeerCheckpoints2, ShardSyncHistory),     {Shards1, DropSeqs}.  @@ -136,25 +146,38 @@ crossref(PeerCheckpoints0, ShardSyncHistory) ->         Others = maps:filter(             fun({R, _S, T}, _History) -> R == Range andalso T /= Node end, ShardSyncHistory         ), -        maps:fold( -            fun({R, _S, T}, History, Acc2) -> -                case -                    lists:search( -                        fun({SU, SS, _TU, _TS}) -> -                            uuids_match([Uuid, SU]) andalso SS =< Seq -                        end, -                        History -                    ) -                of -                    {value, {_SU, _SS, TU, TS}} -> -                        maps:merge_with(fun merge_peers/3, #{{R, T} => {TU, TS}}, Acc2); -                    false -> -                        Acc2 -                end -            end, -            Acc1, -            Others -        ) +        if +            Seq == 0 -> +                %% propagate any 0 checkpoint as they would not be +                %% matched in shard sync history. +                maps:fold( +                    fun({R, _S, T}, _History, Acc2) -> +                        maps:merge_with(fun merge_peers/3, #{{R, T} => {<<>>, 0}}, Acc2) +                    end, +                    Acc1, +                    Others +                ); +            true -> +                maps:fold( +                    fun({R, _S, T}, History, Acc2) -> +                        case +                            lists:search( +                                fun({SU, SS, _TU, _TS}) -> +                                    uuids_match([Uuid, SU]) andalso SS =< Seq +                                end, +                                History +                            ) +                        of +                            {value, {_SU, _SS, TU, TS}} -> +                                maps:merge_with(fun merge_peers/3, #{{R, T} => {TU, TS}}, Acc2); +                            false -> +                                Acc2 +                        end +                    end, +                    Acc1, +                    Others +                ) +        end     end,     PeerCheckpoints0,     PeerCheckpoints0 @@ -165,25 +188,7 @@ crossref(PeerCheckpoints0, ShardSyncHistory) ->     %% crossreferences may be possible.     if         PeerCheckpoints0 == PeerCheckpoints1 -> -            %% insert {<<>>, 0} for any missing crossref so that shard sync -            %% history is subordinate.
- maps:fold( - fun({Range, Node}, {_Uuid, _Seq}, Acc1) -> - Others = maps:filter( - fun({R, _S, T}, _History) -> R == Range andalso T /= Node end, - ShardSyncHistory - ), - maps:fold( - fun({R, _S, T}, _History, Acc3) -> - maps:merge(#{{R, T} => {<<>>, 0}}, Acc3) - end, - Acc1, - Others - ) - end, - PeerCheckpoints1, - PeerCheckpoints1 - ); + PeerCheckpoints1; true -> crossref(PeerCheckpoints1, ShardSyncHistory) end. @@ -202,7 +207,8 @@ fully_replicated_shards_only(Shards, ShardSyncHistory) -> Shards ). --spec gather_drop_seq_info(Shards :: [#shard{}]) -> {peer_checkpoints(), shard_sync_history()}. +-spec gather_drop_seq_info(Shards :: [#shard{}]) -> + {ok, peer_checkpoints(), shard_sync_history()} | {error, term()}. gather_drop_seq_info([#shard{} | _] = Shards) -> Workers = fabric_util:submit_jobs( Shards, ?MODULE, gather_drop_seq_info_rpc, [] @@ -221,7 +227,7 @@ gather_drop_seq_info([#shard{} | _] = Shards) -> ) of {ok, Result} -> - Result; + {ok, Result}; {timeout, _State} -> {error, timeout}; {error, Reason} -> @@ -237,13 +243,16 @@ gather_drop_seq_info_rpc(DbName) -> {ok, Db} -> try Uuid = couch_db:get_uuid(Db), - Acc0 = {#{}, #{}}, + Seq = couch_db:get_committed_update_seq(Db), + Range = mem3:range(DbName), + Acc0 = {#{{Range, node()} => {Uuid, Seq}}, #{}}, {ok, {PeerCheckpoints, ShardSyncHistory}} = couch_db:fold_local_docs( Db, fun gather_drop_seq_info_fun/2, Acc0, [] ), rexi:reply( {ok, #{ uuid => Uuid, + seq => Seq, peer_checkpoints => PeerCheckpoints, shard_sync_history => ShardSyncHistory }} @@ -353,7 +362,7 @@ decode_seq(OpaqueSeq) -> is_integer(Seq), S >= 0, E > S, - Seq > 0, + Seq >= 0, is_binary(Uuid), is_atom(Node) -> @@ -379,12 +388,12 @@ latest_shard_sync_checkpoints(ShardSyncHistory) -> -spec substitute_splits([#shard{}], uuid_map(), peer_checkpoints()) -> peer_checkpoints(). 
substitute_splits(Shards, UuidMap, PeerCheckpoints) -> maps:fold( - fun({[B1, E1], Node}, {Uuid, Seq}, Acc) -> + fun({[PS, PE], Node}, {Uuid, Seq}, Acc) -> ShardsInRange = [ S - || #shard{range = [B2, E2]} = S <- Shards, + || #shard{range = [SS, SE]} = S <- Shards, Node == S#shard.node, - B2 >= B1 andalso E2 =< E1 + SS >= PS andalso SE =< PE ], %% lookup uuid from map if substituted AsMap = maps:from_list( @@ -392,7 +401,7 @@ substitute_splits(Shards, UuidMap, PeerCheckpoints) -> fun(#shard{} = Shard) -> Key = {Shard#shard.range, Shard#shard.node}, if - [B1, E1] == Shard#shard.range -> + [PS, PE] == Shard#shard.range -> {true, {Key, {Uuid, Seq}}}; true -> case maps:find(Key, UuidMap) of @@ -711,6 +720,12 @@ fully_replicated_shards_only_test_() -> #shard{node = node2, range = Range2} ], [ + %% n=1 edge case + ?_assertEqual( + [hd(Shards)], + fully_replicated_shards_only([hd(Shards)], #{}) + ), + %% empty history means no fully replicated shards ?_assertEqual([], fully_replicated_shards_only(Shards, #{})), %% some but not all peers diff --git a/test/elixir/test/drop_seq_statem_test.exs b/test/elixir/test/drop_seq_statem_test.exs index 2695b8a1f..f4441c91e 100644 --- a/test/elixir/test/drop_seq_statem_test.exs +++ b/test/elixir/test/drop_seq_statem_test.exs @@ -3,9 +3,6 @@ defmodule DropSeqStateM do use PropCheck.StateM use CouchTestCase - # alias Couch.Test.Utils - # import Utils - @moduletag capture_log: true # expected to pass in all three cluster scenarios @@ -20,7 +17,7 @@ defmodule DropSeqStateM do forall cmds <- commands(__MODULE__) do trap_exit do db_name = random_db_name() - n = Enum.random(1..3) + n = Enum.random(2..3) q = Enum.random(1..10) {:ok, _} = create_db(db_name, query: %{n: n, q: q}) r = run_commands(__MODULE__, cmds, [{:dbname, db_name}]) @@ -28,13 +25,13 @@ defmodule DropSeqStateM do delete_db(db_name) (result == :ok) - |> when_fail(when_fail_fn(n, q, r, cmds)) + |> when_fail(when_fail_fn(db_name, n, q, r, cmds)) end end end - def when_fail_fn(n, 
q, r, cmds) do - IO.puts("\nn: #{n}, q: #{q}") + def when_fail_fn(db_name, n, q, r, cmds) do + IO.puts("\ndb_name: #{db_name}, n: #{n}, q: #{q}") print_report(r, cmds) end @@ -43,10 +40,16 @@ defmodule DropSeqStateM do deleted_docs: [], current_seq: 0, peer_checkpoint_seq: nil, + index_seq: nil, drop_seq: nil, drop_count: 0, - changed: false, - stale: false + check_actual_state: false + end + + defmodule ActualState do + defstruct docs: [], + deleted_docs: [], + drop_count: 0 end def initial_state, do: %State{} @@ -56,28 +59,49 @@ defmodule DropSeqStateM do def doc_id, do: oneof(@docids) + def doc_id(%State{docs: doc_ids}), do: elements(doc_ids) + def index_type do oneof([:mrview, :nouveau]) end def command(s) do case s do - %State{stale: true} -> - {:call, __MODULE__, :update_indexes, [{:var, :dbname}]} - - %State{changed: true} -> - {:call, __MODULE__, :changes, [{:var, :dbname}]} + %State{check_actual_state: true} -> + {:call, __MODULE__, :check_actual_state, [{:var, :dbname}]} %State{} -> - frequency([ + base_cmds = [ {10, {:call, __MODULE__, :update_document, [{:var, :dbname}, doc_id()]}}, - {10, {:call, __MODULE__, :delete_document, [{:var, :dbname}, doc_id()]}}, {10, {:call, __MODULE__, :update_peer_checkpoint, [{:var, :dbname}]}}, - {10, {:call, __MODULE__, :update_drop_seq, [{:var, :dbname}]}}, {10, {:call, __MODULE__, :compact_db, [{:var, :dbname}]}}, {5, {:call, __MODULE__, :split_shard, [{:var, :dbname}]}}, - {1, {:call, __MODULE__, :create_index, [{:var, :dbname}, index_type()]}} - ]) + {1, {:call, __MODULE__, :create_index, [{:var, :dbname}, index_type()]}}, + {5, {:call, __MODULE__, :update_indexes, [{:var, :dbname}]}} + ] + + cond do + s.docs == [] and s.deleted_docs == [] -> + frequency(base_cmds) + + s.docs != [] -> + frequency( + base_cmds ++ + [ + {10, + {:call, __MODULE__, :delete_document, [{:var, :dbname}, doc_id(s)]}}, + {10, {:call, __MODULE__, :update_drop_seq, [{:var, :dbname}]}} + ] + ) + + true -> + frequency( + base_cmds ++ + [ + {10, 
{:call, __MODULE__, :update_drop_seq, [{:var, :dbname}]}} + ] + ) + end end end @@ -108,7 +132,7 @@ defmodule DropSeqStateM do "Couch.put failed #{resp.status_code} #{inspect(resp.body)}" end - sync_shards(db_name) + wait_for_internal_replication(db_name) end def delete_document(db_name, doc_id) do @@ -124,7 +148,7 @@ defmodule DropSeqStateM do :ok end - sync_shards(db_name) + wait_for_internal_replication(db_name) end def update_peer_checkpoint(db_name) do @@ -145,7 +169,7 @@ defmodule DropSeqStateM do assert resp.status_code == 201, "update_peer_checkpoint failed #{resp.status_code} #{inspect(resp.body)}" - sync_shards(db_name) + wait_for_internal_replication(db_name) seq_to_shards(update_seq) end @@ -165,15 +189,28 @@ defmodule DropSeqStateM do :timer.sleep(500) end - def changes(db_name) do + def check_actual_state(db_name) do + resp = Couch.get("/#{db_name}/") + assert resp.status_code == 200 + + # update_seq = String.to_integer(List.first(String.split(resp.body["update_seq"], "-"))) + drop_count = resp.body["drop_count"] + resp = Couch.get("/#{db_name}/_changes") assert resp.status_code == 200 - List.foldl(resp.body["results"], {[], []}, fn change, {doc_ids, del_doc_ids} -> - if change["deleted"] do - {doc_ids, Enum.sort([change["id"] | del_doc_ids])} - else - {Enum.sort([change["id"] | doc_ids]), del_doc_ids} + acc0 = %ActualState{drop_count: drop_count} + + List.foldl(resp.body["results"], acc0, fn change, acc1 -> + cond do + String.starts_with?(change["id"], "_design/") -> + acc1 + + change["deleted"] -> + %ActualState{acc1 | deleted_docs: Enum.sort([change["id"] | acc1.deleted_docs])} + + true -> + %ActualState{acc1 | docs: Enum.sort([change["id"] | acc1.docs])} end end) end @@ -260,7 +297,7 @@ defmodule DropSeqStateM do assert resp.status_code == 201, "create_index failed #{resp.status_code} #{inspect(resp.body)}" - sync_shards(db_name) + wait_for_internal_replication(db_name) ddoc_id end @@ -285,14 +322,14 @@ defmodule DropSeqStateM do end) end - def 
sync_shards(db_name) do + def wait_for_internal_replication(db_name) do resp = Couch.post("/#{db_name}/_sync_shards") assert resp.status_code == 202, "sync_shards failed #{resp.status_code} #{inspect(resp.body)}" # mem3_rep configured for 100ms frequency - :timer.sleep(200) + :timer.sleep(500) end def precondition(s, {:call, _, :update_document, [_db_name, doc_id]}) do @@ -303,6 +340,14 @@ defmodule DropSeqStateM do doc_exists(s, doc_id) end + def precondition(s, {:call, _, :update_drop_seq, [_db_name]}) do + s.docs != [] or s.deleted_docs != [] + end + + def precondition(s, {:call, _, :update_indexes, [_db_name]}) do + s.index_seq != nil + end + def precondition(_, _) do true end @@ -313,8 +358,7 @@ defmodule DropSeqStateM do | current_seq: s.current_seq + 1, docs: Enum.sort([doc_id | s.docs]), deleted_docs: List.keydelete(s.deleted_docs, doc_id, 0), - changed: true, - stale: true + check_actual_state: true } end @@ -324,27 +368,39 @@ defmodule DropSeqStateM do | current_seq: s.current_seq + 1, docs: List.delete(s.docs, doc_id), deleted_docs: Enum.sort([{doc_id, s.current_seq + 1} | s.deleted_docs]), - changed: true, - stale: true + check_actual_state: true } end def next_state(s, _v, {:call, _, :update_peer_checkpoint, [_db_name]}) do - %State{s | peer_checkpoint_seq: s.current_seq, changed: true} + %State{ + s + | peer_checkpoint_seq: s.current_seq, + check_actual_state: true + } end def next_state(s, _v, {:call, _, :update_drop_seq, [_db_name]}) do - # we'll drop all tombstones if _update_drop_seq is called when there - # are no peer checkpoint docs as the only peers are the shard syncs - # which update automatically - # n.b: indexes and their peer checkpoints will always be fresh as we - # force update_indexes after every doc update. 
drop_seq = - if s.peer_checkpoint_seq == nil, - do: s.current_seq, - else: s.peer_checkpoint_seq + cond do + s.peer_checkpoint_seq != nil and s.index_seq != nil -> + min(s.peer_checkpoint_seq, s.index_seq) - %State{s | drop_seq: drop_seq, changed: true} + s.index_seq != nil -> + s.index_seq + + s.peer_checkpoint_seq != nil -> + s.peer_checkpoint_seq + + true -> + s.current_seq + end + + %State{ + s + | drop_seq: drop_seq, + check_actual_state: true + } end def next_state(s, _v, {:call, _, :compact_db, [_db_name]}) do @@ -357,34 +413,33 @@ defmodule DropSeqStateM do s | deleted_docs: keep_docs, drop_count: s.drop_count + length(drop_docs), - changed: true + check_actual_state: true } end - def next_state(s, _v, {:call, _, :changes, [_db_name]}) do - %State{s | changed: false} + def next_state(s, _v, {:call, _, :check_actual_state, [_db_name]}) do + %State{s | check_actual_state: false} end def next_state(s, _v, {:call, _, :split_shard, [_db_name]}) do - %State{s | changed: true} + %State{s | check_actual_state: true} end - def next_state(s, v, {:call, _, :create_index, [_db_name, _index_type]}) do + def next_state(s, _v, {:call, _, :create_index, [_db_name, _index_type]}) do %State{ s | current_seq: s.current_seq + 1, - docs: Enum.sort([v | s.docs]), - changed: true, - stale: true + check_actual_state: true } end def next_state(s, _v, {:call, _, :update_indexes, [_db_name]}) do - %State{s | stale: false} + %State{s | index_seq: s.current_seq, check_actual_state: true} end - def postcondition(s, {:call, _, :changes, [_db_name]}, {doc_ids, del_doc_ids}) do - doc_ids == doc_ids(s) and del_doc_ids == deleted_doc_ids(s) + def postcondition(s, {:call, _, :check_actual_state, [_db_name]}, actual) do + doc_ids(s) == actual.docs and deleted_doc_ids(s) == actual.deleted_docs and + s.drop_count == actual.drop_count end def postcondition(_, _, _), do: true
