This is an automated email from the ASF dual-hosted git repository.

rnewson pushed a commit to branch reduce-intra-cluster-conflicts-3
in repository https://gitbox.apache.org/repos/asf/couchdb.git

commit 754f327bd62aab2a94344aa2622919147806b3f4
Author: Robert Newson <[email protected]>
AuthorDate: Wed Feb 18 14:51:36 2026 +0000

    start leaders first
---
 src/fabric/src/fabric_doc_update.erl | 74 +++++++++++++++++++++++++++++-------
 1 file changed, 61 insertions(+), 13 deletions(-)

diff --git a/src/fabric/src/fabric_doc_update.erl b/src/fabric/src/fabric_doc_update.erl
index 1fac344ef..5aba9d482 100644
--- a/src/fabric/src/fabric_doc_update.erl
+++ b/src/fabric/src/fabric_doc_update.erl
@@ -24,7 +24,9 @@
     grouped_docs,
     reply,
     dbname,
-    update_options
+    update_options,
+    leaders = [],
+    started = []
 }).
 
 go(_, [], _) ->
@@ -52,7 +54,7 @@ go(DbName, AllDocs0, Opts) ->
         update_options = Options
     },
     Timeout = fabric_util:request_timeout(),
-    Acc1 = start_workers(Acc0),
+    Acc1 = start_leaders(Acc0),
     try rexi_utils:recv(Workers, #shard.ref, fun handle_message/3, Acc1, infinity, Timeout) of
         {ok, {Health, Results}} when
             Health =:= ok; Health =:= accepted; Health =:= error
@@ -76,22 +78,33 @@ go(DbName, AllDocs0, Opts) ->
 
 handle_message({rexi_DOWN, _, {_, NodeRef}, _}, _Worker, #acc{} = Acc0) ->
     #acc{grouped_docs = GroupedDocs} = Acc0,
-    NewGrpDocs = [X || {#shard{node = N}, _} = X <- GroupedDocs, N =/= NodeRef],
-    skip_message(Acc0#acc{waiting_count = length(NewGrpDocs), grouped_docs = NewGrpDocs});
+    {NewGrpDocs, DroppedGrpDocs} = lists:partition(
+        fun({#shard{node = N}, _}) -> N =/= NodeRef end, GroupedDocs
+    ),
+    DroppedRanges = lists:usort([S#shard.range || {S, _} <- DroppedGrpDocs]),
+    Acc1 = Acc0#acc{waiting_count = length(NewGrpDocs), grouped_docs = NewGrpDocs},
+    Acc2 = start_followers(DroppedRanges, Acc1),
+    skip_message(Acc2);
 handle_message({rexi_EXIT, _}, Worker, #acc{} = Acc0) ->
     #acc{waiting_count = WC, grouped_docs = GrpDocs} = Acc0,
     NewGrpDocs = lists:keydelete(Worker, 1, GrpDocs),
-    skip_message(Acc0#acc{waiting_count = WC - 1, grouped_docs = NewGrpDocs});
+    Acc1 = Acc0#acc{waiting_count = WC - 1, grouped_docs = NewGrpDocs},
+    Acc2 = start_followers([Worker#shard.range], Acc1),
+    skip_message(Acc2);
 handle_message({error, all_dbs_active}, Worker, #acc{} = Acc0) ->
     % treat it like rexi_EXIT, the hope at least one copy will return successfully
     #acc{waiting_count = WC, grouped_docs = GrpDocs} = Acc0,
     NewGrpDocs = lists:keydelete(Worker, 1, GrpDocs),
-    skip_message(Acc0#acc{waiting_count = WC - 1, grouped_docs = NewGrpDocs});
+    Acc1 = Acc0#acc{waiting_count = WC - 1, grouped_docs = NewGrpDocs},
+    Acc2 = start_followers([Worker#shard.range], Acc1),
+    skip_message(Acc2);
 handle_message(internal_server_error, Worker, #acc{} = Acc0) ->
     % happens when we fail to load validation functions in an RPC worker
     #acc{waiting_count = WC, grouped_docs = GrpDocs} = Acc0,
     NewGrpDocs = lists:keydelete(Worker, 1, GrpDocs),
-    skip_message(Acc0#acc{waiting_count = WC - 1, grouped_docs = NewGrpDocs});
+    Acc1 = Acc0#acc{waiting_count = WC - 1, grouped_docs = NewGrpDocs},
+    Acc2 = start_followers([Worker#shard.range], Acc1),
+    skip_message(Acc2);
 handle_message(attachment_chunk_received, _Worker, #acc{} = Acc0) ->
     {ok, Acc0};
 handle_message({ok, Replies}, Worker, #acc{} = Acc0) ->
@@ -102,6 +115,7 @@ handle_message({ok, Replies}, Worker, #acc{} = Acc0) ->
         grouped_docs = GroupedDocs,
         reply = DocReplyDict0
     } = Acc0,
+    Acc1 = start_followers([Worker#shard.range], Acc0),
     {value, {_, Docs}, NewGrpDocs} = lists:keytake(Worker, 1, GroupedDocs),
     DocReplyDict = append_update_replies(Docs, Replies, DocReplyDict0),
     case {WaitingCount, dict:size(DocReplyDict)} of
@@ -117,7 +131,7 @@ handle_message({ok, Replies}, Worker, #acc{} = Acc0) ->
             % we've got at least one reply for each document, let's take a look
             case dict:fold(fun maybe_reply/3, {stop, W, []}, DocReplyDict) of
                 continue ->
-                    {ok, Acc0#acc{
+                    {ok, Acc1#acc{
                         waiting_count = WaitingCount - 1,
                         grouped_docs = NewGrpDocs,
                         reply = DocReplyDict
@@ -126,7 +140,7 @@ handle_message({ok, Replies}, Worker, #acc{} = Acc0) ->
                     {stop, {ok, FinalReplies}}
             end;
         _ ->
-            {ok, Acc0#acc{
+            {ok, Acc1#acc{
+                waiting_count = WaitingCount - 1, grouped_docs = NewGrpDocs, reply = DocReplyDict
             }}
     end;
@@ -353,19 +367,53 @@ validate_atomic_update(_DbName, AllDocs, true) ->
     ),
     throw({aborted, PreCommitFailures}).
 
-start_workers(#acc{} = Acc) ->
+start_leaders(#acc{} = Acc0) ->
+    #acc{dbname = DbName, grouped_docs = GroupedDocs} = Acc0,
+    {Workers, _} = lists:unzip(GroupedDocs),
+    LeaderRefs = lists:foldl(
+        fun({Worker, Docs}, RefAcc) ->
+            case is_leader(DbName, Worker, Workers) of
+                true ->
+                    start_worker(Worker, Docs, Acc0),
+                    [Worker#shard.ref | RefAcc];
+                false ->
+                    RefAcc
+            end
+        end,
+        [],
+        GroupedDocs
+    ),
+    Acc0#acc{leaders = LeaderRefs, started = LeaderRefs}.
+
+start_followers(Ranges, #acc{} = Acc0) ->
+    Followers = [
+        {Worker, Docs}
+     || {Worker, Docs} <- Acc0#acc.grouped_docs,
+        lists:member(Worker#shard.range, Ranges),
+        not lists:member(Worker#shard.ref, Acc0#acc.started)
+    ],
     lists:foreach(
         fun({Worker, Docs}) ->
-            start_worker(Worker, Docs, Acc)
+            start_worker(Worker, Docs, Acc0)
         end,
-        Acc#acc.grouped_docs
+        Followers
     ),
-    Acc.
+    Started = [Ref || {#shard{ref = Ref}, _Docs} <- Followers],
+    Acc0#acc{started = lists:append([Started, Acc0#acc.started])}.
+
+%% use 'lowest' node that hosts this shard range as leader
+is_leader(DbName, Worker, Workers) ->
+    Nodes0 = lists:sort([W#shard.node || W <- Workers, W#shard.range == Worker#shard.range]),
+    Nodes1 = mem3_util:rotate_list(DbName, Nodes0),
+    Worker#shard.node == hd(Nodes1).
 
 start_worker(#shard{ref = Ref} = Worker, Docs, #acc{} = Acc) when is_reference(Ref) ->
     #shard{name = Name, node = Node} = Worker,
     #acc{update_options = UpdateOptions} = Acc,
     rexi:cast_ref(Ref, Node, {fabric_rpc, update_docs, [Name, untag_docs(Docs), UpdateOptions]}),
+    ok;
+start_worker(#shard{ref = undefined}, _Docs, #acc{}) ->
+    % for unit tests below.
     ok.
 
 -ifdef(TEST).

Reply via email to