This is an automated email from the ASF dual-hosted git repository.

rnewson pushed a commit to branch reduce-intra-cluster-conflicts-3
in repository https://gitbox.apache.org/repos/asf/couchdb.git

commit cf2808456e693b35e574f23c3e1b64d1cc12ab58
Author: Robert Newson <[email protected]>
AuthorDate: Wed Feb 18 18:04:10 2026 +0000

    don't update followers where leader got conflicts
---
 TODO                                 |  1 +
 src/fabric/src/fabric_doc_update.erl | 22 ++++++++++++++++------
 2 files changed, 17 insertions(+), 6 deletions(-)

diff --git a/TODO b/TODO
new file mode 100644
index 000000000..cc03fadad
--- /dev/null
+++ b/TODO
@@ -0,0 +1 @@
+remove docs from the followers GroupedDocs before starting them if the leader 
got a conflict
\ No newline at end of file
diff --git a/src/fabric/src/fabric_doc_update.erl 
b/src/fabric/src/fabric_doc_update.erl
index 5aba9d482..0664493e2 100644
--- a/src/fabric/src/fabric_doc_update.erl
+++ b/src/fabric/src/fabric_doc_update.erl
@@ -115,8 +115,10 @@ handle_message({ok, Replies}, Worker, #acc{} = Acc0) ->
         grouped_docs = GroupedDocs,
         reply = DocReplyDict0
     } = Acc0,
-    Acc1 = start_followers([Worker#shard.range], Acc0),
-    {value, {_, Docs}, NewGrpDocs} = lists:keytake(Worker, 1, GroupedDocs),
+    {value, {_, Docs}, NewGrpDocs0} = lists:keytake(Worker, 1, GroupedDocs),
+    ConflictDocs = [Doc || {Doc, conflict} <- lists:zip(Docs, Replies)],
+    NewGrpDocs = remove_conflicted_docs(ConflictDocs, NewGrpDocs0),
+    Acc2 = start_followers([Worker#shard.range], Acc0#acc{grouped_docs = 
NewGrpDocs}),
     DocReplyDict = append_update_replies(Docs, Replies, DocReplyDict0),
     case {WaitingCount, dict:size(DocReplyDict)} of
         {1, _} ->
@@ -131,7 +133,7 @@ handle_message({ok, Replies}, Worker, #acc{} = Acc0) ->
             % we've got at least one reply for each document, let's take a look
             case dict:fold(fun maybe_reply/3, {stop, W, []}, DocReplyDict) of
                 continue ->
-                    {ok, Acc1#acc{
+                    {ok, Acc2#acc{
                         waiting_count = WaitingCount - 1,
                         grouped_docs = NewGrpDocs,
                         reply = DocReplyDict
@@ -140,7 +142,7 @@ handle_message({ok, Replies}, Worker, #acc{} = Acc0) ->
                     {stop, {ok, FinalReplies}}
             end;
         _ ->
-            {ok, Acc1#acc{
+            {ok, Acc2#acc{
                 waiting_count = WaitingCount - 1, grouped_docs = NewGrpDocs, 
reply = DocReplyDict
             }}
     end;
@@ -194,8 +196,11 @@ tag_docs([#doc{meta = Meta} = Doc | Rest]) ->
 
 untag_docs([]) ->
     [];
-untag_docs([#doc{meta = Meta} = Doc | Rest]) ->
-    [Doc#doc{meta = lists:keydelete(ref, 1, Meta)} | untag_docs(Rest)].
+untag_docs([#doc{} = Doc | Rest]) ->
+    [untag_doc(Doc) | untag_docs(Rest)].
+
+untag_doc(#doc{} = Doc) ->
+    Doc#doc{meta = lists:keydelete(ref, 1, Doc#doc.meta)}.
 
 force_reply(Doc, [], {_, W, Acc}) ->
     {error, W, [{Doc, {error, internal_server_error}} | Acc]};
@@ -416,6 +421,11 @@ start_worker(#shard{ref = undefined}, _Docs, #acc{}) ->
     % for unit tests below.
     ok.
 
+%% Remove all remaining doc update attempts if a conflict occurred at leader
+remove_conflicted_docs(ConflictDocs, GroupedDocs) ->
+    UntaggedConflictDocs = untag_docs(ConflictDocs),
+    [{W, [D || D <- Ds, not lists:member(untag_doc(D), UntaggedConflictDocs)]} 
|| {W, Ds} <- GroupedDocs].
+
 -ifdef(TEST).
 
 -include_lib("eunit/include/eunit.hrl").

Reply via email to