Refactor read repair to rely on winning revisions

The old read repair forced an `open_revs=all` call, which can lead to
massive resource usage for clients that introduce large numbers of
conflicts. While the logic of opening all revisions to choose a winner
is straightforward, it fails to exploit the important properties of our
winning revision algorithm.

If we open multiple versions of a doc we can decide the winner directly,
without opening the discarded versions, because we use a stable sort
operation for choosing revisions. Given this, all read repair needs to
guarantee is that any revision we return exists on a majority of nodes,
in case the client writes immediately after the read. To accomplish
this, when we find disparate versions we simply rewrite them after the
read. We also keep the logic that writes asynchronously when the quorum
reply is a descendant of all other versions.
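
For illustration, here is a minimal, self-contained sketch (a
hypothetical winner_sketch module, not part of this commit) of the kind
of deterministic comparison that lets us pick a winner using only the
copies a read already returned. It uses a simplified record in place of
#doc{} from couch_db.hrl; the real selection in the patch is
choose_reply/1 in fabric_doc_open.erl below.

    %% winner_sketch.erl -- hypothetical, simplified illustration only.
    -module(winner_sketch).
    -export([choose/1]).

    %% Simplified stand-in for #doc{}; revs is {Pos, [RevId | Ancestors]}.
    -record(sdoc, {id, revs, deleted = false}).

    %% A live doc beats a deleted one, then the greater {Pos, RevIds} wins.
    %% The comparison is total and deterministic, so every node that sees
    %% the same copies agrees on the winner without an open_revs=all pass.
    %% Assumes Docs is non-empty.
    choose(Docs) ->
        [Winner | _] = lists:sort(fun(A, B) ->
            {not A#sdoc.deleted, A#sdoc.revs} >=
                {not B#sdoc.deleted, B#sdoc.revs}
        end, Docs),
        {ok, Winner}.

    %% Example: the longer branch of the same doc wins.
    %% choose([#sdoc{id = <<"d">>, revs = {1, [<<"a">>]}},
    %%         #sdoc{id = <<"d">>, revs = {2, [<<"b">>, <<"a">>]}}])
    %% returns {ok, #sdoc{revs = {2, [<<"b">>, <<"a">>]}, ...}}.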


Project: http://git-wip-us.apache.org/repos/asf/couchdb-fabric/repo
Commit: http://git-wip-us.apache.org/repos/asf/couchdb-fabric/commit/9d73e9b5
Tree: http://git-wip-us.apache.org/repos/asf/couchdb-fabric/tree/9d73e9b5
Diff: http://git-wip-us.apache.org/repos/asf/couchdb-fabric/diff/9d73e9b5

Branch: refs/heads/import
Commit: 9d73e9b5ad2069f20bd9e740bc9a0e14037a658b
Parents: 3d14661
Author: Paul J. Davis <paul.joseph.da...@gmail.com>
Authored: Sun Oct 7 18:54:15 2012 -0500
Committer: Paul J. Davis <paul.joseph.da...@gmail.com>
Committed: Tue Nov 27 14:04:13 2012 -0600

----------------------------------------------------------------------
 src/fabric_doc_open.erl | 484 ++++++++++++++++++++++++++++++++++++-------
 1 file changed, 407 insertions(+), 77 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/couchdb-fabric/blob/9d73e9b5/src/fabric_doc_open.erl
----------------------------------------------------------------------
diff --git a/src/fabric_doc_open.erl b/src/fabric_doc_open.erl
index 9e466b7..3a7726f 100644
--- a/src/fabric_doc_open.erl
+++ b/src/fabric_doc_open.erl
@@ -20,120 +20,450 @@
 -include_lib("mem3/include/mem3.hrl").
 -include_lib("couch/include/couch_db.hrl").
 
+
+-record(acc, {
+    dbname,
+    workers,
+    r,
+    state,
+    replies,
+    q_reply
+}).
+
+
 go(DbName, Id, Options) ->
     Workers = fabric_util:submit_jobs(mem3:shards(DbName,Id), open_doc,
         [Id, [deleted|Options]]),
     SuppressDeletedDoc = not lists:member(deleted, Options),
     N = mem3:n(DbName),
     R = couch_util:get_value(r, Options, integer_to_list(mem3:quorum(DbName))),
-    RepairOpts = [{r, integer_to_list(N)} | Options],
-    Acc0 = {Workers, erlang:min(N, list_to_integer(R)), []},
+    Acc0 = #acc{
+        dbname = DbName,
+        workers = Workers,
+        r = erlang:min(N, list_to_integer(R)),
+        state = r_not_met,
+        replies = []
+    },
     RexiMon = fabric_util:create_monitors(Workers),
     try fabric_util:recv(Workers, #shard.ref, fun handle_message/3, Acc0) of
-    {ok, Reply} ->
+    {ok, #acc{}=Acc} ->
+        Reply = handle_response(Acc),
         format_reply(Reply, SuppressDeletedDoc);
-    {error, needs_repair, Reply} ->
-        spawn(fabric, open_revs, [DbName, Id, all, RepairOpts]),
-        format_reply(Reply, SuppressDeletedDoc);
-    {error, needs_repair} ->
-        % we couldn't determine the correct reply, so we'll run a sync repair
-        {ok, Results} = fabric:open_revs(DbName, Id, all, RepairOpts),
-        case lists:partition(fun({ok, #doc{deleted=Del}}) -> Del end, Results) of
-        {[], []} ->
-            {not_found, missing};
-        {_DeletedDocs, []} when SuppressDeletedDoc ->
-            {not_found, deleted};
-        {DeletedDocs, []} ->
-            lists:last(lists:sort(DeletedDocs));
-        {_, LiveDocs} ->
-            lists:last(lists:sort(LiveDocs))
-        end;
     Error ->
         Error
     after
         rexi_monitor:stop(RexiMon)
     end.
 
-format_reply({ok, #doc{deleted=true}}, true) ->
-    {not_found, deleted};
-format_reply(Else, _) ->
-    Else.
-
-handle_message({rexi_DOWN, _, {_,NodeRef},_}, _Worker, {Workers, R, Replies}) ->
-    NewWorkers = lists:keydelete(NodeRef, #shard.node, Workers),
+handle_message({rexi_DOWN, _, {_, Node}, _}, _Worker, Acc) ->
+    NewWorkers = [W || #shard{node=N}=W <- Acc#acc.workers, N /= Node],
     case NewWorkers of
     [] ->
-        {error, needs_repair};
+        {stop, Acc#acc{workers=[]}};
     _ ->
-        {ok, {NewWorkers, R, Replies}}
+        {ok, Acc#acc{workers=NewWorkers}}
     end;
-handle_message({rexi_EXIT, _Reason}, Worker, Acc0) ->
-    skip_message(Worker, Acc0);
-handle_message(Reply, Worker, {Workers, R, Replies}) ->
-    NewReplies = fabric_util:update_counter(Reply, 1, Replies),
-    case lists:dropwhile(fun({_,{_, Count}}) -> Count < R end, NewReplies) of
-    [{_,{QuorumReply, _}} | _] ->
-        fabric_util:cleanup(lists:delete(Worker,Workers)),
-        case {NewReplies, fabric_util:remove_ancestors(NewReplies, [])} of
+handle_message({rexi_EXIT, _Reason}, Worker, Acc) ->
+    NewWorkers = lists:delete(Worker, Acc#acc.workers),
+    case NewWorkers of
+    [] ->
+        {stop, Acc#acc{workers=[]}};
+    _ ->
+        {ok, Acc#acc{workers=NewWorkers}}
+    end;
+handle_message(Reply, Worker, Acc) ->
+    NewReplies = fabric_util:update_counter(Reply, 1, Acc#acc.replies),
+    NewAcc = Acc#acc{replies = NewReplies},
+    case is_r_met(Acc#acc.workers, NewReplies, Acc#acc.r) of
+    {true, QuorumReply} ->
+        fabric_util:cleanup(lists:delete(Worker, Acc#acc.workers)),
+        {stop, NewAcc#acc{workers=[], state=r_met, q_reply=QuorumReply}};
+    wait_for_more ->
+        NewWorkers = lists:delete(Worker, Acc#acc.workers),
+        {ok, NewAcc#acc{workers=NewWorkers}};
+    no_more_workers ->
+        {stop, NewAcc#acc{workers=[]}}
+    end.
+
+handle_response(#acc{state=r_met, replies=Replies, q_reply=QuorumReply}=Acc) ->
+    case {Replies, fabric_util:remove_ancestors(Replies, [])} of
         {[_], [_]} ->
-            % complete agreement amongst all copies
-            {stop, QuorumReply};
+            % Complete agreement amongst all copies
+            QuorumReply;
         {[_|_], [{_, {QuorumReply, _}}]} ->
-            % any divergent replies are ancestors of the QuorumReply
-            {error, needs_repair, QuorumReply};
+            % Any divergent replies are ancestors of the QuorumReply,
+            % repair the document asynchronously
+            spawn(fun() -> read_repair(Acc) end),
+            QuorumReply;
         _Else ->
             % real disagreement amongst the workers, block for the repair
-            {error, needs_repair}
-        end;
+            read_repair(Acc)
+    end;
+handle_response(Acc) ->
+    read_repair(Acc).
+
+is_r_met(Workers, Replies, R) ->
+    case lists:dropwhile(fun({_,{_, Count}}) -> Count < R end, Replies) of
+    [{_,{QuorumReply, _}} | _] ->
+        {true, QuorumReply};
+    [] when length(Workers) > 1 ->
+        wait_for_more;
     [] ->
-        if length(Workers) =:= 1 ->
-            {error, needs_repair};
-        true ->
-            {ok, {lists:delete(Worker,Workers), R, NewReplies}}
+        no_more_workers
+    end.
+
+read_repair(#acc{dbname=DbName, replies=Replies}) ->
+    Docs = [Doc || {_, {{ok, #doc{}=Doc}, _}} <- Replies],
+    case Docs of
+    [#doc{id=Id} | _] ->
+        Ctx = #user_ctx{roles=[<<"_admin">>]},
+        Opts = [replicated_changes, {user_ctx, Ctx}],
+        Res = fabric:update_docs(DbName, Docs, Opts),
+        twig:log(notice, "read_repair ~s ~s ~p", [DbName, Id, Res]),
+        choose_reply(Docs);
+    [] ->
+        % Try hard to return some sort of information
+        % to the client.
+        Values = [V || {_, {V, _}} <- Replies],
+        case lists:member({not_found, missing}, Values) of
+            true ->
+                {not_found, missing};
+            false when length(Values) > 0 ->
+                % Sort for stability in responses in
+                % case we have some weird condition
+                hd(lists:sort(Values));
+            false ->
+                {error, read_failure}
         end
     end.
 
-skip_message(_Worker, {Workers, _R, _Replies}) when length(Workers) =:= 1 ->
-    {error, needs_repair};
-skip_message(Worker, {Workers, R, Replies}) ->
-    {ok, {lists:delete(Worker,Workers), R, Replies}}.
+choose_reply(Docs) ->
+    % Sort descending by {not deleted, rev}. This should match
+    % the logic of couch_doc:to_doc_info/1.
+    [Winner | _] = lists:sort(fun(DocA, DocB) ->
+        InfoA = {not DocA#doc.deleted, DocA#doc.revs},
+        InfoB = {not DocB#doc.deleted, DocB#doc.revs},
+        InfoA > InfoB
+    end, Docs),
+    {ok, Winner}.
+
+format_reply({ok, #doc{deleted=true}}, true) ->
+    {not_found, deleted};
+format_reply(Else, _) ->
+    Else.
+
+
+is_r_met_test() ->
+    Workers0 = [],
+    Workers1 = [nil],
+    Workers2 = [nil,nil],
+
+    % Successful cases
+
+    ?assertEqual(
+        {true, foo},
+        is_r_met([], [fabric_util:kv(foo,2)], 2)
+    ),
+
+    ?assertEqual(
+        {true, foo},
+        is_r_met([], [fabric_util:kv(foo,3)], 2)
+    ),
+
+    ?assertEqual(
+        {true, foo},
+        is_r_met([], [fabric_util:kv(foo,1)], 1)
+    ),
+
+    ?assertEqual(
+        {true, foo},
+        is_r_met([], [fabric_util:kv(foo,2), fabric_util:kv(bar,1)], 2)
+    ),
+
+    ?assertEqual(
+        {true, bar},
+        is_r_met([], [fabric_util:kv(bar,1), fabric_util:kv(bar,2)], 2)
+    ),
+
+    ?assertEqual(
+        {true, bar},
+        is_r_met([], [fabric_util:kv(bar,2), fabric_util:kv(foo,1)], 2)
+    ),
+
+    % Not met, but wait for more messages
+
+    ?assertEqual(
+        wait_for_more,
+        is_r_met(Workers2, [fabric_util:kv(foo,1)], 2)
+    ),
+
+    ?assertEqual(
+        wait_for_more,
+        is_r_met(Workers2, [fabric_util:kv(foo,2)], 3)
+    ),
+
+    ?assertEqual(
+        wait_for_more,
+        is_r_met(Workers2, [fabric_util:kv(foo,1), fabric_util:kv(bar,1)], 2)
+    ),
+
+    % Not met, bail out
 
+    ?assertEqual(
+        no_more_workers,
+        is_r_met(Workers0, [fabric_util:kv(foo,1)], 2)
+    ),
+
+    ?assertEqual(
+        no_more_workers,
+        is_r_met(Workers1, [fabric_util:kv(foo,1)], 2)
+    ),
+
+    ?assertEqual(
+        no_more_workers,
+        is_r_met(Workers1, [fabric_util:kv(foo,1), fabric_util:kv(bar,1)], 2)
+    ),
+
+    ?assertEqual(
+        no_more_workers,
+        is_r_met(Workers1, [fabric_util:kv(foo,2)], 3)
+    ),
+
+    ok.
+
+handle_message_down_test() ->
+    Node0 = 'foo@localhost',
+    Node1 = 'bar@localhost',
+    Down0 = {rexi_DOWN, nil, {nil, Node0}, nil},
+    Down1 = {rexi_DOWN, nil, {nil, Node1}, nil},
+    Workers0 = [#shard{node=Node0} || _ <- [a, b]],
+    Worker1 = #shard{node=Node1},
+    Workers1 = Workers0 ++ [Worker1],
+
+    % Stop when no more workers are left
+    ?assertEqual(
+        {stop, #acc{workers=[]}},
+        handle_message(Down0, nil, #acc{workers=Workers0})
+    ),
+
+    % Continue when we have more workers
+    ?assertEqual(
+        {ok, #acc{workers=[Worker1]}},
+        handle_message(Down0, nil, #acc{workers=Workers1})
+    ),
+
+    % A second DOWN removes the remaining workers
+    ?assertEqual(
+        {stop, #acc{workers=[]}},
+        handle_message(Down1, nil, #acc{workers=[Worker1]})
+    ),
+
+    ok.
+
+handle_message_exit_test() ->
+    Exit = {rexi_EXIT, nil},
+    Worker0 = #shard{ref=erlang:make_ref()},
+    Worker1 = #shard{ref=erlang:make_ref()},
+
+    % Only removes the specified worker
+    ?assertEqual(
+        {ok, #acc{workers=[Worker1]}},
+        handle_message(Exit, Worker0, #acc{workers=[Worker0, Worker1]})
+    ),
+
+    ?assertEqual(
+        {ok, #acc{workers=[Worker0]}},
+        handle_message(Exit, Worker1, #acc{workers=[Worker0, Worker1]})
+    ),
+
+    % We bail if it was the last worker
+    ?assertEqual(
+        {stop, #acc{workers=[]}},
+        handle_message(Exit, Worker0, #acc{workers=[Worker0]})
+    ),
+
+    ok.
+
+handle_message_reply_test() ->
+    start_meck_(),
+    meck:expect(rexi, kill, fun(_, _) -> ok end),
+
+    Worker0 = #shard{ref=erlang:make_ref()},
+    Worker1 = #shard{ref=erlang:make_ref()},
+    Worker2 = #shard{ref=erlang:make_ref()},
+    Workers = [Worker0, Worker1, Worker2],
+    Acc0 = #acc{workers=Workers, r=2, replies=[]},
+
+    % Test that we continue when we haven't met R yet
+    ?assertEqual(
+        {ok, Acc0#acc{
+            workers=[Worker0, Worker1],
+            replies=[fabric_util:kv(foo,1)]
+        }},
+        handle_message(foo, Worker2, Acc0)
+    ),
+
+    ?assertEqual(
+        {ok, Acc0#acc{
+            workers=[Worker0, Worker1],
+            replies=[fabric_util:kv(bar,1), fabric_util:kv(foo,1)]
+        }},
+        handle_message(bar, Worker2, Acc0#acc{
+            replies=[fabric_util:kv(foo,1)]
+        })
+    ),
+
+    % Test that we don't get a quorum when R isn't met. q_reply
+    % isn't set and state remains unchanged and {stop, NewAcc}
+    % is returned. Bit subtle on the assertions here.
+
+    ?assertEqual(
+        {stop, Acc0#acc{workers=[],replies=[fabric_util:kv(foo,1)]}},
+        handle_message(foo, Worker0, Acc0#acc{workers=[Worker0]})
+    ),
+
+    ?assertEqual(
+        {stop, Acc0#acc{
+            workers=[],
+            replies=[fabric_util:kv(bar,1), fabric_util:kv(foo,1)]
+        }},
+        handle_message(bar, Worker0, Acc0#acc{
+            workers=[Worker0],
+            replies=[fabric_util:kv(foo,1)]
+        })
+    ),
+
+    % Check that when R is met we stop with a new state and
+    % a q_reply.
+
+    ?assertEqual(
+        {stop, Acc0#acc{
+            workers=[],
+            replies=[fabric_util:kv(foo,2)],
+            state=r_met,
+            q_reply=foo
+        }},
+        handle_message(foo, Worker1, Acc0#acc{
+            workers=[Worker0, Worker1],
+            replies=[fabric_util:kv(foo,1)]
+        })
+    ),
+
+    ?assertEqual(
+        {stop, Acc0#acc{
+            workers=[],
+            r=1,
+            replies=[fabric_util:kv(foo,1)],
+            state=r_met,
+            q_reply=foo
+        }},
+        handle_message(foo, Worker0, Acc0#acc{r=1})
+    ),
+
+    ?assertEqual(
+        {stop, Acc0#acc{
+            workers=[],
+            replies=[fabric_util:kv(bar,1), fabric_util:kv(foo,2)],
+            state=r_met,
+            q_reply=foo
+        }},
+        handle_message(foo, Worker0, Acc0#acc{
+            workers=[Worker0],
+            replies=[fabric_util:kv(bar,1), fabric_util:kv(foo,1)]
+        })
+    ),
+
+    stop_meck_(),
+    ok.
+
+read_repair_test() ->
+    start_meck_(),
+    meck:expect(twig, log, fun(_, _, _) -> ok end),
+
+    Foo1 = {ok, #doc{revs = {1,[<<"foo">>]}}},
+    Foo2 = {ok, #doc{revs = {2,[<<"foo2">>,<<"foo">>]}}},
+    NFM = {not_found, missing},
+
+    % Test when we have actual doc data to repair
+
+    meck:expect(fabric, update_docs, fun(_, [_], _) -> {ok, []} end),
+    Acc0 = #acc{
+        dbname = <<"name">>,
+        replies = [fabric_util:kv(Foo1,1)]
+    },
+    ?assertEqual(Foo1, read_repair(Acc0)),
+
+    meck:expect(fabric, update_docs, fun(_, [_, _], _) -> {ok, []} end),
+    Acc1 = #acc{
+        dbname = <<"name">>,
+        replies = [fabric_util:kv(Foo1,1), fabric_util:kv(Foo2,1)]
+    },
+    ?assertEqual(Foo2, read_repair(Acc1)),
+
+    % Test when we have nothing but errors
+
+    Acc2 = #acc{replies=[fabric_util:kv(NFM, 1)]},
+    ?assertEqual(NFM, read_repair(Acc2)),
+
+    Acc3 = #acc{replies=[fabric_util:kv(NFM,1), fabric_util:kv(foo,2)]},
+    ?assertEqual(NFM, read_repair(Acc3)),
+
+    Acc4 = #acc{replies=[fabric_util:kv(foo,1), fabric_util:kv(bar,1)]},
+    ?assertEqual(bar, read_repair(Acc4)),
+
+    stop_meck_(),
+    ok.
+
+handle_response_quorum_met_test() ->
+    start_meck_(),
+    meck:expect(twig, log, fun(_, _, _) -> ok end),
+    meck:expect(fabric, update_docs, fun(_, _, _) -> {ok, []} end),
 
-open_doc_test() ->
     Foo1 = {ok, #doc{revs = {1,[<<"foo">>]}}},
     Foo2 = {ok, #doc{revs = {2,[<<"foo2">>,<<"foo">>]}}},
     Bar1 = {ok, #doc{revs = {1,[<<"bar">>]}}},
-    Baz1 = {ok, #doc{revs = {1,[<<"baz">>]}}},
-    NF = {not_found, missing},
-    State0 = {[nil, nil, nil], 2, []},
-    State1 = {[nil, nil], 2, [fabric_util:kv(Foo1,1)]},
-    State2 = {[nil], 2, [fabric_util:kv(Bar1,1), fabric_util:kv(Foo1,1)]},
-    State3 = {[nil], 2, [fabric_util:kv(Foo1,1), fabric_util:kv(Foo2,1)]},
-    ?assertEqual({ok, State1}, handle_message(Foo1, nil, State0)),
 
-    % normal case - quorum reached, no disagreement
-    ?assertEqual({stop, Foo1}, handle_message(Foo1, nil, State1)),
+    BasicOkAcc = #acc{
+        state=r_met,
+        replies=[fabric_util:kv(Foo1,2)],
+        q_reply=Foo1
+    },
+    ?assertEqual(Foo1, handle_response(BasicOkAcc)),
+
+    WithAncestorsAcc = #acc{
+        state=r_met,
+        replies=[fabric_util:kv(Foo1,1), fabric_util:kv(Foo2,2)],
+        q_reply=Foo2
+    },
+    ?assertEqual(Foo2, handle_response(WithAncestorsAcc)),
 
-    % 2nd worker disagrees, voting continues
-    ?assertEqual({ok, State2}, handle_message(Bar1, nil, State1)),
+    % This also checks when the quorum isn't the most recent
+    % revision.
+    DeeperWinsAcc = #acc{
+        state=r_met,
+        replies=[fabric_util:kv(Foo1,2), fabric_util:kv(Foo2,1)],
+        q_reply=Foo1
+    },
+    ?assertEqual(Foo2, handle_response(DeeperWinsAcc)),
 
-    % 3rd worker resolves voting, but repair is needed
-    ?assertEqual({error, needs_repair}, handle_message(Foo1, nil, State2)),
+    % Check that we return the proper doc based on rev
+    % (ie, pos is equal)
+    BiggerRevWinsAcc = #acc{
+        state=r_met,
+        replies=[fabric_util:kv(Foo1,1), fabric_util:kv(Bar1,2)],
+        q_reply=Bar1
+    },
+    ?assertEqual(Foo1, handle_response(BiggerRevWinsAcc)),
 
-    % 2nd worker comes up with descendant of Foo1, voting continues
-    ?assertEqual({ok, State3}, handle_message(Foo2, nil, State1)),
+    % r_not_met is a proxy to read_repair so we rely on
+    % read_repair_test for those conditions.
 
-    % 3rd worker is also a descendant so run repair async
-    ?assertEqual({error, needs_repair, Foo2}, handle_message(Foo2, nil,
-        State3)),
+    stop_meck_(),
+    ok.
 
-    % We only run async repair when every revision is part of the same branch
-    ?assertEqual({error, needs_repair}, handle_message(Bar1, nil, State3)),
 
-    % not_found is considered to be an ancestor of everybody
-    {ok, State4} = handle_message(NF, nil, State1),
-    ?assertEqual({error, needs_repair, Foo1}, handle_message(Foo1, nil,
-        State4)),
+start_meck_() ->
+    meck:new([twig, rexi, fabric]).
 
-    % 3 distinct edit branches result in quorum failure
-    ?assertEqual({error, needs_repair}, handle_message(Baz1, nil, State2)).
+stop_meck_() ->
+    meck:unload([twig, rexi, fabric]).
