This is an automated email from the ASF dual-hosted git repository.

rnewson pushed a commit to branch ra
in repository https://gitbox.apache.org/repos/asf/couchdb.git

commit 409becbdf8b15a9860d04f476e7e5752b717a48f
Author: Robert Newson <[email protected]>
AuthorDate: Mon Jan 27 15:48:44 2025 +0000

    command per doc, not optimal but easy to read WIP
---
 src/couch/src/couch_write_queue.erl  |  8 ++--
 src/fabric/src/fabric_doc_update.erl | 83 ++++--------------------------------
 2 files changed, 13 insertions(+), 78 deletions(-)

diff --git a/src/couch/src/couch_write_queue.erl 
b/src/couch/src/couch_write_queue.erl
index 314425192..7ff8c22ae 100644
--- a/src/couch/src/couch_write_queue.erl
+++ b/src/couch/src/couch_write_queue.erl
@@ -24,13 +24,13 @@
 init(_Conf) ->
     nil.
 
-apply(_Meta, {update_docs, DbName, Docs, Options} = Command, State) when
-    is_binary(DbName), is_list(Docs), is_list(Options)
+apply(_Meta, {update_doc, DbName, Doc, Options}, State) when
+    is_binary(DbName), is_record(Doc, doc), is_list(Options)
 ->
     {ok, Db} = couch_db:open(DbName, [{create_if_missing, true}, ?ADMIN_CTX]),
     try
-        Reply = couch_db:update_docs(Db, Docs, Options, ?INTERACTIVE_EDIT),
-        couch_log:notice("apply ~s ~p ~p", [couch_db:name(Db), Docs, Reply]),
+        Reply = couch_db:update_doc(Db, Doc, Options),
+        couch_log:notice("apply ~s ~p ~p", [couch_db:name(Db), Doc, Reply]),
         {State, Reply}
     after
         couch_db:close(Db)
diff --git a/src/fabric/src/fabric_doc_update.erl 
b/src/fabric/src/fabric_doc_update.erl
index 7ba36a99d..76aad2048 100644
--- a/src/fabric/src/fabric_doc_update.erl
+++ b/src/fabric/src/fabric_doc_update.erl
@@ -17,79 +17,20 @@
 -include_lib("mem3/include/mem3.hrl").
 -include_lib("couch/include/couch_db.hrl").
 
--record(acc, {
-    options,
-    timeout,
-    replies
-}).
-
 go(_, [], _) ->
     {ok, []};
-go(DbName, AllDocs0, Opts) ->
-    AllDocs = before_doc_update(DbName, AllDocs0, Opts),
-    validate_atomic_update(DbName, AllDocs, lists:member(all_or_nothing, 
Opts)),
-    GroupedDocs = group_docs_by_shard(DbName, AllDocs),
-    Acc = #acc{
-        options = lists:delete(all_or_nothing, Opts),
-        timeout = fabric_util:request_timeout(),
-        replies = #{}
-    },
-    FoldFun = fun(Shards, Docs, FunAcc) ->
+go(DbName, AllDocs0, Options0) ->
+    AllDocs1 = before_doc_update(DbName, AllDocs0, Options0),
+    validate_atomic_update(DbName, AllDocs0, lists:member(all_or_nothing, 
Options0)),
+    Options1 = lists:delete(all_or_nothing, Options0),
+    UpdateFun = fun(Doc) ->
+        [Shard | _] = Shards = mem3_shards:for_docid(DbName, Doc#doc.id),
         Nodes = [S#shard.node || S <- Shards],
         ServerId = chttpd_app:couch_write_queue_name(Nodes),
-        Ref = start_worker(ServerId, Shards, Docs, Acc#acc.options),
-        [{Shards, Ref} | FunAcc]
+        {ok, Reply, _Leader} = ra:process_command(ServerId, {update_doc, 
Shard#shard.name, Doc, Options1}),
+        Reply
     end,
-    Refs = maps:fold(FoldFun, [], GroupedDocs),
-    {ok, recv_loop(GroupedDocs, Refs, Acc)}.
-
-recv_loop(_GroupedDocs, _Refs, #acc{} = Acc) when Acc#acc.timeout =< 0 ->
-    {error, timeout};
-recv_loop(GroupedDocs, _Refs, #acc{} = Acc) when map_size(GroupedDocs) == 
length(Acc#acc.replies) ->
-    {ok, Acc};
-recv_loop(GroupedDocs, Refs0, #acc{} = Acc0) ->
-    T0 = ts(),
-    receive
-        {ra_event, _CurrentLeader, {applied, [{Correlation, Reply}]}} ->
-            couch_log:warning("received reply for ~p : ~p", [Correlation, 
Reply]),
-            Elapsed = ts() - T0,
-            {Shards, Correlation} = lists:keyfind(Correlation, 2, Refs0),
-            Acc1 =
-                case Reply of
-                    {ok, Results} ->
-                        Acc0#acc{
-                            timeout = Acc0#acc.timeout - Elapsed,
-                            replies = (Acc0#acc.replies)#{Shards => Results}
-                        };
-                    Else ->
-                        couch_log:error("OOPS ~p", [Else]),
-                        Acc0
-                end,
-            recv_loop(GroupedDocs, Refs0, Acc1);
-        {ra_event, _FromId, {rejected, {not_leader, undefined, Correlation}}} 
->
-            couch_log:warning("received no leader ~p", [Correlation]),
-            {error, no_leader};
-        {ra_event, _FromId, {rejected, {not_leader, Leader, Correlation}}} ->
-            couch_log:warning("received not leader for ~p", [Correlation]),
-            Elapsed = ts() - T0,
-            Acc1 = Acc0#acc{timeout = Acc0#acc.timeout - Elapsed},
-            {value, {Shards, Correlation}, Refs1} = lists:keytake(Correlation, 
2, Refs0),
-            #{Shards := Docs} = GroupedDocs,
-            Ref = start_worker(Leader, Shards, Docs, Acc0#acc.options),
-            recv_loop(GroupedDocs, [{Shards, Ref} | Refs1], Acc1)
-    after Acc0#acc.timeout ->
-        {error, timeout}
-    end.
-
-start_worker(Leader, Shards, Docs, Options) ->
-    ShardDbName = (hd(Shards))#shard.name,
-    Ref = make_ref(),
-    ok = ra:pipeline_command(Leader, {update_docs, ShardDbName, Docs, 
Options}, Ref, normal),
-    couch_log:warning("started worker for ~s with ref ~p", [ShardDbName, Ref]),
-    Ref.
-
-ts() ->
-    erlang:monotonic_time(millisecond).
+    {ok, lists:map(UpdateFun, AllDocs1)}.
 
 before_doc_update(DbName, Docs, Opts) ->
     % Use the same pattern as in couch_db:validate_doc_update/3. If the 
document was already
@@ -121,12 +62,6 @@ before_doc_update(DbName, Docs, Opts) ->
             Docs
     end.
 
-group_docs_by_shard(DbName, Docs) ->
-    KeyFun = fun(#doc{} = Doc) ->
-        mem3_shards:for_docid(DbName, Doc#doc.id)
-    end,
-    maps:groups_from_list(KeyFun, Docs).
-
 validate_atomic_update(_, _, false) ->
     ok;
 validate_atomic_update(_DbName, AllDocs, true) ->

Reply via email to