This is an automated email from the ASF dual-hosted git repository. rnewson pushed a commit to branch raft_storemodule in repository https://gitbox.apache.org/repos/asf/couchdb.git
commit 5be100df24446dff44cb7eeeaa31f89eb0a9db75 Author: Robert Newson <[email protected]> AuthorDate: Sat May 14 20:28:05 2022 +0100 Integrate raft algorithm (WIP) couch_raft.erl is a complete implementation of the raft algorithm but currently only manages an in-memory state machine and log. Preliminary work is also here to add a new btree inside the `.couch` files, which will be the real raft log. The intent is that log entries can be removed from this log and applied to by_id and by_seq trees atomically. raft log is not preserved over compaction yet because reading the compactor code hurts my eyes. Anyway, it's progress and hopefully we're going somewhere cool. --- src/couch/src/couch_bt_engine.erl | 95 +++++++- src/couch/src/couch_bt_engine.hrl | 3 +- src/couch/src/couch_bt_engine_header.erl | 7 +- src/couch/src/couch_db.erl | 19 ++ src/couch/src/couch_db_engine.erl | 27 +++ src/couch/src/couch_db_updater.erl | 10 + src/couch/src/couch_raft.erl | 350 ++++++++++++++++++++++++++++++ src/couch/src/couch_raft_log.erl | 52 +++++ src/couch/test/eunit/couch_raft_SUITE.erl | 67 ++++++ 9 files changed, 621 insertions(+), 9 deletions(-) diff --git a/src/couch/src/couch_bt_engine.erl b/src/couch/src/couch_bt_engine.erl index 0549de566..8c1a2756d 100644 --- a/src/couch/src/couch_bt_engine.erl +++ b/src/couch/src/couch_bt_engine.erl @@ -66,6 +66,11 @@ purge_docs/3, copy_purge_infos/2, + raft_lookup/2, + raft_insert/2, + raft_discard/2, + raft_last/1, + commit_data/1, open_write_stream/2, @@ -102,7 +107,11 @@ purge_tree_join/2, purge_tree_reduce/2, purge_seq_tree_split/1, - purge_seq_tree_join/2 + purge_seq_tree_join/2, + + raft_tree_split/1, + raft_tree_join/2, + raft_tree_reduce/2 ]). % Used by the compactor @@ -631,6 +640,44 @@ count_changes_since(St, SinceSeq) -> {ok, Changes} = couch_btree:fold_reduce(BTree, FoldFun, 0, Opts), Changes. 
+% Add (or overwrite) raft log entries. Each entry is an {Index, Term, Value}
+% tuple, stored keyed by Index (see raft_tree_split/1).
+raft_insert(#st{} = St, Entries) when is_list(Entries) ->
+    #st{
+        raft_tree = RaftTree0
+    } = St,
+    {ok, RaftTree1} = couch_btree:add_remove(RaftTree0, Entries, []),
+    {ok, St#st{
+        raft_tree = RaftTree1,
+        needs_commit = true
+    }}.
+
+% Look up raft log entries by index: one result per requested index, either
+% the {Index, Term, Value} entry or not_found.
+raft_lookup(#st{} = St, Indexes) ->
+    Results = couch_btree:lookup(St#st.raft_tree, Indexes),
+    lists:map(
+        fun
+            ({ok, Entry}) -> Entry;
+            (not_found) -> not_found
+        end,
+        Results
+    ).
+
+% Remove every log entry from the first retained index up to and including
+% UpTo. UpTo beyond the last entry is clamped to the last entry; discarding
+% from an empty log, or up to an index below the first retained entry, is a
+% no-op that returns the state unchanged.
+raft_discard(#st{} = St, UpTo) ->
+    #st{
+        raft_tree = RaftTree0
+    } = St,
+    {ok, {First, Last}} = couch_btree:full_reduce(RaftTree0),
+    {FirstIndex, _FirstTerm} = First,
+    {LastIndex, _LastTerm} = Last,
+    DiscardTo = min(UpTo, LastIndex),
+    if
+        % Index 0 only appears in the reduction of an empty tree (see
+        % raft_tree_reduce/2). lists:seq/2 would also raise if
+        % DiscardTo < FirstIndex - 1, crashing the caller.
+        FirstIndex == 0 orelse DiscardTo < FirstIndex ->
+            {ok, St};
+        true ->
+            Remove = lists:seq(FirstIndex, DiscardTo),
+            {ok, RaftTree1} = couch_btree:add_remove(RaftTree0, [], Remove),
+            {ok, St#st{
+                raft_tree = RaftTree1,
+                needs_commit = true
+            }}
+    end.
+
+% {Index, Term} of the last log entry, or {0, 0} when the log is empty.
+raft_last(#st{} = St) ->
+    {ok, {_First, Last}} = couch_btree:full_reduce(St#st.raft_tree),
+    Last.
+
 start_compaction(St, DbName, Options, Parent) ->
     Args = [St, DbName, Options, Parent],
     Pid = spawn_link(couch_bt_engine_compactor, start, Args),
@@ -799,6 +846,23 @@ purge_tree_reduce(reduce, IdRevs) ->
 purge_tree_reduce(rereduce, Reds) ->
     lists:sum(Reds).
 
+% btree split/join: entries are stored as Index => {Term, Value}.
+raft_tree_split({Index, Term, Value}) ->
+    {Index, {Term, Value}}.
+
+raft_tree_join(Index, {Term, Value}) ->
+    {Index, Term, Value}.
+
+% Reduction is {{MinIndex, MinTerm}, {MaxIndex, MaxTerm}}, i.e. the first and
+% last log positions; an empty node reduces to {{0, 0}, {0, 0}}.
+raft_tree_reduce(reduce, []) ->
+    {{0, 0}, {0, 0}};
+raft_tree_reduce(reduce, Entries) ->
+    {MinIndex, MinTerm, _} = lists:min(Entries),
+    {MaxIndex, MaxTerm, _} = lists:max(Entries),
+    {{MinIndex, MinTerm}, {MaxIndex, MaxTerm}};
+raft_tree_reduce(rereduce, Reds) ->
+    {Mins, Maxs} = lists:unzip(Reds),
+    {lists:min(Mins), lists:max(Maxs)}.
+ set_update_seq(#st{header = Header} = St, UpdateSeq) -> {ok, St#st{ header = couch_bt_engine_header:set(Header, [ @@ -894,6 +958,13 @@ init_state(FilePath, Fd, Header0, Options) -> {reduce, fun ?MODULE:purge_tree_reduce/2} ]), + RaftTreeState = couch_bt_engine_header:raft_tree_state(Header), + {ok, RaftTree} = couch_btree:open(RaftTreeState, Fd, [ + {split, fun ?MODULE:raft_tree_split/1}, + {join, fun ?MODULE:raft_tree_join/2}, + {reduce, fun ?MODULE:raft_tree_reduce/2} + ]), + ok = couch_file:set_db_pid(Fd, self()), St = #st{ @@ -907,7 +978,8 @@ init_state(FilePath, Fd, Header0, Options) -> local_tree = LocalTree, compression = Compression, purge_tree = PurgeTree, - purge_seq_tree = PurgeSeqTree + purge_seq_tree = PurgeSeqTree, + raft_tree = RaftTree }, % If this is a new database we've just created a @@ -927,7 +999,8 @@ update_header(St, Header) -> {id_tree_state, couch_btree:get_state(St#st.id_tree)}, {local_tree_state, couch_btree:get_state(St#st.local_tree)}, {purge_tree_state, couch_btree:get_state(St#st.purge_tree)}, - {purge_seq_tree_state, couch_btree:get_state(St#st.purge_seq_tree)} + {purge_seq_tree_state, couch_btree:get_state(St#st.purge_seq_tree)}, + {raft_tree_state, couch_btree:get_state(St#st.raft_tree)} ]). 
increment_update_seq(#st{header = Header} = St) -> @@ -1097,7 +1170,8 @@ active_size(#st{} = St, #size_info{} = SI) -> St#st.seq_tree, St#st.local_tree, St#st.purge_tree, - St#st.purge_seq_tree + St#st.purge_seq_tree, + St#st.raft_tree ], lists:foldl( fun(T, Acc) -> @@ -1171,12 +1245,14 @@ fold_docs_reduce_to_count(Reds) -> finish_compaction_int(#st{} = OldSt, #st{} = NewSt1) -> #st{ filepath = FilePath, - local_tree = OldLocal + local_tree = OldLocal, + raft_tree = OldRaft } = OldSt, #st{ filepath = CompactDataPath, header = Header, - local_tree = NewLocal1 + local_tree = NewLocal1, + raft_tree = NewRaft1 } = NewSt1, % suck up all the local docs into memory and write them to the new db @@ -1186,13 +1262,18 @@ finish_compaction_int(#st{} = OldSt, #st{} = NewSt1) -> {ok, _, LocalDocs} = couch_btree:foldl(OldLocal, LoadFun, []), {ok, NewLocal2} = couch_btree:add(NewLocal1, LocalDocs), + % do the same for the raft log + {ok, _, RaftLog} = couch_btree:foldl(OldRaft, LoadFun, []), + {ok, NewRaft2} = couch_btree:add(NewRaft1, RaftLog), + {ok, NewSt2} = commit_data(NewSt1#st{ header = couch_bt_engine_header:set(Header, [ {compacted_seq, get_update_seq(OldSt)}, {revs_limit, get_revs_limit(OldSt)}, {purge_infos_limit, get_purge_infos_limit(OldSt)} ]), - local_tree = NewLocal2 + local_tree = NewLocal2, + raft_tree = NewRaft2 }), % Rename our *.compact.data file to *.compact so that if we diff --git a/src/couch/src/couch_bt_engine.hrl b/src/couch/src/couch_bt_engine.hrl index e3c1d4983..0d347e99b 100644 --- a/src/couch/src/couch_bt_engine.hrl +++ b/src/couch/src/couch_bt_engine.hrl @@ -23,5 +23,6 @@ local_tree, compression, purge_tree, - purge_seq_tree + purge_seq_tree, + raft_tree }). 
diff --git a/src/couch/src/couch_bt_engine_header.erl b/src/couch/src/couch_bt_engine_header.erl
index e28f07723..9e663b096 100644
--- a/src/couch/src/couch_bt_engine_header.erl
+++ b/src/couch/src/couch_bt_engine_header.erl
@@ -34,6 +34,7 @@
     purge_tree_state/1,
     purge_seq_tree_state/1,
     purge_infos_limit/1,
+    raft_tree_state/1,
     security_ptr/1,
     revs_limit/1,
     uuid/1,
@@ -69,7 +70,8 @@
     epochs,
     compacted_seq,
     purge_infos_limit = 1000,
-    props_ptr
+    props_ptr,
+    % btree state of the raft log; record default is nil
+    raft_tree_state = nil
 }).
 
 -define(PARTITION_DISK_VERSION, 8).
@@ -177,6 +179,9 @@ compacted_seq(Header) ->
 purge_infos_limit(Header) ->
     get_field(Header, purge_infos_limit).
 
+% Stored btree state of the raft log, as read by couch_bt_engine when it opens
+% the raft tree.
+raft_tree_state(Header) ->
+    get_field(Header, raft_tree_state).
+
 get_field(Header, Field) ->
     get_field(Header, Field, undefined).
 
diff --git a/src/couch/src/couch_db.erl b/src/couch/src/couch_db.erl
index dd7e07517..e197f98a4 100644
--- a/src/couch/src/couch_db.erl
+++ b/src/couch/src/couch_db.erl
@@ -114,6 +114,11 @@
     fold_purge_infos/4,
     fold_purge_infos/5,
 
+    raft_insert/2,
+    raft_lookup/2,
+    raft_discard/2,
+    raft_last/1,
+
     calculate_start_seq/3,
     owner_of/2,
 
@@ -1822,6 +1827,20 @@ fold_purge_infos(Db, StartPurgeSeq, Fun, Acc) ->
 fold_purge_infos(Db, StartPurgeSeq, UFun, UAcc, Opts) ->
     couch_db_engine:fold_purge_infos(Db, StartPurgeSeq, UFun, UAcc, Opts).
 
+% Write raft log entries. Requires admin; the write is serialised through the
+% db updater process (main_pid), which commits it.
+raft_insert(#db{main_pid = Pid} = Db, Entries) ->
+    check_is_admin(Db),
+    gen_server:call(Pid, {raft_insert, Entries}, infinity).
+
+% Look up raft log entries by index, reading through this Db handle's engine
+% state rather than the updater process.
+raft_lookup(Db, Indexes) ->
+    couch_db_engine:raft_lookup(Db, Indexes).
+
+% Drop raft log entries up to and including UpTo. Requires admin; serialised
+% through the db updater process like raft_insert/2.
+raft_discard(#db{main_pid = Pid} = Db, UpTo) ->
+    check_is_admin(Db),
+    gen_server:call(Pid, {raft_discard, UpTo}, infinity).
+
+% The last raft log position according to this Db handle's engine state.
+raft_last(Db) ->
+    couch_db_engine:raft_last(Db).
+
 count_changes_since(Db, SinceSeq) ->
     couch_db_engine:count_changes_since(Db, SinceSeq).
 
diff --git a/src/couch/src/couch_db_engine.erl b/src/couch/src/couch_db_engine.erl
index 052a527e3..63b9d49a3 100644
--- a/src/couch/src/couch_db_engine.erl
+++ b/src/couch/src/couch_db_engine.erl
@@ -704,6 +704,11 @@
     read_doc_body/2,
     load_purge_infos/2,
 
+    raft_lookup/2,
+    raft_insert/2,
+    raft_discard/2,
+    raft_last/1,
+
     serialize_doc/2,
     write_doc_body/2,
     write_doc_infos/3,
@@ -927,6 +932,28 @@ copy_purge_infos(#db{} = Db, Purges) ->
     ),
     {ok, Db#db{engine = {Engine, NewSt}}}.
 
+% Engine dispatch for the raft log. Writers (raft_insert, raft_discard) return
+% the #db{} with the engine state replaced by the engine's new state; readers
+% (raft_lookup, raft_last) return the engine's result unchanged.
+raft_insert(#db{} = Db, Entries) ->
+    #db{engine = {Engine, EngineState}} = Db,
+    {ok, NewSt} = Engine:raft_insert(
+        EngineState, Entries
+    ),
+    {ok, Db#db{engine = {Engine, NewSt}}}.
+
+raft_lookup(#db{} = Db, Indexes) ->
+    #db{engine = {Engine, EngineState}} = Db,
+    Engine:raft_lookup(EngineState, Indexes).
+
+raft_discard(#db{} = Db, UpTo) ->
+    #db{engine = {Engine, EngineState}} = Db,
+    {ok, NewSt} = Engine:raft_discard(
+        EngineState, UpTo
+    ),
+    {ok, Db#db{engine = {Engine, NewSt}}}.
+
+raft_last(#db{} = Db) ->
+    #db{engine = {Engine, EngineState}} = Db,
+    Engine:raft_last(EngineState).
+ commit_data(#db{} = Db) -> #db{engine = {Engine, EngineState}} = Db, {ok, NewSt} = Engine:commit_data(EngineState), diff --git a/src/couch/src/couch_db_updater.erl b/src/couch/src/couch_db_updater.erl index 0248c21ec..7c1f97804 100644 --- a/src/couch/src/couch_db_updater.erl +++ b/src/couch/src/couch_db_updater.erl @@ -118,6 +118,16 @@ handle_call({purge_docs, PurgeReqs0, Options}, _From, Db) -> end, {ok, NewDb, Replies} = purge_docs(Db, PurgeReqs), {reply, {ok, Replies}, NewDb, idle_limit()}; +handle_call({raft_insert, Entries}, _From, Db) -> + {ok, Db2} = couch_db_engine:raft_insert(Db, Entries), + Db3 = commit_data(Db2), + ok = couch_server:db_updated(Db3), + {reply, ok, Db3, idle_limit()}; +handle_call({raft_discard, UpTo}, _From, Db) -> + {ok, Db2} = couch_db_engine:raft_discard(Db, UpTo), + Db3 = commit_data(Db2), + ok = couch_server:db_updated(Db3), + {reply, ok, Db3, idle_limit()}; handle_call(Msg, From, Db) -> case couch_db_engine:handle_db_updater_call(Msg, From, Db) of {reply, Resp, NewDb} -> diff --git a/src/couch/src/couch_raft.erl b/src/couch/src/couch_raft.erl new file mode 100644 index 000000000..f398b4f2a --- /dev/null +++ b/src/couch/src/couch_raft.erl @@ -0,0 +1,350 @@ +% Licensed under the Apache License, Version 2.0 (the "License"); you may not +% use this file except in compliance with the License. You may obtain a copy of +% the License at +% +% http://www.apache.org/licenses/LICENSE-2.0 +% +% Unless required by applicable law or agreed to in writing, software +% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +% License for the specific language governing permissions and limitations under +% the License. + +%% -*- erlang-indent-level: 4;indent-tabs-mode: nil -*- + +-module(couch_raft). +-behaviour(gen_statem). + +-define(ELECTION_DELAY, 150). +-define(ELECTION_SPLAY, 150). +-define(LEADER_HEARTBEAT, 75). +-define(CLIENT_TIMEOUT, 5_000). 
+
+% maximum number of entries to send in one go.
+-define(BATCH_SIZE, 10).
+
+% public api
+
+-export([
+    start/2,
+    start_link/2,
+    stop/1,
+    call/2
+]).
+
+% mandatory gen_statem callbacks
+
+-export([
+    init/1,
+    callback_mode/0,
+    handle_event/4
+]).
+
+%% public api
+
+% Start an unlinked raft member registered locally as Name. Cohort lists the
+% member nodes (peers/1 removes node() from it). Peers are addressed as
+% {Name, Node}, so every member must be started with the same Name.
+start(Name, Cohort) ->
+    gen_statem:start({local, Name}, ?MODULE, new(Name, Cohort), []).
+
+% As start/2, but linked to the caller.
+start_link(Name, Cohort) ->
+    gen_statem:start_link({local, Name}, ?MODULE, new(Name, Cohort), []).
+
+% Initial member state: term 0, empty log, nothing committed or applied. The
+% replicated "machine" is a running sha256 hash seeded with <<0>>.
+new(Name, Cohort) ->
+    Peers = peers(Cohort),
+    #{
+        name => Name,
+        cohort => Cohort,
+        term => 0,
+        votedFor => undefined,
+        % Nodes that granted us their vote in the current term. This is a
+        % list everywhere else (lists:usort/1, length/1, [node()]), so it
+        % must start as an empty list rather than a map.
+        votesGranted => [],
+        nextIndex => maps:from_list([{Peer, 1} || Peer <- Peers]),
+        matchIndex => maps:from_list([{Peer, 0} || Peer <- Peers]),
+        log => couch_raft_log:new(),
+        commitIndex => 0,
+        froms => #{},
+        lastApplied => 0,
+        machine => <<0>>
+    }.
+
+stop(ServerRef) ->
+    gen_statem:stop(ServerRef).
+
+% Submit Value to the leader. Replies with the machine value after Value has
+% been applied, {error, not_leader} when ServerRef is not the leader, or
+% {error, deposed} if it loses leadership first. Exits after ?CLIENT_TIMEOUT ms.
+call(ServerRef, Value) ->
+    gen_statem:call(ServerRef, #{type => 'ClientRequest', value => Value}, ?CLIENT_TIMEOUT).
+
+% Every member starts as a follower.
+init(Data) ->
+    {ok, follower, Data}.
+
+callback_mode() ->
+    [handle_event_function, state_enter].
+
+%% erlfmt-ignore
+handle_event(cast, #{term := FutureTerm} = Msg, _State, #{term := CurrentTerm} = Data) when FutureTerm > CurrentTerm ->
+    couch_log:notice("~p received message from future term ~B, moving to that term, becoming follower and clearing votedFor", [node(), FutureTerm]),
+    % Votes belong to exactly one term: forget our own vote AND any votes we
+    % collected in an older term. Otherwise a stale votesGranted list could
+    % make us leader of the new term without winning an election in it.
+    {next_state, follower, Data#{term => FutureTerm, votedFor => undefined, votesGranted => []}, {next_event, cast, Msg}};
+
+handle_event(enter, _OldState, follower, Data) ->
+    #{term := Term, froms := Froms} = Data,
+    couch_log:notice("~p became follower in term ~B", [node(), Term]),
+    % Pending client calls can no longer be completed by this node.
+    Replies = [{reply, From, {error, deposed}} || From <- maps:values(Froms)],
+    % votedFor is intentionally kept: a candidate stepping down within the
+    % same term has already voted (for itself) and must not vote twice in
+    % that term. It is reset only when the term advances (first clause).
+    {keep_state, Data#{froms => #{}}, [restart_election_timeout() | Replies]};
+
+handle_event(enter, _OldState, candidate, Data) ->
+    #{term := Term} = Data,
+    couch_log:notice("~p became candidate in term ~B", [node(), Term]),
+    % The only place an election is started (term bumped, votes requested).
+    {keep_state, start_election(Data), restart_election_timeout()};
+
+handle_event(enter, _OldState, leader, Data) ->
+    #{log := Log, cohort := Cohort, term := Term} = Data,
+    couch_log:notice("~p became leader in term ~B", [node(), Term]),
+    Peers = peers(Cohort),
+    % Optimistically assume every peer already has our whole log; failed
+    % AppendEntries responses walk nextIndex back.
+    {keep_state, Data#{
+        nextIndex => maps:from_list([{Peer, couch_raft_log:index(couch_raft_log:last(Log)) + 1} || Peer <- Peers]),
+        matchIndex => maps:from_list([{Peer, 0} || Peer <- Peers])
+    }, restart_heartbeat_timeout()};
+
+handle_event(cast, #{type := 'RequestVoteRequest', term := Term} = Msg, _State, #{term := CurrentTerm} = Data)
+    when Term =< CurrentTerm ->
+    #{
+        source := MSource,
+        lastLogIndex := MLastLogIndex,
+        lastLogTerm := MLastLogTerm
+    } = Msg,
+    #{
+        log := Log,
+        votedFor := VotedFor
+    } = Data,
+    LogOk = MLastLogTerm > couch_raft_log:term(couch_raft_log:last(Log)) orelse (MLastLogTerm == couch_raft_log:term(couch_raft_log:last(Log)) andalso MLastLogIndex >= couch_raft_log:index(couch_raft_log:last(Log))),
+    Grant = Term == CurrentTerm andalso LogOk andalso (VotedFor == undefined orelse VotedFor == MSource),
+    couch_log:notice("~p received RequestVoteRequest from ~p in term ~B when in term ~B (Grant:~p, LogOk:~p, VotedFor:~p)", [node(), MSource, Term, CurrentTerm, Grant, LogOk, VotedFor]),
+    Reply = #{
+        type => 'RequestVoteResponse',
+        term => CurrentTerm,
+        voteGranted => Grant,
+        source => node()
+    },
+    cast(MSource, Reply, Data),
+    if
+        Grant ->
+            {keep_state, Data#{votedFor => MSource}, restart_election_timeout()};
+        true ->
+            % Only granting a vote resets the election timer. Resetting it on
+            % a denial would also replace a leader's heartbeat state_timeout
+            % with new_election, which the leader state does not handle.
+            keep_state_and_data
+    end;
+
+handle_event(cast, #{type := 'RequestVoteResponse', term := PastTerm}, _State, #{term := CurrentTerm}) when PastTerm < CurrentTerm ->
+    couch_log:notice("~p ignored RequestVoteResponse from past term ~B", [node(), PastTerm]),
+    keep_state_and_data;
+
+handle_event(cast, #{type := 'RequestVoteResponse', term := Term} = Msg, _State, #{term := Term} = Data) ->
+    #{source := MSource, voteGranted := MVoteGranted} = Msg,
+    #{cohort := Cohort, votesGranted := VotesGranted0} = Data,
+    VotesGranted1 = if MVoteGranted -> lists:usort([MSource | VotesGranted0]); true -> VotesGranted0 end,
+    couch_log:notice("~p received RequestVoteResponse from ~p in current term ~B (VotesGranted:~p)", [node(), MSource, Term, VotesGranted1]),
+    if
+        length(VotesGranted1) >= length(Cohort) div 2 + 1 ->
+            couch_log:notice("~p has enough votes to be leader in term ~B", [node(), Term]),
+            {next_state, leader, Data#{votesGranted => VotesGranted1}};
+        true ->
+            {keep_state, Data#{votesGranted => VotesGranted1}}
+    end;
+
+
+handle_event(cast, #{type := 'AppendEntriesRequest', term := Term} = Msg, State, #{term := CurrentTerm} = Data)
+    when Term =< CurrentTerm ->
+    #{
+        source := MSource,
+        prevLogIndex := MPrevLogIndex,
+        prevLogTerm := MPrevLogTerm,
+        entries := MEntries,
+        commitIndex := MCommitIndex
+    } = Msg,
+    #{
+        log := Log,
+        commitIndex := CommitIndex
+    } = Data,
+    LogOk = MPrevLogIndex == 0 orelse (MPrevLogIndex > 0 andalso MPrevLogIndex =< couch_raft_log:index(couch_raft_log:last(Log)) andalso MPrevLogTerm == couch_raft_log:term(couch_raft_log:nth(MPrevLogIndex,Log))),
+    if
+        Term < CurrentTerm orelse (Term == CurrentTerm andalso State == follower andalso not LogOk) ->
+            Reply = #{
+                type => 'AppendEntriesResponse',
+                term => CurrentTerm,
+                success => false,
+                matchIndex => 0,
+                source => node()
+            },
+            cast(MSource, Reply, Data),
+            if
+                State == leader ->
+                    keep_state_and_data;
+                true ->
+                    {keep_state_and_data, restart_election_timeout()}
+            end;
+        Term == CurrentTerm andalso State == candidate ->
+            {next_state, follower, Data, {next_event, cast, Msg}};
+        Term == CurrentTerm andalso State == follower andalso LogOk ->
+            if
+                MEntries == [] ->
+                    Reply = #{
+                        type => 'AppendEntriesResponse',
+                        term => CurrentTerm,
+                        success => true,
+                        matchIndex => MPrevLogIndex,
+                        source => node()
+                    },
+                    couch_log:debug("~p received heartbeat and everything matches, sending matchIndex:~p", [node(), MPrevLogIndex]),
+                    cast(MSource, Reply, Data),
+                    % Our log is only known to match the leader's up to
+                    % MPrevLogIndex, and commitIndex must never move backwards
+                    % (a newly elected leader may lag behind us);
+                    % update_state_machine/1 has no clause for
+                    % commitIndex < lastApplied.
+                    NewCommitIndex = max(CommitIndex, min(MCommitIndex, MPrevLogIndex)),
+                    {keep_state, update_state_machine(Data#{commitIndex => NewCommitIndex}), restart_election_timeout()};
+                true ->
+                    Index = MPrevLogIndex + 1,
+                    LastLogIndex = couch_raft_log:index(couch_raft_log:last(Log)),
+                    if
+                        LastLogIndex >= Index ->
+                            NthLogTerm = couch_raft_log:term(couch_raft_log:nth(Index, Log)),
+                            FirstEntryTerm = couch_raft_log:term(hd(MEntries)),
+                            if
+                                NthLogTerm == FirstEntryTerm ->
+                                    % We already hold the first entry. Re-process the rest
+                                    % of the batch after it, so every entry is checked (and
+                                    % appended if missing) before we report a matchIndex
+                                    % that covers it.
+                                    couch_log:debug("~p already has entry ~B, checking the remaining ~B entries", [node(), Index, length(MEntries) - 1]),
+                                    Remaining = Msg#{prevLogIndex => Index, prevLogTerm => FirstEntryTerm, entries => tl(MEntries)},
+                                    {keep_state_and_data, [{next_event, cast, Remaining}, restart_election_timeout()]};
+                                NthLogTerm /= FirstEntryTerm ->
+                                    % Delete the conflicting entry and everything after it.
+                                    couch_log:notice("~p received conflicting entry:~p, deleting it and all following entries", [node(), MEntries]),
+                                    {keep_state, Data#{log => couch_raft_log:sublist(Log, 1, MPrevLogIndex)}, [{next_event, cast, Msg}, restart_election_timeout()]}
+                            end;
+                        LastLogIndex == MPrevLogIndex ->
+                            couch_log:notice("~p received new entries:~p, appending it to log", [node(), MEntries]),
+                            {keep_state, Data#{log => couch_raft_log:append(Log, MEntries)}, [{next_event, cast, Msg}, restart_election_timeout()]}
+                    end
+            end
+    end;
+
+handle_event(cast, #{type := 'AppendEntriesResponse', term := PastTerm}, _State, #{term := CurrentTerm}) when PastTerm < CurrentTerm ->
+    couch_log:notice("~p ignored AppendEntriesResponse from past term ~B", [node(), PastTerm]),
+    keep_state_and_data;
+
+handle_event(cast, #{type := 'AppendEntriesResponse', term := Term} = Msg, _State, #{term := Term} = Data) ->
+    #{success := MSuccess, matchIndex := MMatchIndex, source := MSource} = Msg,
+    #{nextIndex := NextIndex, matchIndex := MatchIndex} = Data,
+    couch_log:debug("~p received AppendEntriesResponse from ~p in current term ~B (Success:~p)", [node(), MSource, Term, MSuccess]),
+    SourceNextIndex = maps:get(MSource, NextIndex),
+    if
+        MSuccess ->
+            {keep_state, Data#{
+                nextIndex => NextIndex#{MSource => MMatchIndex + 1},
+                matchIndex => MatchIndex#{MSource => MMatchIndex}
+            }};
+        true ->
+            {keep_state, Data#{
+                nextIndex => NextIndex#{MSource => max(SourceNextIndex - 1, 1)}
+            }}
+    end;
+
+handle_event({call, From}, #{type := 'ClientRequest'} = Msg, leader, Data) ->
+    #{value := Value} = Msg,
+    #{term := Term, log := Log, froms := Froms} = Data,
+    EntryIndex = couch_raft_log:index(couch_raft_log:last(Log)) + 1,
+    Entry = {EntryIndex, Term, Value},
+    {keep_state, Data#{log => couch_raft_log:append(Log, [Entry]), froms => Froms#{EntryIndex => From}}};
+
+handle_event({call, From}, #{type := 'ClientRequest'}, _State, _Data) ->
+    {keep_state_and_data, {reply, From, {error, not_leader}}};
+
+handle_event(state_timeout, new_election, State, Data) when State == follower; State == candidate ->
+    #{term := Term} = Data,
+    couch_log:notice("~p election timeout in state ~p, term ~B", [node(), State, Term]),
+    % The candidate state-enter clause starts the election; starting it here
+    % as well would bump the term twice. repeat_state re-runs that enter
+    % clause when an election times out while already a candidate.
+    case State of
+        follower -> {next_state, candidate, Data};
+        candidate -> {repeat_state, Data}
+    end;
+
+handle_event(state_timeout, heartbeat, leader, Data) ->
+    #{term := Term} = Data,
+    couch_log:debug("~p leader sending a heartbeat in term ~B", [node(), Term]),
+    ok = send_append_entries(Data),
+    {keep_state, advance_commit_index(Data), restart_heartbeat_timeout()};
+
+handle_event(EventType, EventContent, State, Data) ->
+    {stop, {unknown_event, EventType, EventContent, State, Data}}.
+
+
+% Send one AppendEntriesRequest (heartbeat or up to ?BATCH_SIZE entries) to
+% every peer, starting at that peer's nextIndex.
+send_append_entries(#{cohort := Cohort} = Data) ->
+    send_append_entries(peers(Cohort), Data).
+
+send_append_entries([], _Data) ->
+    ok;
+send_append_entries([Peer | Rest], Data) ->
+    #{term := Term, nextIndex := NextIndex, log := Log, commitIndex := CommitIndex} = Data,
+    PrevLogIndex = maps:get(Peer, NextIndex) - 1,
+    PrevLogTerm = if PrevLogIndex > 0 -> couch_raft_log:term(couch_raft_log:nth(PrevLogIndex, Log)); true -> 0 end,
+    Entries = couch_raft_log:sublist(Log, PrevLogIndex + 1, ?BATCH_SIZE),
+    % After processing this message the peer is only known to match our log
+    % up to the last entry we sent, so never advertise a commit index past it.
+    LastEntry = PrevLogIndex + length(Entries),
+    Msg = #{
+        type => 'AppendEntriesRequest',
+        term => Term,
+        source => node(),
+        prevLogIndex => PrevLogIndex,
+        prevLogTerm => PrevLogTerm,
+        entries => Entries,
+        commitIndex => min(CommitIndex, LastEntry)
+    },
+    cast(Peer, Msg, Data),
+    send_append_entries(Rest, Data).
+
+% Advance the leader's commitIndex to the highest index stored on a quorum of
+% the cohort, but only when that entry is from the current term (Raft §5.4.2;
+% earlier entries are then committed implicitly). commitIndex never decreases:
+% matchIndex is reset to 0 on election, and update_state_machine/1 has no
+% clause for commitIndex < lastApplied.
+advance_commit_index(Data) ->
+    #{matchIndex := MatchIndex, log := Log, cohort := Cohort, term := Term, commitIndex := CommitIndex} = Data,
+    LastIndex = couch_raft_log:index(couch_raft_log:last(Log)),
+    % Sorted descending, the Quorum-th element is held by at least Quorum nodes.
+    Indexes = lists:sort(fun erlang:'>='/2, [LastIndex | maps:values(MatchIndex)]),
+    Quorum = length(Cohort) div 2 + 1,
+    NewCommitIndex = lists:nth(Quorum, Indexes),
+    if
+        NewCommitIndex > CommitIndex ->
+            case couch_raft_log:term(couch_raft_log:nth(NewCommitIndex, Log)) of
+                Term ->
+                    update_state_machine(Data#{commitIndex => NewCommitIndex});
+                _ ->
+                    Data
+            end;
+        true ->
+            Data
+    end.
+
+% Apply committed-but-unapplied entries (lastApplied+1 .. commitIndex, capped
+% at the end of our log) to the machine, a running sha256 hash. Any client
+% call waiting on an applied index is answered with the resulting hash.
+update_state_machine(#{lastApplied := Same, commitIndex := Same} = Data) ->
+    Data;
+update_state_machine(#{lastApplied := LastApplied, commitIndex := CommitIndex} = Data) when LastApplied < CommitIndex ->
+    #{log := Log, froms := PendingFroms, machine := StartMachine} = Data,
+    ApplyTo = min(couch_raft_log:index(couch_raft_log:last(Log)), CommitIndex),
+    ApplyOne = fun(Index, {Waiting, Machine}) ->
+        Entry = couch_raft_log:nth(Index, Log),
+        Next = crypto:hash(sha256, <<Machine/binary, (couch_raft_log:value(Entry))/binary>>),
+        case maps:take(Index, Waiting) of
+            {Caller, StillWaiting} ->
+                gen_statem:reply(Caller, Next),
+                {StillWaiting, Next};
+            error ->
+                {Waiting, Next}
+        end
+    end,
+    {RemainingFroms, FinalMachine} =
+        lists:foldl(ApplyOne, {PendingFroms, StartMachine}, lists:seq(LastApplied + 1, ApplyTo)),
+    Data#{froms => RemainingFroms, machine => FinalMachine, lastApplied => ApplyTo}.
+
+% Move to the next term, vote for ourselves and ask every peer for its vote,
+% advertising the position of our last log entry.
+start_election(Data) ->
+    #{term := Term, cohort := Cohort, log := Log} = Data,
+    NextTerm = Term + 1,
+    couch_log:notice("~p starting election in term ~B", [node(), NextTerm]),
+    LastEntry = couch_raft_log:last(Log),
+    Request = #{
+        type => 'RequestVoteRequest',
+        term => NextTerm,
+        lastLogIndex => couch_raft_log:index(LastEntry),
+        lastLogTerm => couch_raft_log:term(LastEntry),
+        source => node()
+    },
+    SendTo = fun(Peer) -> cast(Peer, Request, Data) end,
+    lists:foreach(SendTo, peers(Cohort)),
+    Data#{term => NextTerm, votedFor => node(), votesGranted => [node()]}.
+
+% Every member is registered under the same local name on its node.
+cast(Node, Msg, Data) ->
+    #{name := Name} = Data,
+    ServerRef = {Name, Node},
+    gen_statem:cast(ServerRef, Msg).
+
+% Randomised election timeout to make split votes unlikely.
+restart_election_timeout() ->
+    Timeout = ?ELECTION_DELAY + rand:uniform(?ELECTION_SPLAY),
+    {state_timeout, Timeout, new_election}.
+
+restart_heartbeat_timeout() ->
+    Timeout = ?LEADER_HEARTBEAT,
+    {state_timeout, Timeout, heartbeat}.
+
+% The cohort without this node.
+peers(Cohort) ->
+    lists:delete(node(), Cohort).
diff --git a/src/couch/src/couch_raft_log.erl b/src/couch/src/couch_raft_log.erl
new file mode 100644
index 000000000..987212457
--- /dev/null
+++ b/src/couch/src/couch_raft_log.erl
@@ -0,0 +1,52 @@
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+%   http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+%% -*- erlang-indent-level: 4;indent-tabs-mode: nil -*-
+
+-module(couch_raft_log).
+
+% In-memory raft log used by couch_raft: a list of {Index, Term, Value}
+% entries. couch_raft looks entries up with nth/2 by raft index, so the entry
+% at list position N must carry index N (indexes start at 1).
+
+-export([
+    new/0,
+    append/2,
+    sublist/3,
+    nth/2,
+    last/1,
+    index/1,
+    term/1,
+    value/1
+]).
+
+% An empty log.
+new() ->
+    [].
+
+% Append Items (already-indexed entries) to the end of Log.
+append(Log, Items) ->
+    lists:append(Log, Items).
+
+% Up to Len entries starting at list position Start (1-based).
+sublist(Log, Start, Len) ->
+    lists:sublist(Log, Start, Len).
+
+% The entry at list position N (1-based).
+nth(N, Log) ->
+    lists:nth(N, Log).
+
+% The last entry, or the sentinel {0, 0, undefined} (index 0, term 0) for an
+% empty log.
+last([]) ->
+    {0, 0, undefined};
+last(Log) ->
+    lists:last(Log).
+
+index(Entry) ->
+    element(1, Entry).
+
+term(Entry) ->
+    element(2, Entry).
+
+value(Entry) ->
+    element(3, Entry).
diff --git a/src/couch/test/eunit/couch_raft_SUITE.erl b/src/couch/test/eunit/couch_raft_SUITE.erl
new file mode 100644
index 000000000..1c3f8ebc2
--- /dev/null
+++ b/src/couch/test/eunit/couch_raft_SUITE.erl
@@ -0,0 +1,67 @@
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+%   http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+%% -*- erlang-indent-level: 4;indent-tabs-mode: nil -*-
+
+-module(couch_raft_SUITE).
+
+-behaviour(ct_suite).
+
+-export([all/0]).
+-export([three_nodes/1]).
+
+-include_lib("common_test/include/ct.hrl").
+-include_lib("eunit/include/eunit.hrl").
+
+all() ->
+    [three_nodes].
+
+% Start a three-node cohort on peer nodes, check a single leader is elected,
+% replicate a few values, kill the leader and check a new one takes over
+% with the replicated state intact.
+three_nodes(Config) when is_list(Config) ->
+    N = 3,
+    % The peers need the modules under test on their code path. couch_raft
+    % logs through couch_log, so add its directory too.
+    % NOTE(review): couch_log may also need to be started on the peers.
+    CodePaths = lists:usort([filename:dirname(code:which(M)) || M <- [couch_raft, couch_raft_log, couch_log]]),
+    Args = ["-pa" | CodePaths],
+    Peers = [?CT_PEER(#{wait_boot => {self(), tag}, args => Args}) || _ <- lists:seq(1, N)],
+    Cohort = [receive {tag, {started, Node, Peer}} -> Node end || {ok, Peer} <- Peers],
+
+    Rafts = [erpc:call(Node, couch_raft, start, [foo, Cohort]) || Node <- Cohort],
+
+    % wait for leader election
+    timer:sleep(500),
+
+    % verify only one leader elected
+    [{leader, FirstLeader}] = lists:filter(fun({State, _Pid}) -> State == leader end,
+        [{element(1, sys:get_state(Pid)), Pid} || {ok, Pid} <- Rafts]),
+
+    % make a series of calls
+    Hash1 = crypto:hash(sha256, <<0, 1>>),
+    ?assertEqual(Hash1, couch_raft:call(FirstLeader, <<1>>)),
+
+    Hash2 = crypto:hash(sha256, <<Hash1/binary, 2>>),
+    ?assertEqual(Hash2, couch_raft:call(FirstLeader, <<2>>)),
+
+    Hash3 = crypto:hash(sha256, <<Hash2/binary, 3>>),
+    ?assertEqual(Hash3, couch_raft:call(FirstLeader, <<3>>)),
+
+    % force a re-election
+    couch_raft:stop(FirstLeader),
+    timer:sleep(500),
+
+    % verify new leader elected
+    [{leader, SecondLeader}] = lists:filter(fun({State, _Pid}) -> State == leader end,
+        [{element(1, sys:get_state(Pid)), Pid} || {ok, Pid} <- Rafts, Pid /= FirstLeader]),
+    ?assertNotEqual(FirstLeader, SecondLeader),
+
+    % make another call
+    Hash4 = crypto:hash(sha256, <<Hash3/binary, 4>>),
+    ?assertEqual(Hash4, couch_raft:call(SecondLeader, <<4>>)),
+
+    [couch_raft:stop(Pid) || {ok, Pid} <- Rafts, Pid /= FirstLeader],
+    [peer:stop(Peer) || {ok, Peer} <- Peers].
