This is an automated email from the ASF dual-hosted git repository. rnewson pushed a commit to branch ra in repository https://gitbox.apache.org/repos/asf/couchdb.git
commit 8aa58f1195da6dcc4c74b6b3d4a54e73a7138cd3 Author: Robert Newson <[email protected]> AuthorDate: Fri Jan 24 18:25:34 2025 +0000 wip --- src/fabric/src/fabric_doc_update.erl | 69 +++++++++++++++++++++++++++++++----- 1 file changed, 60 insertions(+), 9 deletions(-) diff --git a/src/fabric/src/fabric_doc_update.erl b/src/fabric/src/fabric_doc_update.erl index cb9aaf6b0..2c5d3f4ed 100644 --- a/src/fabric/src/fabric_doc_update.erl +++ b/src/fabric/src/fabric_doc_update.erl @@ -17,21 +17,72 @@ -include_lib("mem3/include/mem3.hrl"). -include_lib("couch/include/couch_db.hrl"). +-record(acc, { + options, + timeout, + replies +}). + go(_, [], _) -> {ok, []}; go(DbName, AllDocs0, Opts) -> AllDocs = before_doc_update(DbName, AllDocs0, Opts), validate_atomic_update(DbName, AllDocs, lists:member(all_or_nothing, Opts)), - Options = lists:delete(all_or_nothing, Opts), GroupedDocs = group_docs_by_shard(DbName, AllDocs), - Timeout = fabric_util:request_timeout(), - RaFun = fun(Shards, Docs) -> - ShardDbName = (hd(Shards))#shard.name, - Nodes = [S#shard.node || S <- Shards], - Queue = chttpd_app:couch_write_queue_name(Nodes), - ra:process_command(Queue, {update_docs, ShardDbName, Docs, Options}, Timeout) - end, - maps:foreach(RaFun, GroupedDocs). + Acc = #acc{ + options = lists:delete(all_or_nothing, Opts), + timeout = fabric_util:request_timeout(), + replies = [] + }, + FoldFun = fun(Shards, Docs, FunAcc) -> + Nodes = [S#shard.node || S <- Shards], + ServerId = chttpd_app:couch_write_queue_name(Nodes), + Ref = start_worker(ServerId, Shards, Docs, Acc#acc.options), + [{Shards, Ref} | FunAcc] end, + Refs = maps:fold(FoldFun, [], GroupedDocs), + recv_loop(GroupedDocs, Refs, Acc). + +recv_loop(_GroupedDocs, _Refs, #acc{} = Acc) when Acc#acc.timeout =< 0 -> + {error, timeout}; +recv_loop(GroupedDocs, _Refs, #acc{} = Acc) when map_size(GroupedDocs) == length(Acc#acc.replies) -> + {ok, Acc}; +recv_loop(GroupedDocs, Refs0, #acc{} = Acc0) -> + T0 = ts(), + receive + {ra_event, _CurrentLeader, {applied, [{Correlation, Reply}]}} -> + couch_log:warning("received reply for ~p : ~p", [Correlation, Reply]), + Elapsed = ts() - T0, + Acc1 = Acc0#acc{ + timeout = Acc0#acc.timeout - Elapsed, + replies = [Reply | Acc0#acc.replies] + }, + recv_loop(GroupedDocs, Refs0, Acc1); + {ra_event, _FromId, {rejected, {not_leader, undefined, Correlation}}} -> + couch_log:warning("received no leader ~p", [Correlation]), + {error, no_leader}; + {ra_event, _FromId, {rejected, {not_leader, Leader, Correlation}}} -> + couch_log:warning("received not leader for ~p", [Correlation]), + Elapsed = ts() - T0, + Acc1 = Acc0#acc{timeout = Acc0#acc.timeout - Elapsed}, + {value, {Shards, Correlation}, Refs1} = lists:keytake(Correlation, 2, Refs0), + #{Shards := Docs} = GroupedDocs, + Ref = start_worker(Leader, Shards, Docs, Acc0#acc.options), + recv_loop(GroupedDocs, [{Shards, Ref} | Refs1], Acc1) + after + Acc0#acc.timeout -> + {error, timeout} + end. + +start_worker(Leader, Shards, Docs, Options) -> + ShardDbName = (hd(Shards))#shard.name, + Ref = make_ref(), + ok = ra:pipeline_command(Leader, {update_docs, ShardDbName, Docs, Options}, Ref, normal), + couch_log:warning("started worker for ~s with ref ~p", [ShardDbName, Ref]), + Ref. + +ts() -> + erlang:monotonic_time(millisecond). + before_doc_update(DbName, Docs, Opts) -> % Use the same pattern as in couch_db:validate_doc_update/3. If the document was already
