This is an automated email from the ASF dual-hosted git repository. rnewson pushed a commit to branch ra in repository https://gitbox.apache.org/repos/asf/couchdb.git
commit 324a3a0bfa36563a2ea60cf83b47fe3c9f506b7a Author: Robert Newson <[email protected]> AuthorDate: Tue Jan 21 20:16:19 2025 +0000 fabric_doc_update ra blah --- src/chttpd/src/chttpd_app.erl | 14 +- src/couch/src/couch_write_queue.erl | 9 +- src/fabric/src/fabric_doc_update.erl | 721 +---------------------------------- 3 files changed, 29 insertions(+), 715 deletions(-) diff --git a/src/chttpd/src/chttpd_app.erl b/src/chttpd/src/chttpd_app.erl index 698ddbd34..8f1c2c78e 100644 --- a/src/chttpd/src/chttpd_app.erl +++ b/src/chttpd/src/chttpd_app.erl @@ -14,6 +14,9 @@ -behaviour(application). -export([start/2, stop/1]). +% temp location +-export([couch_write_queue_name/1]). + start(_Type, StartArgs) -> start_ra(), chttpd_sup:start_link(StartArgs). @@ -35,10 +38,7 @@ start_ra() -> [start_cluster(Nodes) || Nodes <- combinations(3, mem3:nodes())]. start_cluster(Nodes) -> - Name = list_to_atom( - "couch_write_queue_" ++ - lists:flatten(lists:join("_", [atom_to_list(N) || N <- lists:sort(Nodes)])) - ), + Name = couch_write_queue_name(Nodes), ra:start_cluster( default, Name, @@ -46,6 +46,12 @@ start_cluster(Nodes) -> [{Name, N} || N <- Nodes] ). +couch_write_queue_name(Nodes) -> + list_to_atom( + "couch_write_queue_" ++ + lists:flatten(lists:join("_", [atom_to_list(N) || N <- lists:sort(Nodes)])) + ). + %% https://rosettacode.org/wiki/Combinations#Erlang combinations(0, _Nodes) -> [[]]; diff --git a/src/couch/src/couch_write_queue.erl b/src/couch/src/couch_write_queue.erl index 957e1bc5e..a7ebdd470 100644 --- a/src/couch/src/couch_write_queue.erl +++ b/src/couch/src/couch_write_queue.erl @@ -26,16 +26,19 @@ init(_Conf) -> nil. -apply(_Meta, {update_docs, DbName, Docs} = Command, State) when is_binary(DbName), is_list(Docs) -> +apply(_Meta, {update_docs, DbName, Docs, Options} = Command, State) when + is_binary(DbName), is_list(Docs), is_list(Options) +-> {State, ok, [{aux, Command}]}. init_aux(_Name) -> nil. -handle_aux(_RaftState, cast, {update_docs, DbName, Docs}, AuxState, IntState) -> +handle_aux(_RaftState, cast, {update_docs, DbName, Docs, Options}, AuxState, IntState) -> {ok, Db} = couch_db:open(DbName, [{create_if_missing, true}, ?ADMIN_CTX]), try - Reply = couch_db:update_docs(Db, Docs, [], ?INTERACTIVE_EDIT), + Reply = couch_db:update_docs(Db, Docs, Options, ?INTERACTIVE_EDIT), + couch_log:notice("handle_aux ~s ~p ~p", [couch_db:name(Db), Docs, Reply]), {reply, Reply, AuxState, IntState} after couch_db:close(Db) diff --git a/src/fabric/src/fabric_doc_update.erl b/src/fabric/src/fabric_doc_update.erl index 77b424911..cb9aaf6b0 100644 --- a/src/fabric/src/fabric_doc_update.erl +++ b/src/fabric/src/fabric_doc_update.erl @@ -20,100 +20,18 @@ go(_, [], _) -> {ok, []}; go(DbName, AllDocs0, Opts) -> - AllDocs1 = before_doc_update(DbName, AllDocs0, Opts), - AllDocs = tag_docs(AllDocs1), + AllDocs = before_doc_update(DbName, AllDocs0, Opts), validate_atomic_update(DbName, AllDocs, lists:member(all_or_nothing, Opts)), Options = lists:delete(all_or_nothing, Opts), - GroupedDocs = lists:map( - fun({#shard{name = Name, node = Node} = Shard, Docs}) -> - Docs1 = untag_docs(Docs), - Ref = rexi:cast(Node, {fabric_rpc, update_docs, [Name, Docs1, Options]}), - {Shard#shard{ref = Ref}, Docs} - end, - group_docs_by_shard(DbName, AllDocs) - ), - {Workers, _} = lists:unzip(GroupedDocs), - RexiMon = fabric_util:create_monitors(Workers), - W = couch_util:get_value(w, Options, integer_to_list(mem3:quorum(DbName))), - Acc0 = {length(Workers), length(AllDocs), list_to_integer(W), GroupedDocs, dict:new()}, + GroupedDocs = group_docs_by_shard(DbName, AllDocs), Timeout = fabric_util:request_timeout(), - try rexi_utils:recv(Workers, #shard.ref, fun handle_message/3, Acc0, infinity, Timeout) of - {ok, {Health, Results}} when - Health =:= ok; Health =:= accepted; Health =:= error - -> - ensure_all_responses(Health, AllDocs, Results); - {timeout, Acc} -> - {_, _, W1, GroupedDocs1, DocReplDict} = Acc, - {DefunctWorkers, _} = lists:unzip(GroupedDocs1), - fabric_util:log_timeout(DefunctWorkers, "update_docs"), - {Health, _, Resp} = dict:fold( - fun force_reply/3, - {ok, W1, []}, - DocReplDict - ), - ensure_all_responses(Health, AllDocs, Resp); - Else -> - Else - after - rexi_monitor:stop(RexiMon) - end. - -handle_message({rexi_DOWN, _, {_, NodeRef}, _}, _Worker, Acc0) -> - {_, LenDocs, W, GroupedDocs, DocReplyDict} = Acc0, - NewGrpDocs = [X || {#shard{node = N}, _} = X <- GroupedDocs, N =/= NodeRef], - skip_message({length(NewGrpDocs), LenDocs, W, NewGrpDocs, DocReplyDict}); -handle_message({rexi_EXIT, _}, Worker, Acc0) -> - {WC, LenDocs, W, GrpDocs, DocReplyDict} = Acc0, - NewGrpDocs = lists:keydelete(Worker, 1, GrpDocs), - skip_message({WC - 1, LenDocs, W, NewGrpDocs, DocReplyDict}); -handle_message({error, all_dbs_active}, Worker, Acc0) -> - % treat it like rexi_EXIT, the hope at least one copy will return successfully - {WC, LenDocs, W, GrpDocs, DocReplyDict} = Acc0, - NewGrpDocs = lists:keydelete(Worker, 1, GrpDocs), - skip_message({WC - 1, LenDocs, W, NewGrpDocs, DocReplyDict}); -handle_message(internal_server_error, Worker, Acc0) -> - % happens when we fail to load validation functions in an RPC worker - {WC, LenDocs, W, GrpDocs, DocReplyDict} = Acc0, - NewGrpDocs = lists:keydelete(Worker, 1, GrpDocs), - skip_message({WC - 1, LenDocs, W, NewGrpDocs, DocReplyDict}); -handle_message(attachment_chunk_received, _Worker, Acc0) -> - {ok, Acc0}; -handle_message({ok, Replies}, Worker, Acc0) -> - {WaitingCount, DocCount, W, GroupedDocs, DocReplyDict0} = Acc0, - {value, {_, Docs}, NewGrpDocs} = lists:keytake(Worker, 1, GroupedDocs), - DocReplyDict = append_update_replies(Docs, Replies, DocReplyDict0), - case {WaitingCount, dict:size(DocReplyDict)} of - {1, _} -> - % last message has arrived, we need to conclude things - {Health, W, Reply} = dict:fold( - fun force_reply/3, - {ok, W, []}, - DocReplyDict - ), - {stop, {Health, Reply}}; - {_, DocCount} -> - % we've got at least one reply for each document, let's take a look - case dict:fold(fun maybe_reply/3, {stop, W, []}, DocReplyDict) of - continue -> - {ok, {WaitingCount - 1, DocCount, W, NewGrpDocs, DocReplyDict}}; - {stop, W, FinalReplies} -> - {stop, {ok, FinalReplies}} - end; - _ -> - {ok, {WaitingCount - 1, DocCount, W, NewGrpDocs, DocReplyDict}} - end; -handle_message({missing_stub, Stub}, _, _) -> - throw({missing_stub, Stub}); -handle_message({not_found, no_db_file} = X, Worker, Acc0) -> - {_, _, _, GroupedDocs, _} = Acc0, - Docs = couch_util:get_value(Worker, GroupedDocs), - handle_message({ok, [X || _D <- Docs]}, Worker, Acc0); -handle_message({bad_request, Msg}, _, _) -> - throw({bad_request, Msg}); -handle_message({forbidden, Msg}, _, _) -> - throw({forbidden, Msg}); -handle_message({request_entity_too_large, Entity}, _, _) -> - throw({request_entity_too_large, Entity}). + RaFun = fun(Shards, Docs) -> + ShardDbName = (hd(Shards))#shard.name, + Nodes = [S#shard.node || S <- Shards], + Queue = chttpd_app:couch_write_queue_name(Nodes), + ra:process_command(Queue, {update_docs, ShardDbName, Docs, Options}, Timeout) + end, + maps:foreach(RaFun, GroupedDocs). before_doc_update(DbName, Docs, Opts) -> % Use the same pattern as in couch_db:validate_doc_update/3. If the document was already @@ -145,166 +63,11 @@ before_doc_update(DbName, Docs, Opts) -> Docs end. -tag_docs([]) -> - []; -tag_docs([#doc{meta = Meta} = Doc | Rest]) -> - [Doc#doc{meta = [{ref, make_ref()} | Meta]} | tag_docs(Rest)]. - -untag_docs([]) -> - []; -untag_docs([#doc{meta = Meta} = Doc | Rest]) -> - [Doc#doc{meta = lists:keydelete(ref, 1, Meta)} | untag_docs(Rest)]. - -force_reply(Doc, [], {_, W, Acc}) -> - {error, W, [{Doc, {error, internal_server_error}} | Acc]}; -force_reply(Doc, [FirstReply | _] = Replies, {Health, W, Acc}) -> - case update_quorum_met(W, Replies) of - {true, Reply} -> - % corner case new_edits:false and vdu: [noreply, forbidden, noreply] - case check_forbidden_msg(Replies) of - {forbidden, ForbiddenReply} -> - {Health, W, [{Doc, ForbiddenReply} | Acc]}; - false -> - {Health, W, [{Doc, Reply} | Acc]} - end; - false -> - case [Reply || {ok, Reply} <- Replies] of - [] -> - % check if all errors are identical, if so inherit health - case lists:all(fun(E) -> E =:= FirstReply end, Replies) of - true -> - CounterKey = [fabric, doc_update, errors], - couch_stats:increment_counter(CounterKey), - {Health, W, [{Doc, FirstReply} | Acc]}; - false -> - CounterKey = [fabric, doc_update, mismatched_errors], - couch_stats:increment_counter(CounterKey), - case check_forbidden_msg(Replies) of - {forbidden, ForbiddenReply} -> - {Health, W, [{Doc, ForbiddenReply} | Acc]}; - false -> - {error, W, [{Doc, FirstReply} | Acc]} - end - end; - [AcceptedRev | _] -> - CounterKey = [fabric, doc_update, write_quorum_errors], - couch_stats:increment_counter(CounterKey), - NewHealth = - case Health of - ok -> accepted; - _ -> Health - end, - {NewHealth, W, [{Doc, {accepted, AcceptedRev}} | Acc]} - end - end. - -maybe_reply(_, _, continue) -> - % we didn't meet quorum for all docs, so we're fast-forwarding the fold - continue; -maybe_reply(Doc, Replies, {stop, W, Acc}) -> - case update_quorum_met(W, Replies) of - {true, Reply} -> - {stop, W, [{Doc, Reply} | Acc]}; - false -> - continue - end. - -% this ensures that we got some response for all documents being updated -ensure_all_responses(Health, AllDocs, Resp) -> - Results = [ - R - || R <- couch_util:reorder_results( - AllDocs, - Resp, - {error, internal_server_error} - ), - R =/= noreply - ], - case lists:member({error, internal_server_error}, Results) of - true -> - {error, Results}; - false -> - {Health, Results} - end. - -% This is a corner case where -% 1) revision tree for the document are out of sync across nodes -% 2) update on one node extends the revision tree -% 3) VDU forbids the document -% 4) remaining nodes do not extend revision tree, so noreply is returned -% If at at least one node forbids the update, and all other replies -% are noreply, then we reject the update -check_forbidden_msg(Replies) -> - Pred = fun - ({_, {forbidden, _}}) -> - true; - (_) -> - false - end, - case lists:partition(Pred, Replies) of - {[], _} -> - false; - {[ForbiddenReply = {_, {forbidden, _}} | _], RemReplies} -> - case lists:all(fun(E) -> E =:= noreply end, RemReplies) of - true -> - {forbidden, ForbiddenReply}; - false -> - false - end - end. - -update_quorum_met(W, Replies) -> - Counters = lists:foldl( - fun(R, D) -> orddict:update_counter(R, 1, D) end, - orddict:new(), - Replies - ), - GoodReplies = lists:filter(fun good_reply/1, Counters), - case lists:dropwhile(fun({_, Count}) -> Count < W end, GoodReplies) of - [] -> - false; - [{FinalReply, _} | _] -> - {true, FinalReply} - end. - -good_reply({{ok, _}, _}) -> - true; -good_reply({noreply, _}) -> - true; -good_reply(_) -> - false. - --spec group_docs_by_shard(binary(), [#doc{}]) -> [{#shard{}, [#doc{}]}]. group_docs_by_shard(DbName, Docs) -> - dict:to_list( - lists:foldl( - fun(#doc{id = Id} = Doc, D0) -> - lists:foldl( - fun(Shard, D1) -> - dict:append(Shard, Doc, D1) - end, - D0, - mem3:shards(DbName, Id) - ) - end, - dict:new(), - Docs - ) - ). - -append_update_replies([], [], DocReplyDict) -> - DocReplyDict; -append_update_replies([Doc | Rest], [], Dict0) -> - % icky, if replicated_changes only errors show up in result - append_update_replies(Rest, [], dict:append(Doc, noreply, Dict0)); -append_update_replies([Doc | Rest1], [Reply | Rest2], Dict0) -> - append_update_replies(Rest1, Rest2, dict:append(Doc, Reply, Dict0)). - -skip_message({0, _, W, _, DocReplyDict}) -> - {Health, W, Reply} = dict:fold(fun force_reply/3, {ok, W, []}, DocReplyDict), - {stop, {Health, Reply}}; -skip_message(Acc0) -> - {ok, Acc0}. + KeyFun = fun(#doc{} = Doc) -> + mem3_shards:for_docid(DbName, Doc#doc.id) + end, + maps:groups_from_list(KeyFun, Docs). validate_atomic_update(_, _, false) -> ok; @@ -329,462 +92,4 @@ validate_atomic_update(_DbName, AllDocs, true) -> -include_lib("eunit/include/eunit.hrl"). -setup_all() -> - meck:new([couch_log, couch_stats]), - meck:expect(couch_log, warning, fun(_, _) -> ok end), - meck:expect(couch_stats, increment_counter, fun(_) -> ok end). - -teardown_all(_) -> - meck:unload(). - -doc_update_test_() -> - { - setup, - fun setup_all/0, - fun teardown_all/1, - [ - fun doc_update1/0, - fun doc_update2/0, - fun doc_update3/0, - fun handle_all_dbs_active/0, - fun handle_two_all_dbs_actives/0, - fun one_forbid/0, - fun two_forbid/0, - fun extend_tree_forbid/0, - fun other_errors_one_forbid/0, - fun one_error_two_forbid/0, - fun one_success_two_forbid/0, - fun worker_before_doc_update_forbidden/0 - ] - }. - -% eunits -doc_update1() -> - Doc1 = #doc{revs = {1, [<<"foo">>]}}, - Doc2 = #doc{revs = {1, [<<"bar">>]}}, - Docs = [Doc1], - Docs2 = [Doc1, Doc2], - Dict = dict:from_list([{Doc, []} || Doc <- Docs]), - Dict2 = dict:from_list([{Doc, []} || Doc <- Docs2]), - - Shards = - mem3_util:create_partition_map("foo", 3, 1, ["node1", "node2", "node3"]), - GroupedDocs = group_docs_by_shard_hack(<<"foo">>, Shards, Docs), - - % test for W = 2 - AccW2 = {length(Shards), length(Docs), list_to_integer("2"), GroupedDocs, Dict}, - - {ok, {WaitingCountW2_1, _, _, _, _} = AccW2_1} = - handle_message({ok, [{ok, Doc1}]}, hd(Shards), AccW2), - ?assertEqual(WaitingCountW2_1, 2), - {stop, FinalReplyW2} = - handle_message({ok, [{ok, Doc1}]}, lists:nth(2, Shards), AccW2_1), - ?assertEqual({ok, [{Doc1, {ok, Doc1}}]}, FinalReplyW2), - - % test for W = 3 - AccW3 = {length(Shards), length(Docs), list_to_integer("3"), GroupedDocs, Dict}, - - {ok, {WaitingCountW3_1, _, _, _, _} = AccW3_1} = - handle_message({ok, [{ok, Doc1}]}, hd(Shards), AccW3), - ?assertEqual(WaitingCountW3_1, 2), - - {ok, {WaitingCountW3_2, _, _, _, _} = AccW3_2} = - handle_message({ok, [{ok, Doc1}]}, lists:nth(2, Shards), AccW3_1), - ?assertEqual(WaitingCountW3_2, 1), - - {stop, FinalReplyW3} = - handle_message({ok, [{ok, Doc1}]}, lists:nth(3, Shards), AccW3_2), - ?assertEqual({ok, [{Doc1, {ok, Doc1}}]}, FinalReplyW3), - - % test w quorum > # shards, which should fail immediately - - Shards2 = mem3_util:create_partition_map("foo", 1, 1, ["node1"]), - GroupedDocs2 = group_docs_by_shard_hack(<<"foo">>, Shards2, Docs), - - AccW4 = - {length(Shards2), length(Docs), list_to_integer("2"), GroupedDocs2, Dict}, - Bool = - case handle_message({ok, [{ok, Doc1}]}, hd(Shards2), AccW4) of - {stop, _Reply} -> - true; - _ -> - false - end, - ?assertEqual(Bool, true), - - % Docs with no replies should end up as {error, internal_server_error} - SA1 = #shard{node = a, range = 1}, - SB1 = #shard{node = b, range = 1}, - SA2 = #shard{node = a, range = 2}, - SB2 = #shard{node = b, range = 2}, - GroupedDocs3 = [{SA1, [Doc1]}, {SB1, [Doc1]}, {SA2, [Doc2]}, {SB2, [Doc2]}], - StW5_0 = {length(GroupedDocs3), length(Docs2), 2, GroupedDocs3, Dict2}, - {ok, StW5_1} = handle_message({ok, [{ok, "A"}]}, SA1, StW5_0), - {ok, StW5_2} = handle_message({rexi_EXIT, nil}, SB1, StW5_1), - {ok, StW5_3} = handle_message({rexi_EXIT, nil}, SA2, StW5_2), - {stop, ReplyW5} = handle_message({rexi_EXIT, nil}, SB2, StW5_3), - ?assertEqual( - {error, [{Doc1, {accepted, "A"}}, {Doc2, {error, internal_server_error}}]}, - ReplyW5 - ). - -doc_update2() -> - Doc1 = #doc{revs = {1, [<<"foo">>]}}, - Doc2 = #doc{revs = {1, [<<"bar">>]}}, - Docs = [Doc1, Doc2], - Shards = - mem3_util:create_partition_map("foo", 3, 1, ["node1", "node2", "node3"]), - GroupedDocs = group_docs_by_shard_hack(<<"foo">>, Shards, Docs), - Acc0 = { - length(Shards), - length(Docs), - list_to_integer("2"), - GroupedDocs, - dict:from_list([{Doc, []} || Doc <- Docs]) - }, - - {ok, {WaitingCount1, _, _, _, _} = Acc1} = - handle_message({ok, [{ok, Doc1}, {ok, Doc2}]}, hd(Shards), Acc0), - ?assertEqual(WaitingCount1, 2), - - {ok, {WaitingCount2, _, _, _, _} = Acc2} = - handle_message({rexi_EXIT, 1}, lists:nth(2, Shards), Acc1), - ?assertEqual(WaitingCount2, 1), - - {stop, Reply} = - handle_message({rexi_EXIT, 1}, lists:nth(3, Shards), Acc2), - - ?assertEqual( - {accepted, [{Doc1, {accepted, Doc1}}, {Doc2, {accepted, Doc2}}]}, - Reply - ). - -doc_update3() -> - Doc1 = #doc{revs = {1, [<<"foo">>]}}, - Doc2 = #doc{revs = {1, [<<"bar">>]}}, - Docs = [Doc1, Doc2], - Shards = - mem3_util:create_partition_map("foo", 3, 1, ["node1", "node2", "node3"]), - GroupedDocs = group_docs_by_shard_hack(<<"foo">>, Shards, Docs), - Acc0 = { - length(Shards), - length(Docs), - list_to_integer("2"), - GroupedDocs, - dict:from_list([{Doc, []} || Doc <- Docs]) - }, - - {ok, {WaitingCount1, _, _, _, _} = Acc1} = - handle_message({ok, [{ok, Doc1}, {ok, Doc2}]}, hd(Shards), Acc0), - ?assertEqual(WaitingCount1, 2), - - {ok, {WaitingCount2, _, _, _, _} = Acc2} = - handle_message({rexi_EXIT, 1}, lists:nth(2, Shards), Acc1), - ?assertEqual(WaitingCount2, 1), - - {stop, Reply} = - handle_message({ok, [{ok, Doc1}, {ok, Doc2}]}, lists:nth(3, Shards), Acc2), - - ?assertEqual({ok, [{Doc1, {ok, Doc1}}, {Doc2, {ok, Doc2}}]}, Reply). - -handle_all_dbs_active() -> - Doc1 = #doc{revs = {1, [<<"foo">>]}}, - Doc2 = #doc{revs = {1, [<<"bar">>]}}, - Docs = [Doc1, Doc2], - Shards = - mem3_util:create_partition_map("foo", 3, 1, ["node1", "node2", "node3"]), - GroupedDocs = group_docs_by_shard_hack(<<"foo">>, Shards, Docs), - Acc0 = { - length(Shards), - length(Docs), - list_to_integer("2"), - GroupedDocs, - dict:from_list([{Doc, []} || Doc <- Docs]) - }, - - {ok, {WaitingCount1, _, _, _, _} = Acc1} = - handle_message({ok, [{ok, Doc1}, {ok, Doc2}]}, hd(Shards), Acc0), - ?assertEqual(WaitingCount1, 2), - - {ok, {WaitingCount2, _, _, _, _} = Acc2} = - handle_message({error, all_dbs_active}, lists:nth(2, Shards), Acc1), - ?assertEqual(WaitingCount2, 1), - - {stop, Reply} = - handle_message({ok, [{ok, Doc1}, {ok, Doc2}]}, lists:nth(3, Shards), Acc2), - - ?assertEqual({ok, [{Doc1, {ok, Doc1}}, {Doc2, {ok, Doc2}}]}, Reply). - -handle_two_all_dbs_actives() -> - Doc1 = #doc{revs = {1, [<<"foo">>]}}, - Doc2 = #doc{revs = {1, [<<"bar">>]}}, - Docs = [Doc1, Doc2], - Shards = - mem3_util:create_partition_map("foo", 3, 1, ["node1", "node2", "node3"]), - GroupedDocs = group_docs_by_shard_hack(<<"foo">>, Shards, Docs), - Acc0 = { - length(Shards), - length(Docs), - list_to_integer("2"), - GroupedDocs, - dict:from_list([{Doc, []} || Doc <- Docs]) - }, - - {ok, {WaitingCount1, _, _, _, _} = Acc1} = - handle_message({ok, [{ok, Doc1}, {ok, Doc2}]}, hd(Shards), Acc0), - ?assertEqual(WaitingCount1, 2), - - {ok, {WaitingCount2, _, _, _, _} = Acc2} = - handle_message({error, all_dbs_active}, lists:nth(2, Shards), Acc1), - ?assertEqual(WaitingCount2, 1), - - {stop, Reply} = - handle_message({error, all_dbs_active}, lists:nth(3, Shards), Acc2), - - ?assertEqual( - {accepted, [{Doc1, {accepted, Doc1}}, {Doc2, {accepted, Doc2}}]}, - Reply - ). - -one_forbid() -> - Doc1 = #doc{revs = {1, [<<"foo">>]}}, - Doc2 = #doc{revs = {1, [<<"bar">>]}}, - Docs = [Doc1, Doc2], - Shards = - mem3_util:create_partition_map("foo", 3, 1, ["node1", "node2", "node3"]), - GroupedDocs = group_docs_by_shard_hack(<<"foo">>, Shards, Docs), - - Acc0 = { - length(Shards), - length(Docs), - list_to_integer("2"), - GroupedDocs, - dict:from_list([{Doc, []} || Doc <- Docs]) - }, - - {ok, {WaitingCount1, _, _, _, _} = Acc1} = - handle_message({ok, [{ok, Doc1}, noreply]}, hd(Shards), Acc0), - ?assertEqual(WaitingCount1, 2), - - {ok, {WaitingCount2, _, _, _, _} = Acc2} = - handle_message( - {ok, [{ok, Doc1}, {Doc2, {forbidden, <<"not allowed">>}}]}, lists:nth(2, Shards), Acc1 - ), - ?assertEqual(WaitingCount2, 1), - - {stop, Reply} = - handle_message({ok, [{ok, Doc1}, noreply]}, lists:nth(3, Shards), Acc2), - - ?assertEqual( - {ok, [ - {Doc1, {ok, Doc1}}, - {Doc2, {Doc2, {forbidden, <<"not allowed">>}}} - ]}, - Reply - ). - -two_forbid() -> - Doc1 = #doc{revs = {1, [<<"foo">>]}}, - Doc2 = #doc{revs = {1, [<<"bar">>]}}, - Docs = [Doc1, Doc2], - Shards = - mem3_util:create_partition_map("foo", 3, 1, ["node1", "node2", "node3"]), - GroupedDocs = group_docs_by_shard_hack(<<"foo">>, Shards, Docs), - - Acc0 = { - length(Shards), - length(Docs), - list_to_integer("2"), - GroupedDocs, - dict:from_list([{Doc, []} || Doc <- Docs]) - }, - - {ok, {WaitingCount1, _, _, _, _} = Acc1} = - handle_message({ok, [{ok, Doc1}, noreply]}, hd(Shards), Acc0), - ?assertEqual(WaitingCount1, 2), - - {ok, {WaitingCount2, _, _, _, _} = Acc2} = - handle_message( - {ok, [{ok, Doc1}, {Doc2, {forbidden, <<"not allowed">>}}]}, lists:nth(2, Shards), Acc1 - ), - ?assertEqual(WaitingCount2, 1), - - {stop, Reply} = - handle_message( - {ok, [{ok, Doc1}, {Doc2, {forbidden, <<"not allowed">>}}]}, lists:nth(3, Shards), Acc2 - ), - - ?assertEqual( - {ok, [ - {Doc1, {ok, Doc1}}, - {Doc2, {Doc2, {forbidden, <<"not allowed">>}}} - ]}, - Reply - ). - -% This should actually never happen, because an `{ok, Doc}` message means that the revision -% tree is extended and so the VDU should forbid the document. -% Leaving this test here to make sure quorum rules still apply. -extend_tree_forbid() -> - Doc1 = #doc{revs = {1, [<<"foo">>]}}, - Doc2 = #doc{revs = {1, [<<"bar">>]}}, - Docs = [Doc1, Doc2], - Shards = - mem3_util:create_partition_map("foo", 3, 1, ["node1", "node2", "node3"]), - GroupedDocs = group_docs_by_shard_hack(<<"foo">>, Shards, Docs), - - Acc0 = { - length(Shards), - length(Docs), - list_to_integer("2"), - GroupedDocs, - dict:from_list([{Doc, []} || Doc <- Docs]) - }, - - {ok, {WaitingCount1, _, _, _, _} = Acc1} = - handle_message({ok, [{ok, Doc1}, {ok, Doc2}]}, hd(Shards), Acc0), - ?assertEqual(WaitingCount1, 2), - - {ok, {WaitingCount2, _, _, _, _} = Acc2} = - handle_message( - {ok, [{ok, Doc1}, {Doc2, {forbidden, <<"not allowed">>}}]}, lists:nth(2, Shards), Acc1 - ), - ?assertEqual(WaitingCount2, 1), - - {stop, Reply} = - handle_message({ok, [{ok, Doc1}, {ok, Doc2}]}, lists:nth(3, Shards), Acc2), - - ?assertEqual({ok, [{Doc1, {ok, Doc1}}, {Doc2, {ok, Doc2}}]}, Reply). - -other_errors_one_forbid() -> - Doc1 = #doc{revs = {1, [<<"foo">>]}}, - Doc2 = #doc{revs = {1, [<<"bar">>]}}, - Docs = [Doc1, Doc2], - Shards = - mem3_util:create_partition_map("foo", 3, 1, ["node1", "node2", "node3"]), - GroupedDocs = group_docs_by_shard_hack(<<"foo">>, Shards, Docs), - - Acc0 = { - length(Shards), - length(Docs), - list_to_integer("2"), - GroupedDocs, - dict:from_list([{Doc, []} || Doc <- Docs]) - }, - - {ok, {WaitingCount1, _, _, _, _} = Acc1} = - handle_message({ok, [{ok, Doc1}, {Doc2, {error, <<"foo">>}}]}, hd(Shards), Acc0), - ?assertEqual(WaitingCount1, 2), - - {ok, {WaitingCount2, _, _, _, _} = Acc2} = - handle_message({ok, [{ok, Doc1}, {Doc2, {error, <<"bar">>}}]}, lists:nth(2, Shards), Acc1), - ?assertEqual(WaitingCount2, 1), - - {stop, Reply} = - handle_message( - {ok, [{ok, Doc1}, {Doc2, {forbidden, <<"not allowed">>}}]}, lists:nth(3, Shards), Acc2 - ), - ?assertEqual({error, [{Doc1, {ok, Doc1}}, {Doc2, {Doc2, {error, <<"foo">>}}}]}, Reply). - -one_error_two_forbid() -> - Doc1 = #doc{revs = {1, [<<"foo">>]}}, - Doc2 = #doc{revs = {1, [<<"bar">>]}}, - Docs = [Doc1, Doc2], - Shards = - mem3_util:create_partition_map("foo", 3, 1, ["node1", "node2", "node3"]), - GroupedDocs = group_docs_by_shard_hack(<<"foo">>, Shards, Docs), - - Acc0 = { - length(Shards), - length(Docs), - list_to_integer("2"), - GroupedDocs, - dict:from_list([{Doc, []} || Doc <- Docs]) - }, - - {ok, {WaitingCount1, _, _, _, _} = Acc1} = - handle_message( - {ok, [{ok, Doc1}, {Doc2, {forbidden, <<"not allowed">>}}]}, hd(Shards), Acc0 - ), - ?assertEqual(WaitingCount1, 2), - - {ok, {WaitingCount2, _, _, _, _} = Acc2} = - handle_message({ok, [{ok, Doc1}, {Doc2, {error, <<"foo">>}}]}, lists:nth(2, Shards), Acc1), - ?assertEqual(WaitingCount2, 1), - - {stop, Reply} = - handle_message( - {ok, [{ok, Doc1}, {Doc2, {forbidden, <<"not allowed">>}}]}, lists:nth(3, Shards), Acc2 - ), - ?assertEqual( - {error, [{Doc1, {ok, Doc1}}, {Doc2, {Doc2, {forbidden, <<"not allowed">>}}}]}, Reply - ). - -one_success_two_forbid() -> - Doc1 = #doc{revs = {1, [<<"foo">>]}}, - Doc2 = #doc{revs = {1, [<<"bar">>]}}, - Docs = [Doc1, Doc2], - Shards = - mem3_util:create_partition_map("foo", 3, 1, ["node1", "node2", "node3"]), - GroupedDocs = group_docs_by_shard_hack(<<"foo">>, Shards, Docs), - - Acc0 = { - length(Shards), - length(Docs), - list_to_integer("2"), - GroupedDocs, - dict:from_list([{Doc, []} || Doc <- Docs]) - }, - - {ok, {WaitingCount1, _, _, _, _} = Acc1} = - handle_message( - {ok, [{ok, Doc1}, {Doc2, {forbidden, <<"not allowed">>}}]}, hd(Shards), Acc0 - ), - ?assertEqual(WaitingCount1, 2), - - {ok, {WaitingCount2, _, _, _, _} = Acc2} = - handle_message({ok, [{ok, Doc1}, {Doc2, {ok, Doc2}}]}, lists:nth(2, Shards), Acc1), - ?assertEqual(WaitingCount2, 1), - - {stop, Reply} = - handle_message( - {ok, [{ok, Doc1}, {Doc2, {forbidden, <<"not allowed">>}}]}, lists:nth(3, Shards), Acc2 - ), - ?assertEqual( - {error, [{Doc1, {ok, Doc1}}, {Doc2, {Doc2, {forbidden, <<"not allowed">>}}}]}, Reply - ). - -worker_before_doc_update_forbidden() -> - Doc1 = #doc{revs = {1, [<<"foo">>]}}, - Docs = [Doc1], - Shards = - mem3_util:create_partition_map("foo", 3, 1, ["node1", "node2", "node3"]), - GroupedDocs = group_docs_by_shard_hack(<<"foo">>, Shards, Docs), - Acc = { - length(Shards), - length(Docs), - list_to_integer("2"), - GroupedDocs, - dict:from_list([{Doc, []} || Doc <- Docs]) - }, - ?assertThrow({forbidden, <<"msg">>}, handle_message({forbidden, <<"msg">>}, hd(Shards), Acc)). - -% needed for testing to avoid having to start the mem3 application -group_docs_by_shard_hack(_DbName, Shards, Docs) -> - dict:to_list( - lists:foldl( - fun(#doc{id = _Id} = Doc, D0) -> - lists:foldl( - fun(Shard, D1) -> - dict:append(Shard, Doc, D1) - end, - D0, - Shards - ) - end, - dict:new(), - Docs - ) - ). - -endif.
