This is an automated email from the ASF dual-hosted git repository.

rnewson pushed a commit to branch ra
in repository https://gitbox.apache.org/repos/asf/couchdb.git

commit ee4c637899cda5b11436ad6d6fb16ec6069cc23a
Author: Robert Newson <[email protected]>
AuthorDate: Mon Jan 27 15:23:57 2025 +0000

    wip
---
 src/couch/src/couch_write_queue.erl  | 18 ++++-----------
 src/fabric/src/fabric_doc_update.erl | 43 +++++++++++++++++++++---------------
 2 files changed, 29 insertions(+), 32 deletions(-)

diff --git a/src/couch/src/couch_write_queue.erl 
b/src/couch/src/couch_write_queue.erl
index a7ebdd470..314425192 100644
--- a/src/couch/src/couch_write_queue.erl
+++ b/src/couch/src/couch_write_queue.erl
@@ -16,9 +16,7 @@
 
 -export([
     init/1,
-    apply/3,
-    init_aux/1,
-    handle_aux/5
+    apply/3
 ]).
 
 -include_lib("couch/include/couch_db.hrl").
@@ -29,19 +27,11 @@ init(_Conf) ->
 apply(_Meta, {update_docs, DbName, Docs, Options} = Command, State) when
     is_binary(DbName), is_list(Docs), is_list(Options)
 ->
-    {State, ok, [{aux, Command}]}.
-
-init_aux(_Name) ->
-    nil.
-
-handle_aux(_RaftState, cast, {update_docs, DbName, Docs, Options}, AuxState, 
IntState) ->
     {ok, Db} = couch_db:open(DbName, [{create_if_missing, true}, ?ADMIN_CTX]),
     try
         Reply = couch_db:update_docs(Db, Docs, Options, ?INTERACTIVE_EDIT),
-        couch_log:notice("handle_aux ~s ~p ~p", [couch_db:name(Db), Docs, 
Reply]),
-        {reply, Reply, AuxState, IntState}
+        couch_log:notice("apply ~s ~p ~p", [couch_db:name(Db), Docs, Reply]),
+        {State, Reply}
     after
         couch_db:close(Db)
-    end;
-handle_aux(_RaftState, cast, _Command, AuxState, IntState) ->
-    {no_reply, AuxState, IntState}.
+    end.
diff --git a/src/fabric/src/fabric_doc_update.erl 
b/src/fabric/src/fabric_doc_update.erl
index 2c5d3f4ed..7ba36a99d 100644
--- a/src/fabric/src/fabric_doc_update.erl
+++ b/src/fabric/src/fabric_doc_update.erl
@@ -30,17 +30,18 @@ go(DbName, AllDocs0, Opts) ->
     validate_atomic_update(DbName, AllDocs, lists:member(all_or_nothing, 
Opts)),
     GroupedDocs = group_docs_by_shard(DbName, AllDocs),
     Acc = #acc{
-             options = lists:delete(all_or_nothing, Opts),
-             timeout = fabric_util:request_timeout(),
-             replies = []
-            },
+        options = lists:delete(all_or_nothing, Opts),
+        timeout = fabric_util:request_timeout(),
+        replies = #{}
+    },
     FoldFun = fun(Shards, Docs, FunAcc) ->
-                      Nodes = [S#shard.node || S <- Shards],
-                      ServerId = chttpd_app:couch_write_queue_name(Nodes),
-                      Ref = start_worker(ServerId, Shards, Docs, 
Acc#acc.options),
-                      [{Shards, Ref} | FunAcc] end,
+        Nodes = [S#shard.node || S <- Shards],
+        ServerId = chttpd_app:couch_write_queue_name(Nodes),
+        Ref = start_worker(ServerId, Shards, Docs, Acc#acc.options),
+        [{Shards, Ref} | FunAcc]
+    end,
     Refs = maps:fold(FoldFun, [], GroupedDocs),
-    recv_loop(GroupedDocs, Refs, Acc).
+    {ok, recv_loop(GroupedDocs, Refs, Acc)}.
 
 recv_loop(_GroupedDocs, _Refs, #acc{} = Acc) when Acc#acc.timeout =< 0 ->
     {error, timeout};
@@ -52,10 +53,18 @@ recv_loop(GroupedDocs, Refs0, #acc{} = Acc0) ->
         {ra_event, _CurrentLeader, {applied, [{Correlation, Reply}]}} ->
             couch_log:warning("received reply for ~p : ~p", [Correlation, 
Reply]),
             Elapsed = ts() - T0,
-            Acc1 = Acc0#acc{
-                     timeout = Acc0#acc.timeout - Elapsed,
-                     replies = [Reply | Acc0#acc.replies]
-                    },
+            {Shards, Correlation} = lists:keyfind(Correlation, 2, Refs0),
+            Acc1 =
+                case Reply of
+                    {ok, Results} ->
+                        Acc0#acc{
+                            timeout = Acc0#acc.timeout - Elapsed,
+                            replies = (Acc0#acc.replies)#{Shards => Results}
+                        };
+                    Else ->
+                        couch_log:error("OOPS ~p", [Else]),
+                        Acc0
+                end,
             recv_loop(GroupedDocs, Refs0, Acc1);
         {ra_event, _FromId, {rejected, {not_leader, undefined, Correlation}}} 
->
             couch_log:warning("received no leader ~p", [Correlation]),
@@ -68,9 +77,8 @@ recv_loop(GroupedDocs, Refs0, #acc{} = Acc0) ->
             #{Shards := Docs} = GroupedDocs,
             Ref = start_worker(Leader, Shards, Docs, Acc0#acc.options),
             recv_loop(GroupedDocs, [{Shards, Ref} | Refs1], Acc1)
-    after
-        Acc0#acc.timeout ->
-            {error, timeout}
+    after Acc0#acc.timeout ->
+        {error, timeout}
     end.
 
 start_worker(Leader, Shards, Docs, Options) ->
@@ -83,7 +91,6 @@ start_worker(Leader, Shards, Docs, Options) ->
 ts() ->
     erlang:monotonic_time(millisecond).
 
-
 before_doc_update(DbName, Docs, Opts) ->
     % Use the same pattern as in couch_db:validate_doc_update/3. If the 
document was already
     % checked during the interactive edit we don't want to spend time in the 
internal replicator
@@ -116,7 +123,7 @@ before_doc_update(DbName, Docs, Opts) ->
 
 group_docs_by_shard(DbName, Docs) ->
     KeyFun = fun(#doc{} = Doc) ->
-                     mem3_shards:for_docid(DbName, Doc#doc.id)
+        mem3_shards:for_docid(DbName, Doc#doc.id)
     end,
     maps:groups_from_list(KeyFun, Docs).
 

Reply via email to