This is an automated email from the ASF dual-hosted git repository.

davisp pushed a commit to branch COUCHDB-3326-clustered-purge-davisp-refactor-3
in repository https://gitbox.apache.org/repos/asf/couchdb.git

commit 7e29e6348442226579f314544e309547039a5226
Author: Paul J. Davis <[email protected]>
AuthorDate: Mon May 14 16:46:42 2018 -0500

    WIP - read repair tests
---
 src/fabric/src/fabric_rpc.erl              |  16 +-
 src/fabric/test/fabric_rpc_purge_tests.erl | 290 +++++++++++++++++++++++++++++
 2 files changed, 296 insertions(+), 10 deletions(-)

diff --git a/src/fabric/src/fabric_rpc.erl b/src/fabric/src/fabric_rpc.erl
index 783764e..da9a936 100644
--- a/src/fabric/src/fabric_rpc.erl
+++ b/src/fabric/src/fabric_rpc.erl
@@ -354,10 +354,10 @@ filter_purged_revs(Db, NodeIdRevs, Docs) ->
     Opts = [{start_key, StartKey}],
     FoldFun = fun(#doc{id = Id, body = {Props}}, Acc) ->
         case Id of
-            <<?LOCAL_DOC_PREFIX, "/purge-mem3-", _/binary>> ->
+            <<?LOCAL_DOC_PREFIX, "purge-mem3-", _/binary>> ->
                 TargetNodeBin = couch_util:get_value(<<"target_node">>, Props),
                 PurgeSeq = couch_util:get_value(<<"purge_seq">>, Props),
-                NewAcc = try
+                try
                     TargetNode = binary_to_existing_atom(TargetNodeBin, 
latin1),
                     case lists:member(TargetNode, Nodes) of
                         true ->
@@ -369,8 +369,7 @@ filter_purged_revs(Db, NodeIdRevs, Docs) ->
                     % A really old doc referring to a node that's
                     % no longer in the cluster
                     {ok, Acc}
-                end,
-                {ok, NewAcc};
+                end;
             _ ->
                 % We've processed all _local mem3 purge docs
                 {stop, Acc}
@@ -407,7 +406,7 @@ filter_purged_revs(Db, NodeIdRevs, Docs) ->
     % have been purged before running our updates
     RestGood = if NeedChecking == [] -> []; true ->
         CheckFoldFun = fun({PSeq, _UUID, DocId, Revs}, Acc) ->
-            FilterFun = fun({NS, FiltDocId, FiltRev}) ->
+            FilterFun = fun({NS, {FiltDocId, FiltRev}}) ->
                 % The `NS =< PSeq` portion of this translates to the
                 % fact that we haven't yet replicated PSeq to the
                 % target node, hence we would need to filter this read
@@ -418,11 +417,8 @@ filter_purged_revs(Db, NodeIdRevs, Docs) ->
             {ok, lists:filter(FilterFun, Acc)}
         end,
         StartSeq = lists:min([S || {S, _} <- NeedChecking]),
-        InitAcc = lists:flatmap(fun({NodeSeq, {DocId, Revs}}) ->
-            [{NodeSeq, DocId, Rev} || Rev <- Revs]
-        end, NeedChecking),
-        {ok, Result} =
-                couch_db:fold_purge_infos(Db, StartSeq, CheckFoldFun, InitAcc),
+        {ok, Result} = couch_db:fold_purge_infos(
+                Db, StartSeq, CheckFoldFun, NeedChecking),
         [{DocId, Rev} || {_NSeq, DocId, Rev} <- Result]
     end,
 
diff --git a/src/fabric/test/fabric_rpc_purge_tests.erl 
b/src/fabric/test/fabric_rpc_purge_tests.erl
new file mode 100644
index 0000000..0a909cd
--- /dev/null
+++ b/src/fabric/test/fabric_rpc_purge_tests.erl
@@ -0,0 +1,290 @@
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+%   http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+-module(fabric_rpc_purge_tests).
+
+
+-include_lib("eunit/include/eunit.hrl").
+-include_lib("couch/include/couch_db.hrl").
+
+
+-define(TDEF(A), {A, fun A/1}).
+
+% TODO: Add tests:
+%         - filter some updates
+%         - ignore lagging nodes
+
+main_test_() ->
+    {
+        setup,
+        spawn,
+        fun setup_all/0,
+        fun teardown_all/1,
+        [
+            {
+                foreach,
+                fun setup_no_purge/0,
+                fun teardown_no_purge/1,
+                lists:map(fun wrap/1, [
+                    ?TDEF(t_no_purge_no_filter)
+                ])
+            },
+            {
+                foreach,
+                fun setup_single_purge/0,
+                fun teardown_single_purge/1,
+                lists:map(fun wrap/1, [
+                    ?TDEF(t_filter),
+                    ?TDEF(t_no_filter_interactive_edit),
+                    ?TDEF(t_no_filter_unknown_node),
+                    ?TDEF(t_no_filter_old_node),
+                    ?TDEF(t_no_filter_different_node),
+                    ?TDEF(t_no_filter_different_docid),
+                    ?TDEF(t_no_filter_different_rev),
+                    ?TDEF(t_no_filter_after_repl)
+                ])
+            }
+        ]
+    }.
+
+
+setup_all() ->
+    test_util:start_couch().
+
+
+teardown_all(Ctx) ->
+    test_util:stop_couch(Ctx).
+
+
+setup_no_purge() ->
+    {ok, Db} = cet_util:create_db(),
+    populate_db(Db),
+    couch_db:name(Db).
+
+
+teardown_no_purge(DbName) ->
+    ok = couch_server:delete(DbName, []).
+
+
+setup_single_purge() ->
+    DbName = setup_no_purge(),
+    DocId = <<"0003">>,
+    {ok, OldDoc} = open_doc(DbName, DocId),
+    purge_doc(DbName, DocId),
+    {DbName, DocId, OldDoc}.
+
+
+teardown_single_purge({DbName, _, _}) ->
+    teardown_no_purge(DbName).
+
+
+t_no_purge_no_filter(DbName) ->
+    DocId = <<"0003">>,
+
+    {ok, OldDoc} = open_doc(DbName, DocId),
+    NewDoc = create_update(OldDoc, 2),
+
+    rpc_update_doc(DbName, NewDoc),
+
+    {ok, CurrDoc} = open_doc(DbName, DocId),
+    ?assert(CurrDoc /= OldDoc),
+    ?assert(CurrDoc == NewDoc).
+
+
+t_filter({DbName, DocId, OldDoc}) ->
+    ?assertEqual({not_found, missing}, open_doc(DbName, DocId)),
+    create_purge_checkpoint(DbName, 0),
+
+    rpc_update_doc(DbName, OldDoc),
+
+    ?assertEqual({not_found, missing}, open_doc(DbName, DocId)).
+
+
+t_no_filter_interactive_edit(_) ->
+    ok.
+
+
+t_no_filter_unknown_node({DbName, DocId, OldDoc}) ->
+    ?assertEqual({not_found, missing}, open_doc(DbName, DocId)),
+    create_purge_checkpoint(DbName, 0),
+
+    {Pos, [Rev | _]} = OldDoc#doc.revs,
+    RROpt = {read_repair, [{'[email protected]', {DocId, [{Pos, Rev}]}}]},
+    rpc_update_doc(DbName, OldDoc, [RROpt]),
+
+    ?assertEqual({ok, OldDoc}, open_doc(DbName, DocId)).
+
+
+t_no_filter_old_node({DbName, DocId, OldDoc}) ->
+    ?assertEqual({not_found, missing}, open_doc(DbName, DocId)),
+    create_purge_checkpoint(DbName, 1),
+
+    % The random UUID is to generate a badarg exception when
+    % we try and convert it to an existing atom.
+    create_purge_checkpoint(DbName, 0, couch_uuids:random()),
+
+    rpc_update_doc(DbName, OldDoc),
+
+    ?assertEqual({ok, OldDoc}, open_doc(DbName, DocId)).
+
+
+t_no_filter_different_node({DbName, DocId, OldDoc}) ->
+    ?assertEqual({not_found, missing}, open_doc(DbName, DocId)),
+    create_purge_checkpoint(DbName, 1),
+
+    % Create a valid purge for a different node
+    TgtNode = list_to_binary(atom_to_list('[email protected]')),
+    create_purge_checkpoint(DbName, 0, TgtNode),
+
+    rpc_update_doc(DbName, OldDoc),
+
+    ?assertEqual({ok, OldDoc}, open_doc(DbName, DocId)).
+
+
+% This really shows that I should rewrite how read_repair
+% works as we'll never actually call this with different
+% doc ids or even multiple docids
+t_no_filter_different_docid({DbName, DocId, OldDoc}) ->
+    ?assertEqual({not_found, missing}, open_doc(DbName, DocId)),
+    create_purge_checkpoint(DbName, 0),
+
+    {Pos, [Rev | _]} = OldDoc#doc.revs,
+    RROpt = {read_repair, [{tgt_node(), {<<"bang">>, [{Pos, Rev}]}}]},
+    rpc_update_doc(DbName, OldDoc, [RROpt]),
+
+    ?assertEqual({ok, OldDoc}, open_doc(DbName, DocId)).
+
+
+% Similar to the different docid rev we're not going to be
+% passing a different doc update with a different revision
+% to the read_repair function
+t_no_filter_different_rev({DbName, DocId, OldDoc}) ->
+    ?assertEqual({not_found, missing}, open_doc(DbName, DocId)),
+    create_purge_checkpoint(DbName, 0),
+
+    Rev = crypto:hash(md5, couch_uuids:random()),
+    RROpt = {read_repair, [{tgt_node(), {DocId, [{3, Rev}]}}]},
+    rpc_update_doc(DbName, OldDoc, [RROpt]),
+
+    ?assertEqual({ok, OldDoc}, open_doc(DbName, DocId)).
+
+
+t_no_filter_after_repl({DbName, DocId, OldDoc}) ->
+    ?assertEqual({not_found, missing}, open_doc(DbName, DocId)),
+    create_purge_checkpoint(DbName, 1),
+
+    rpc_update_doc(DbName, OldDoc),
+
+    ?assertEqual({ok, OldDoc}, open_doc(DbName, DocId)).
+
+
+wrap({Name, Fun}) ->
+    fun(Arg) ->
+        {timeout, 60, {atom_to_list(Name), fun() ->
+            process_flag(trap_exit, true),
+            Fun(Arg)
+        end}}
+    end.
+
+
+populate_db(Db) ->
+    Docs = lists:map(fun(Idx) ->
+        DocId = lists:flatten(io_lib:format("~4..0b", [Idx])),
+        #doc{
+            id = list_to_binary(DocId),
+            body = {[{<<"int">>, Idx}, {<<"vsn">>, 2}]}
+        }
+    end, lists:seq(1, 100)),
+    {ok, _} = couch_db:update_docs(Db, Docs).
+
+
+open_doc(DbName, DocId) ->
+    couch_util:with_db(DbName, fun(Db) ->
+        couch_db:open_doc(Db, DocId, [])
+    end).
+
+
+create_update(Doc, NewVsn) ->
+    #doc{
+        id = DocId,
+        revs = {Pos, [Rev | _] = Revs},
+        body = {Props}
+    } = Doc,
+    NewProps = lists:keyreplace(<<"vsn">>, 1, Props, {<<"vsn">>, NewVsn}),
+    NewRev = crypto:hash(md5, term_to_binary({DocId, Rev, {NewProps}})),
+    Doc#doc{
+        revs = {Pos + 1, [NewRev | Revs]},
+        body = {NewProps}
+    }.
+
+
+purge_doc(DbName, DocId) ->
+    {ok, Doc} = open_doc(DbName, DocId),
+    {Pos, [Rev | _]} = Doc#doc.revs,
+    PInfo = {couch_uuids:random(), DocId, [{Pos, Rev}]},
+    Resp = couch_util:with_db(DbName, fun(Db) ->
+        couch_db:purge_docs(Db, [PInfo], [])
+    end),
+    ?assertEqual({ok, [{ok, [{Pos, Rev}]}]}, Resp).
+
+
+create_purge_checkpoint(DbName, PurgeSeq) ->
+    create_purge_checkpoint(DbName, PurgeSeq, tgt_node_bin()).
+
+
+create_purge_checkpoint(DbName, PurgeSeq, TgtNode) when is_binary(TgtNode) ->
+    Resp = couch_util:with_db(DbName, fun(Db) ->
+        SrcUUID = couch_db:get_uuid(Db),
+        TgtUUID = couch_uuids:random(),
+        CPDoc = #doc{
+            id = mem3_rep:make_purge_id(SrcUUID, TgtUUID),
+            body = {[
+                {<<"target_node">>, TgtNode},
+                {<<"purge_seq">>, PurgeSeq}
+            ]}
+        },
+        couch_db:update_docs(Db, [CPDoc], [])
+    end),
+    ?assertMatch({ok, [_]}, Resp).
+
+
+rpc_update_doc(DbName, Doc) ->
+    {Pos, [Rev | _]} = Doc#doc.revs,
+    RROpt = {read_repair, [{tgt_node(), {Doc#doc.id, [{Pos, Rev}]}}]},
+    rpc_update_doc(DbName, Doc, [RROpt]).
+
+
+rpc_update_doc(DbName, Doc, Opts0) ->
+    Opts = case lists:member(interactive_edit, Opts0) of
+        true -> Opts0;
+        false -> [replicated_changes | Opts0]
+    end,
+    Ref = erlang:make_ref(),
+    put(rexi_from, {self(), Ref}),
+    fabric_rpc:update_docs(DbName, [Doc], Opts),
+    Reply = test_util:wait(fun() ->
+        receive
+            {Ref, Reply} ->
+                Reply
+        after 0 ->
+            wait
+        end
+    end),
+    ?assertEqual({ok, []}, Reply).
+
+
+tgt_node() ->
+    '[email protected]'.
+
+
+tgt_node_bin() ->
+    iolist_to_binary(atom_to_list(tgt_node())).
\ No newline at end of file

-- 
To stop receiving notification emails like this one, please contact
[email protected].

Reply via email to