This is an automated email from the ASF dual-hosted git repository. rnewson pushed a commit to branch ra in repository https://gitbox.apache.org/repos/asf/couchdb.git
commit 409becbdf8b15a9860d04f476e7e5752b717a48f Author: Robert Newson <[email protected]> AuthorDate: Mon Jan 27 15:48:44 2025 +0000 command per doc, not optimal but easy to read WIP --- src/couch/src/couch_write_queue.erl | 8 ++-- src/fabric/src/fabric_doc_update.erl | 83 ++++-------------------------------- 2 files changed, 13 insertions(+), 78 deletions(-) diff --git a/src/couch/src/couch_write_queue.erl b/src/couch/src/couch_write_queue.erl index 314425192..7ff8c22ae 100644 --- a/src/couch/src/couch_write_queue.erl +++ b/src/couch/src/couch_write_queue.erl @@ -24,13 +24,13 @@ init(_Conf) -> nil. -apply(_Meta, {update_docs, DbName, Docs, Options} = Command, State) when - is_binary(DbName), is_list(Docs), is_list(Options) +apply(_Meta, {update_doc, DbName, Doc, Options}, State) when + is_binary(DbName), is_record(Doc, doc), is_list(Options) -> {ok, Db} = couch_db:open(DbName, [{create_if_missing, true}, ?ADMIN_CTX]), try - Reply = couch_db:update_docs(Db, Docs, Options, ?INTERACTIVE_EDIT), - couch_log:notice("apply ~s ~p ~p", [couch_db:name(Db), Docs, Reply]), + Reply = couch_db:update_doc(Db, Doc, Options), + couch_log:notice("apply ~s ~p ~p", [couch_db:name(Db), Doc, Reply]), {State, Reply} after couch_db:close(Db) diff --git a/src/fabric/src/fabric_doc_update.erl b/src/fabric/src/fabric_doc_update.erl index 7ba36a99d..76aad2048 100644 --- a/src/fabric/src/fabric_doc_update.erl +++ b/src/fabric/src/fabric_doc_update.erl @@ -17,79 +17,20 @@ -include_lib("mem3/include/mem3.hrl"). -include_lib("couch/include/couch_db.hrl"). --record(acc, { - options, - timeout, - replies -}). - go(_, [], _) -> {ok, []}; -go(DbName, AllDocs0, Opts) -> - AllDocs = before_doc_update(DbName, AllDocs0, Opts), - validate_atomic_update(DbName, AllDocs, lists:member(all_or_nothing, Opts)), - GroupedDocs = group_docs_by_shard(DbName, AllDocs), - Acc = #acc{ - options = lists:delete(all_or_nothing, Opts), - timeout = fabric_util:request_timeout(), - replies = #{} - }, - FoldFun = fun(Shards, Docs, FunAcc) -> +go(DbName, AllDocs0, Options0) -> + AllDocs1 = before_doc_update(DbName, AllDocs0, Options0), + validate_atomic_update(DbName, AllDocs0, lists:member(all_or_nothing, Options0)), + Options1 = lists:delete(all_or_nothing, Options0), + UpdateFun = fun(Doc) -> + [Shard | _] = Shards = mem3_shards:for_docid(DbName, Doc#doc.id), Nodes = [S#shard.node || S <- Shards], ServerId = chttpd_app:couch_write_queue_name(Nodes), - Ref = start_worker(ServerId, Shards, Docs, Acc#acc.options), - [{Shards, Ref} | FunAcc] + {ok, Reply, _Leader} = ra:process_command(ServerId, {update_doc, Shard#shard.name, Doc, Options1}), + Reply end, - Refs = maps:fold(FoldFun, [], GroupedDocs), - {ok, recv_loop(GroupedDocs, Refs, Acc)}. - -recv_loop(_GroupedDocs, _Refs, #acc{} = Acc) when Acc#acc.timeout =< 0 -> - {error, timeout}; -recv_loop(GroupedDocs, _Refs, #acc{} = Acc) when map_size(GroupedDocs) == length(Acc#acc.replies) -> - {ok, Acc}; -recv_loop(GroupedDocs, Refs0, #acc{} = Acc0) -> - T0 = ts(), - receive - {ra_event, _CurrentLeader, {applied, [{Correlation, Reply}]}} -> - couch_log:warning("received reply for ~p : ~p", [Correlation, Reply]), - Elapsed = ts() - T0, - {Shards, Correlation} = lists:keyfind(Correlation, 2, Refs0), - Acc1 = - case Reply of - {ok, Results} -> - Acc0#acc{ - timeout = Acc0#acc.timeout - Elapsed, - replies = (Acc0#acc.replies)#{Shards => Results} - }; - Else -> - couch_log:error("OOPS ~p", [Else]), - Acc0 - end, - recv_loop(GroupedDocs, Refs0, Acc1); - {ra_event, _FromId, {rejected, {not_leader, undefined, Correlation}}} -> - couch_log:warning("received no leader ~p", [Correlation]), - {error, no_leader}; - {ra_event, _FromId, {rejected, {not_leader, Leader, Correlation}}} -> - couch_log:warning("received not leader for ~p", [Correlation]), - Elapsed = ts() - T0, - Acc1 = Acc0#acc{timeout = Acc0#acc.timeout - Elapsed}, - {value, {Shards, Correlation}, Refs1} = lists:keytake(Correlation, 2, Refs0), - #{Shards := Docs} = GroupedDocs, - Ref = start_worker(Leader, Shards, Docs, Acc0#acc.options), - recv_loop(GroupedDocs, [{Shards, Ref} | Refs1], Acc1) - after Acc0#acc.timeout -> - {error, timeout} - end. - -start_worker(Leader, Shards, Docs, Options) -> - ShardDbName = (hd(Shards))#shard.name, - Ref = make_ref(), - ok = ra:pipeline_command(Leader, {update_docs, ShardDbName, Docs, Options}, Ref, normal), - couch_log:warning("started worker for ~s with ref ~p", [ShardDbName, Ref]), - Ref. - -ts() -> - erlang:monotonic_time(millisecond). + {ok, lists:map(UpdateFun, AllDocs1)}. before_doc_update(DbName, Docs, Opts) -> % Use the same pattern as in couch_db:validate_doc_update/3. If the document was already @@ -121,12 +62,6 @@ before_doc_update(DbName, Docs, Opts) -> Docs end. -group_docs_by_shard(DbName, Docs) -> - KeyFun = fun(#doc{} = Doc) -> - mem3_shards:for_docid(DbName, Doc#doc.id) - end, - maps:groups_from_list(KeyFun, Docs). - validate_atomic_update(_, _, false) -> ok; validate_atomic_update(_DbName, AllDocs, true) ->
