This is an automated email from the ASF dual-hosted git repository. rnewson pushed a commit to branch reduce-intra-cluster-conflicts-3 in repository https://gitbox.apache.org/repos/asf/couchdb.git
commit cf2808456e693b35e574f23c3e1b64d1cc12ab58 Author: Robert Newson <[email protected]> AuthorDate: Wed Feb 18 18:04:10 2026 +0000 don't update followers where leader got conflicts --- TODO | 1 + src/fabric/src/fabric_doc_update.erl | 22 ++++++++++++++++------ 2 files changed, 17 insertions(+), 6 deletions(-) diff --git a/TODO b/TODO new file mode 100644 index 000000000..cc03fadad --- /dev/null +++ b/TODO @@ -0,0 +1 @@ +remove docs from the followers GroupedDocs before starting them if the leader got a conflict \ No newline at end of file diff --git a/src/fabric/src/fabric_doc_update.erl b/src/fabric/src/fabric_doc_update.erl index 5aba9d482..0664493e2 100644 --- a/src/fabric/src/fabric_doc_update.erl +++ b/src/fabric/src/fabric_doc_update.erl @@ -115,8 +115,10 @@ handle_message({ok, Replies}, Worker, #acc{} = Acc0) -> grouped_docs = GroupedDocs, reply = DocReplyDict0 } = Acc0, - Acc1 = start_followers([Worker#shard.range], Acc0), - {value, {_, Docs}, NewGrpDocs} = lists:keytake(Worker, 1, GroupedDocs), + {value, {_, Docs}, NewGrpDocs0} = lists:keytake(Worker, 1, GroupedDocs), + ConflictDocs = [Doc || {Doc, conflict} <- lists:zip(Docs, Replies)], + NewGrpDocs = remove_conflicted_docs(ConflictDocs, NewGrpDocs0), + Acc2 = start_followers([Worker#shard.range], Acc0#acc{grouped_docs = NewGrpDocs}), DocReplyDict = append_update_replies(Docs, Replies, DocReplyDict0), case {WaitingCount, dict:size(DocReplyDict)} of {1, _} -> @@ -131,7 +133,7 @@ handle_message({ok, Replies}, Worker, #acc{} = Acc0) -> % we've got at least one reply for each document, let's take a look case dict:fold(fun maybe_reply/3, {stop, W, []}, DocReplyDict) of continue -> - {ok, Acc1#acc{ + {ok, Acc2#acc{ waiting_count = WaitingCount - 1, grouped_docs = NewGrpDocs, reply = DocReplyDict @@ -140,7 +142,7 @@ handle_message({ok, Replies}, Worker, #acc{} = Acc0) -> {stop, {ok, FinalReplies}} end; _ -> - {ok, Acc1#acc{ + {ok, Acc2#acc{ waiting_count = WaitingCount - 1, grouped_docs = NewGrpDocs, reply = DocReplyDict }} end; @@ -194,8 +196,11 @@ tag_docs([#doc{meta = Meta} = Doc | Rest]) -> untag_docs([]) -> []; -untag_docs([#doc{meta = Meta} = Doc | Rest]) -> - [Doc#doc{meta = lists:keydelete(ref, 1, Meta)} | untag_docs(Rest)]. +untag_docs([#doc{} = Doc | Rest]) -> + [untag_doc(Doc) | untag_docs(Rest)]. + +untag_doc(#doc{} = Doc) -> + Doc#doc{meta = lists:keydelete(ref, 1, Doc#doc.meta)}. force_reply(Doc, [], {_, W, Acc}) -> {error, W, [{Doc, {error, internal_server_error}} | Acc]}; @@ -416,6 +421,11 @@ start_worker(#shard{ref = undefined}, _Docs, #acc{}) -> % for unit tests below. ok. +%% Remove all remaining doc update attempts if a conflict occurred at leader +remove_conflicted_docs(ConflictDocs, GroupedDocs) -> + UntaggedConflictDocs = untag_docs(ConflictDocs), + [{W, [D || D <- Ds, not lists:member(untag_doc(D), UntaggedConflictDocs)]} || {W, Ds} <- GroupedDocs]. + -ifdef(TEST). -include_lib("eunit/include/eunit.hrl").
