This is an automated email from the ASF dual-hosted git repository. rnewson pushed a commit to branch ra in repository https://gitbox.apache.org/repos/asf/couchdb.git
commit ee4c637899cda5b11436ad6d6fb16ec6069cc23a Author: Robert Newson <[email protected]> AuthorDate: Mon Jan 27 15:23:57 2025 +0000 wip --- src/couch/src/couch_write_queue.erl | 18 ++++----------- src/fabric/src/fabric_doc_update.erl | 43 +++++++++++++++++++++--------------- 2 files changed, 29 insertions(+), 32 deletions(-) diff --git a/src/couch/src/couch_write_queue.erl b/src/couch/src/couch_write_queue.erl index a7ebdd470..314425192 100644 --- a/src/couch/src/couch_write_queue.erl +++ b/src/couch/src/couch_write_queue.erl @@ -16,9 +16,7 @@ -export([ init/1, - apply/3, - init_aux/1, - handle_aux/5 + apply/3 ]). -include_lib("couch/include/couch_db.hrl"). @@ -29,19 +27,11 @@ init(_Conf) -> apply(_Meta, {update_docs, DbName, Docs, Options} = Command, State) when is_binary(DbName), is_list(Docs), is_list(Options) -> - {State, ok, [{aux, Command}]}. - -init_aux(_Name) -> - nil. - -handle_aux(_RaftState, cast, {update_docs, DbName, Docs, Options}, AuxState, IntState) -> {ok, Db} = couch_db:open(DbName, [{create_if_missing, true}, ?ADMIN_CTX]), try Reply = couch_db:update_docs(Db, Docs, Options, ?INTERACTIVE_EDIT), - couch_log:notice("handle_aux ~s ~p ~p", [couch_db:name(Db), Docs, Reply]), - {reply, Reply, AuxState, IntState} + couch_log:notice("apply ~s ~p ~p", [couch_db:name(Db), Docs, Reply]), + {State, Reply} after couch_db:close(Db) - end; -handle_aux(_RaftState, cast, _Command, AuxState, IntState) -> - {no_reply, AuxState, IntState}. + end. diff --git a/src/fabric/src/fabric_doc_update.erl b/src/fabric/src/fabric_doc_update.erl index 2c5d3f4ed..7ba36a99d 100644 --- a/src/fabric/src/fabric_doc_update.erl +++ b/src/fabric/src/fabric_doc_update.erl @@ -30,17 +30,18 @@ go(DbName, AllDocs0, Opts) -> validate_atomic_update(DbName, AllDocs, lists:member(all_or_nothing, Opts)), GroupedDocs = group_docs_by_shard(DbName, AllDocs), Acc = #acc{ - options = lists:delete(all_or_nothing, Opts), - timeout = fabric_util:request_timeout(), - replies = [] - }, + options = lists:delete(all_or_nothing, Opts), + timeout = fabric_util:request_timeout(), + replies = #{} + }, FoldFun = fun(Shards, Docs, FunAcc) -> - Nodes = [S#shard.node || S <- Shards], - ServerId = chttpd_app:couch_write_queue_name(Nodes), - Ref = start_worker(ServerId, Shards, Docs, Acc#acc.options), - [{Shards, Ref} | FunAcc] end, + Nodes = [S#shard.node || S <- Shards], + ServerId = chttpd_app:couch_write_queue_name(Nodes), + Ref = start_worker(ServerId, Shards, Docs, Acc#acc.options), + [{Shards, Ref} | FunAcc] + end, Refs = maps:fold(FoldFun, [], GroupedDocs), - recv_loop(GroupedDocs, Refs, Acc). + {ok, recv_loop(GroupedDocs, Refs, Acc)}. recv_loop(_GroupedDocs, _Refs, #acc{} = Acc) when Acc#acc.timeout =< 0 -> {error, timeout}; @@ -52,10 +53,18 @@ recv_loop(GroupedDocs, Refs0, #acc{} = Acc0) -> {ra_event, _CurrentLeader, {applied, [{Correlation, Reply}]}} -> couch_log:warning("received reply for ~p : ~p", [Correlation, Reply]), Elapsed = ts() - T0, - Acc1 = Acc0#acc{ - timeout = Acc0#acc.timeout - Elapsed, - replies = [Reply | Acc0#acc.replies] - }, + {Shards, Correlation} = lists:keyfind(Correlation, 2, Refs0), + Acc1 = + case Reply of + {ok, Results} -> + Acc0#acc{ + timeout = Acc0#acc.timeout - Elapsed, + replies = (Acc0#acc.replies)#{Shards => Results} + }; + Else -> + couch_log:error("OOPS ~p", [Else]), + Acc0 + end, recv_loop(GroupedDocs, Refs0, Acc1); {ra_event, _FromId, {rejected, {not_leader, undefined, Correlation}}} -> couch_log:warning("received no leader ~p", [Correlation]), @@ -68,9 +77,8 @@ recv_loop(GroupedDocs, Refs0, #acc{} = Acc0) -> #{Shards := Docs} = GroupedDocs, Ref = start_worker(Leader, Shards, Docs, Acc0#acc.options), recv_loop(GroupedDocs, [{Shards, Ref} | Refs1], Acc1) - after - Acc0#acc.timeout -> - {error, timeout} + after Acc0#acc.timeout -> + {error, timeout} end. start_worker(Leader, Shards, Docs, Options) -> @@ -83,7 +91,6 @@ start_worker(Leader, Shards, Docs, Options) -> ts() -> erlang:monotonic_time(millisecond). - before_doc_update(DbName, Docs, Opts) -> % Use the same pattern as in couch_db:validate_doc_update/3. If the document was already % checked during the interactive edit we don't want to spend time in the internal replicator @@ -116,7 +123,7 @@ before_doc_update(DbName, Docs, Opts) -> group_docs_by_shard(DbName, Docs) -> KeyFun = fun(#doc{} = Doc) -> - mem3_shards:for_docid(DbName, Doc#doc.id) + mem3_shards:for_docid(DbName, Doc#doc.id) end, maps:groups_from_list(KeyFun, Docs).
