This is an automated email from the ASF dual-hosted git repository.

rnewson pushed a commit to branch ra
in repository https://gitbox.apache.org/repos/asf/couchdb.git

commit 8aa58f1195da6dcc4c74b6b3d4a54e73a7138cd3
Author: Robert Newson <[email protected]>
AuthorDate: Fri Jan 24 18:25:34 2025 +0000

    wip
---
 src/fabric/src/fabric_doc_update.erl | 69 +++++++++++++++++++++++++++++++-----
 1 file changed, 60 insertions(+), 9 deletions(-)

diff --git a/src/fabric/src/fabric_doc_update.erl 
b/src/fabric/src/fabric_doc_update.erl
index cb9aaf6b0..2c5d3f4ed 100644
--- a/src/fabric/src/fabric_doc_update.erl
+++ b/src/fabric/src/fabric_doc_update.erl
@@ -17,21 +17,72 @@
 -include_lib("mem3/include/mem3.hrl").
 -include_lib("couch/include/couch_db.hrl").
 
%% Accumulator threaded through recv_loop/3 while waiting on ra replies.
-record(acc, {
    %% update options with all_or_nothing removed (passed to each worker)
    options,
    %% remaining time budget in milliseconds; decremented as events arrive
    timeout,
    %% replies received so far — one per completed shard group
    replies
}).
+
 go(_, [], _) ->
     {ok, []};
 go(DbName, AllDocs0, Opts) ->
     AllDocs = before_doc_update(DbName, AllDocs0, Opts),
     validate_atomic_update(DbName, AllDocs, lists:member(all_or_nothing, 
Opts)),
-    Options = lists:delete(all_or_nothing, Opts),
     GroupedDocs = group_docs_by_shard(DbName, AllDocs),
-    Timeout = fabric_util:request_timeout(),
-    RaFun = fun(Shards, Docs) ->
-        ShardDbName = (hd(Shards))#shard.name,
-        Nodes = [S#shard.node || S <- Shards],
-        Queue = chttpd_app:couch_write_queue_name(Nodes),
-        ra:process_command(Queue, {update_docs, ShardDbName, Docs, Options}, 
Timeout)
-    end,
-    maps:foreach(RaFun, GroupedDocs).
+    Acc = #acc{
+             options = lists:delete(all_or_nothing, Opts),
+             timeout = fabric_util:request_timeout(),
+             replies = []
+            },
+    FoldFun = fun(Shards, Docs, FunAcc) ->
+                      Nodes = [S#shard.node || S <- Shards],
+                      ServerId = chttpd_app:couch_write_queue_name(Nodes),
+                      Ref = start_worker(ServerId, Shards, Docs, 
Acc#acc.options),
+                      [{Shards, Ref} | FunAcc] end,
+    Refs = maps:fold(FoldFun, [], GroupedDocs),
+    recv_loop(GroupedDocs, Refs, Acc).
+
%% Collect ra replies for every shard group, re-dispatching to the new
%% leader on not_leader rejections. Returns {ok, #acc{}} once every shard
%% group has replied, {error, timeout} when the request budget is spent,
%% or {error, no_leader} when ra reports a queue with no elected leader
%% (outstanding workers are abandoned in that case).
%%
%% NOTE(review): assumes #acc.timeout is an integer number of ms — confirm
%% fabric_util:request_timeout/0 never returns the atom 'infinity', which
%% would crash the subtraction below.
%%
%% Completion is checked before timeout-exhaustion so a run whose final
%% reply lands exactly as the budget reaches zero still reports success.
recv_loop(GroupedDocs, _Refs, #acc{} = Acc) when
    map_size(GroupedDocs) == length(Acc#acc.replies)
->
    {ok, Acc};
recv_loop(_GroupedDocs, _Refs, #acc{} = Acc) when Acc#acc.timeout =< 0 ->
    {error, timeout};
recv_loop(GroupedDocs, Refs0, #acc{} = Acc0) ->
    T0 = ts(),
    receive
        %% ra may batch several correlations into a single applied event;
        %% match the whole list rather than exactly one element (a
        %% multi-element batch would otherwise never match and would sit
        %% in the mailbox until the timeout fired).
        {ra_event, _CurrentLeader, {applied, Correlations}} ->
            lists:foreach(
                fun({Correlation, Reply}) ->
                    couch_log:warning("received reply for ~p : ~p", [Correlation, Reply])
                end,
                Correlations
            ),
            Replies = [Reply || {_Correlation, Reply} <- Correlations],
            Elapsed = ts() - T0,
            Acc1 = Acc0#acc{
                timeout = Acc0#acc.timeout - Elapsed,
                replies = Replies ++ Acc0#acc.replies
            },
            recv_loop(GroupedDocs, Refs0, Acc1);
        {ra_event, _FromId, {rejected, {not_leader, undefined, Correlation}}} ->
            %% No elected leader at all for this queue; give up on the
            %% whole request.
            couch_log:warning("received no leader ~p", [Correlation]),
            {error, no_leader};
        {ra_event, _FromId, {rejected, {not_leader, Leader, Correlation}}} ->
            %% Wrong node: find which shard group this correlation belongs
            %% to, re-send the same docs to the leader ra told us about,
            %% and charge the elapsed wait to the budget.
            couch_log:warning("received not leader for ~p", [Correlation]),
            Elapsed = ts() - T0,
            Acc1 = Acc0#acc{timeout = Acc0#acc.timeout - Elapsed},
            {value, {Shards, Correlation}, Refs1} = lists:keytake(Correlation, 2, Refs0),
            #{Shards := Docs} = GroupedDocs,
            Ref = start_worker(Leader, Shards, Docs, Acc0#acc.options),
            recv_loop(GroupedDocs, [{Shards, Ref} | Refs1], Acc1)
    after Acc0#acc.timeout ->
        {error, timeout}
    end.
+
%% Pipeline an update_docs command for one shard group to the ra server
%% identified by Leader; returns the correlation reference that the
%% eventual ra_event reply (or rejection) will carry.
start_worker(Leader, Shards, Docs, Options) ->
    [#shard{name = ShardDbName} | _] = Shards,
    Correlation = make_ref(),
    Command = {update_docs, ShardDbName, Docs, Options},
    ok = ra:pipeline_command(Leader, Command, Correlation, normal),
    couch_log:warning("started worker for ~s with ref ~p", [ShardDbName, Correlation]),
    Correlation.
+
%% Monotonic timestamp in milliseconds; used only to measure elapsed
%% time between receives, never as a wall-clock value.
ts() ->
    erlang:convert_time_unit(erlang:monotonic_time(), native, millisecond).
+
 
 before_doc_update(DbName, Docs, Opts) ->
     % Use the same pattern as in couch_db:validate_doc_update/3. If the 
document was already

Reply via email to