This is an automated email from the ASF dual-hosted git repository. rnewson pushed a commit to branch raft_storemodule in repository https://gitbox.apache.org/repos/asf/couchdb.git
commit 725b2cdc1bff165ec4cfb4512440edaee575a470 Author: Robert Newson <[email protected]> AuthorDate: Wed Aug 17 21:57:53 2022 +0100 separate follower and candidate timeouts From "ARC: Analysis of Raft Consensus", section 4.2: "As the authors use the same timer range for candidates and followers, in Figure 4.1 we are waiting a minimum of 150ms (and up to twice that) before restarting an election, despite the fact that, on average, a node receives all of its responses within 15ms" We separate the timeouts and set the candidate timeout smaller than the follower timeout. In a contested election (where multiple candidates each gain a minority of votes) we should now elect a leader faster than we would with a single shared timeout range. --- src/couch/src/couch_raft.erl | 41 +++++++++++++++++++++-------------------- 1 file changed, 21 insertions(+), 20 deletions(-) diff --git a/src/couch/src/couch_raft.erl b/src/couch/src/couch_raft.erl index 06025784e..98fb6f926 100644 --- a/src/couch/src/couch_raft.erl +++ b/src/couch/src/couch_raft.erl @@ -15,9 +15,6 @@ -module(couch_raft). -behaviour(gen_statem). --define(ELECTION_DELAY, 150). --define(ELECTION_SPLAY, 150). --define(LEADER_HEARTBEAT, 75). -define(CLIENT_TIMEOUT, 5_000). % maximum number of entries to send in one go. 
@@ -78,12 +75,12 @@ handle_event(enter, _OldState, follower, Data) -> couch_log:notice("~p became follower in term ~B", [node(), Term]), Replies = [{reply, From, {error, deposed}} || From <- maps:values(Froms)], persist({keep_state, maps:without([nextIndex, matchIndex], Data#{votedFor => undefined, votesGranted => undefined, froms => #{}}), - [restart_election_timeout() | Replies]}); + [state_timeout(follower) | Replies]}); handle_event(enter, _OldState, candidate, Data) -> #{term := Term} = Data, couch_log:notice("~p became candidate in term ~B", [node(), Term]), - persist({keep_state, start_election(Data), restart_election_timeout()}); + persist({keep_state, start_election(Data), state_timeout(candidate)}); handle_event(enter, _OldState, leader, Data) -> #{store_module := StoreModule, cohort := Cohort, term := Term} = Data, @@ -93,9 +90,9 @@ handle_event(enter, _OldState, leader, Data) -> {keep_state, Data#{ nextIndex => maps:from_list([{Peer, LastIndex + 1} || Peer <- Peers]), matchIndex => maps:from_list([{Peer, 0} || Peer <- Peers]) - }, restart_heartbeat_timeout()}; + }, state_timeout(leader)}; -handle_event(cast, #{type := 'RequestVoteRequest', term := Term} = Msg, _State, #{term := CurrentTerm} = Data) +handle_event(cast, #{type := 'RequestVoteRequest', term := Term} = Msg, State, #{term := CurrentTerm} = Data) when Term =< CurrentTerm -> #{ source := MSource, @@ -119,9 +116,9 @@ handle_event(cast, #{type := 'RequestVoteRequest', term := Term} = Msg, _State, cast(MSource, Reply, Data), if Grant -> - persist({keep_state, Data#{votedFor => MSource}, restart_election_timeout()}); + persist({keep_state, Data#{votedFor => MSource}, state_timeout(State)}); true -> - {keep_state_and_data, restart_election_timeout()} + {keep_state_and_data, state_timeout(State)} end; handle_event(cast, #{type := 'RequestVoteResponse', term := PastTerm}, _State, #{term := CurrentTerm}) when PastTerm < CurrentTerm -> @@ -171,7 +168,7 @@ handle_event(cast, #{type := 
'AppendEntriesRequest', term := Term} = Msg, State, State == leader -> keep_state_and_data; true -> - {keep_state_and_data, restart_election_timeout()} + {keep_state_and_data, state_timeout(State)} end; Term == CurrentTerm andalso State == candidate -> {next_state, follower, Data, {next_event, cast, Msg}}; @@ -187,7 +184,7 @@ handle_event(cast, #{type := 'AppendEntriesRequest', term := Term} = Msg, State, }, couch_log:debug("~p received heartbeat and everything matches, sending matchIndex:~p", [node(), MPrevLogIndex]), cast(MSource, Reply, Data), - {keep_state, update_state_machine(Data#{commitIndex => MCommitIndex}), restart_election_timeout()}; + {keep_state, update_state_machine(Data#{commitIndex => MCommitIndex}), state_timeout(State)}; true -> Index = MPrevLogIndex + 1, if @@ -205,12 +202,12 @@ handle_event(cast, #{type := 'AppendEntriesRequest', term := Term} = Msg, State, }, couch_log:notice("~p received entry:~p that's already applied, sending matchIndex:~p", [node(), MEntries, MPrevLogIndex + length(MEntries)]), cast(MSource, Reply, Data), - {keep_state, update_state_machine(Data#{commitIndex => MCommitIndex}), restart_election_timeout()}; + {keep_state, update_state_machine(Data#{commitIndex => MCommitIndex}), state_timeout(State)}; NthLogTerm /= FirstEntryTerm -> couch_log:notice("~p received conflicting entry:~p, deleting it", [node(), MEntries]), case StoreModule:truncate(LastIndex - 1, Data) of {ok, NewData} -> - {keep_state, NewData, [{next_event, cast, Msg}, restart_election_timeout()]}; + {keep_state, NewData, [{next_event, cast, Msg}, state_timeout(State)]}; {error, Reason} -> {stop, Reason} end @@ -219,7 +216,7 @@ handle_event(cast, #{type := 'AppendEntriesRequest', term := Term} = Msg, State, couch_log:notice("~p received new entries:~p, appending it to log", [node(), MEntries]), case StoreModule:append(MEntries, Data) of {ok, _EntryIndex, NewData} -> - {keep_state, NewData, [{next_event, cast, Msg}, restart_election_timeout()]}; + {keep_state, 
NewData, [{next_event, cast, Msg}, state_timeout(State)]}; {error, Reason} -> {stop, Reason} end @@ -268,13 +265,13 @@ handle_event({call, From}, #{type := 'ClientRequest'}, _State, _Data) -> handle_event(state_timeout, new_election, State, Data) when State == follower; State == candidate -> #{term := Term} = Data, couch_log:notice("~p election timeout in state ~p, term ~B", [node(), State, Term]), - persist({next_state, candidate, start_election(Data), restart_election_timeout()}); + persist({next_state, candidate, start_election(Data), state_timeout(State)}); handle_event(state_timeout, heartbeat, leader, Data) -> #{term := Term} = Data, couch_log:debug("~p leader sending a heartbeat in term ~B", [node(), Term]), ok = send_append_entries(Data), - {keep_state, advance_commit_index(Data), restart_heartbeat_timeout()}; + {keep_state, advance_commit_index(Data), state_timeout(leader)}; handle_event(EventType, EventContent, State, Data) -> {stop, {unknown_event, EventType, EventContent, State, Data}}. @@ -360,11 +357,15 @@ start_election(Data) -> cast(Node, Msg, #{name := Name}) -> gen_statem:cast({Name, Node}, Msg). -restart_election_timeout() -> - {state_timeout, ?ELECTION_DELAY + rand:uniform(?ELECTION_SPLAY), new_election}. -restart_heartbeat_timeout() -> - {state_timeout, ?LEADER_HEARTBEAT, heartbeat}. +state_timeout(follower) -> + {state_timeout, 150 + rand:uniform(150), new_election}; + +state_timeout(candidate) -> + {state_timeout, 15 + rand:uniform(15), new_election}; + +state_timeout(leader) -> + {state_timeout, 75, heartbeat}. peers(Cohort) -> Cohort -- [node()].
