This is an automated email from the ASF dual-hosted git repository. jiahuili430 pushed a commit to branch fix-changes-stats in repository https://gitbox.apache.org/repos/asf/couchdb.git
commit e9539806c184cb4f3f7bf2848c8f306b65a0fef9 Author: Jiahui Li <lijiahui...@gmail.com> AuthorDate: Sat Dec 2 20:23:37 2023 -0600 refactor `get_changes_row` --- src/couch/src/couch_changes.erl | 3 +- src/couch/test/eunit/couch_changes_tests.erl | 172 ++++++++++++++++++++------- src/fabric/src/fabric_rpc.erl | 96 +++++++-------- 3 files changed, 178 insertions(+), 93 deletions(-) diff --git a/src/couch/src/couch_changes.erl b/src/couch/src/couch_changes.erl index e20e86b70..cd6d55eb7 100644 --- a/src/couch/src/couch_changes.erl +++ b/src/couch/src/couch_changes.erl @@ -19,7 +19,6 @@ wait_updated/3, get_rest_updated/1, configure_filter/4, - filter/3, filter/4, handle_db_event/3, handle_view_event/3, @@ -283,7 +282,7 @@ filter(Db, DocInfo, {custom, Style, Req0, DDoc, FName}, IncludeDocs) when Includ {ok, Passes} = couch_query_servers:filter_docs(Req, Db, DDoc, FName, Docs), {Docs, filter_revs(Passes, Docs)}; filter(Db, DocInfo, Filter, _IncludeDocs) -> - filter(Db, DocInfo, Filter). + {[], filter(Db, DocInfo, Filter)}. get_view_qs({json_req, {Props}}) -> {Query} = couch_util:get_value(<<"query">>, Props, {[]}), diff --git a/src/couch/test/eunit/couch_changes_tests.erl b/src/couch/test/eunit/couch_changes_tests.erl index 293f49e2e..654858507 100644 --- a/src/couch/test/eunit/couch_changes_tests.erl +++ b/src/couch/test/eunit/couch_changes_tests.erl @@ -24,45 +24,6 @@ doc = nil }). -setup() -> - DbName = ?tempdb(), - {ok, Db} = create_db(DbName), - Revs = [ - R - || {ok, R} <- [ - save_doc(Db, {[{<<"_id">>, <<"doc1">>}]}), - save_doc(Db, {[{<<"_id">>, <<"doc2">>}]}), - save_doc(Db, {[{<<"_id">>, <<"doc3">>}]}), - save_doc(Db, {[{<<"_id">>, <<"doc4">>}]}), - save_doc(Db, {[{<<"_id">>, <<"doc5">>}]}) - ] - ], - Rev = lists:nth(3, Revs), - - {ok, Db1} = couch_db:reopen(Db), - {ok, Rev1} = save_doc(Db1, {[{<<"_id">>, <<"doc3">>}, {<<"_rev">>, Rev}]}), - Revs1 = Revs ++ [Rev1], - Revs2 = - Revs1 ++ - [ - R - || {ok, R} <- [ - save_doc(Db1, {[{<<"_id">>, <<"doc6">>}]}), - save_doc(Db1, {[{<<"_id">>, <<"_design/foo">>}]}), - save_doc(Db1, {[{<<"_id">>, <<"doc7">>}]}), - save_doc(Db1, {[{<<"_id">>, <<"doc8">>}]}) - ] - ], - config:set( - "native_query_servers", "erlang", "{couch_native_process, start_link, []}", _Persist = false - ), - {DbName, list_to_tuple(Revs2)}. - -teardown({DbName, _}) -> - config:delete("native_query_servers", "erlang", _Persist = false), - delete_db(DbName), - ok. - changes_feed_test_() -> { "Changes feed", { @@ -90,6 +51,7 @@ changes_filter_test_() -> fun test_util:start_couch/0, fun test_util:stop_couch/1, [ + filter_by_custom_function(), filter_by_design(), filter_by_doc_id(), filter_by_filter_function(), @@ -120,6 +82,17 @@ changes_style_test_() -> } }. +filter_by_custom_function() -> + { + "Filter function", + { + foreach, + fun setup/0, + fun teardown/1, + [?TDEF_FE(t_receive_heartbeats, ?TIMEOUT div 1000)] + } + }. + filter_by_design() -> { "Filter _design", @@ -259,6 +232,75 @@ t_end_changes_when_db_deleted({DbName, _Revs}) -> {_Rows, _LastSeq} = wait_finished(Consumer), ok = stop_consumer(Consumer). +t_receive_heartbeats(_) -> + DbName = ?tempdb(), + Timeout = 100, + {ok, Db} = create_db(DbName), + + {ok, _} = save_doc( + Db, + {[ + {<<"_id">>, <<"_design/filtered">>}, + {<<"language">>, <<"javascript">>}, + {<<"filters">>, + {[ + {<<"foo">>, << + "function(doc) {\n" + " return ['doc10', 'doc11', 'doc12'].indexOf(doc._id) != -1;}" + >>} + ]}} + ]} + ), + + ChangesArgs = #changes_args{ + filter = "filtered/foo", + feed = "continuous", + timeout = 10000, + heartbeat = 1000 + }, + Consumer = spawn_consumer(DbName, ChangesArgs, {json_req, null}), + + {ok, _Rev1} = save_doc(Db, {[{<<"_id">>, <<"doc1">>}]}), + timer:sleep(Timeout), + {ok, _Rev2} = save_doc(Db, {[{<<"_id">>, <<"doc2">>}]}), + timer:sleep(Timeout), + {ok, _Rev3} = save_doc(Db, {[{<<"_id">>, <<"doc3">>}]}), + timer:sleep(Timeout), + {ok, _Rev4} = save_doc(Db, {[{<<"_id">>, <<"doc4">>}]}), + timer:sleep(Timeout), + {ok, _Rev5} = save_doc(Db, {[{<<"_id">>, <<"doc5">>}]}), + timer:sleep(Timeout), + {ok, _Rev6} = save_doc(Db, {[{<<"_id">>, <<"doc6">>}]}), + timer:sleep(Timeout), + {ok, _Rev7} = save_doc(Db, {[{<<"_id">>, <<"doc7">>}]}), + timer:sleep(Timeout), + {ok, _Rev8} = save_doc(Db, {[{<<"_id">>, <<"doc8">>}]}), + timer:sleep(Timeout), + {ok, _Rev9} = save_doc(Db, {[{<<"_id">>, <<"doc9">>}]}), + + Heartbeats = get_heartbeats(Consumer), + ?assert(Heartbeats > 0), + + {ok, _Rev10} = save_doc(Db, {[{<<"_id">>, <<"doc10">>}]}), + timer:sleep(Timeout), + {ok, _Rev11} = save_doc(Db, {[{<<"_id">>, <<"doc11">>}]}), + timer:sleep(Timeout), + {ok, _Rev12} = save_doc(Db, {[{<<"_id">>, <<"doc12">>}]}), + + Heartbeats2 = get_heartbeats(Consumer), + ?assert(Heartbeats2 > Heartbeats), + + Rows = get_rows(Consumer), + ?assertEqual(3, length(Rows)), + + {ok, _Rev13} = save_doc(Db, {[{<<"_id">>, <<"doc13">>}]}), + timer:sleep(Timeout), + {ok, _Rev14} = save_doc(Db, {[{<<"_id">>, <<"doc14">>}]}), + timer:sleep(Timeout), + + Heartbeats3 = get_heartbeats(Consumer), + ?assert(Heartbeats3 > Heartbeats2). + t_emit_only_design_documents({DbName, Revs}) -> ChArgs = #changes_args{filter = "_design"}, Req = {json_req, null}, @@ -460,7 +502,6 @@ t_select_with_continuous({DbName, Revs}) -> ?assertEqual(ok, wait_row_notifications(1)), ok = pause(Consumer), NewRows = get_rows(Consumer), - ?debugVal(NewRows), ?assertMatch([#row{seq = _, id = <<"doc8">>, deleted = false}], NewRows), ?assertEqual([#row{seq = 12, id = <<"doc8">>, deleted = false}], NewRows). @@ -627,6 +668,44 @@ t_style_all_docs_with_include_docs({DbName, Revs}) -> ). %%%%%%%%%%%%%%%%%%%% Utility Functions %%%%%%%%%%%%%%%%%%%% +setup() -> + DbName = ?tempdb(), + {ok, Db} = create_db(DbName), + Revs = [ + R + || {ok, R} <- [ + save_doc(Db, {[{<<"_id">>, <<"doc1">>}]}), + save_doc(Db, {[{<<"_id">>, <<"doc2">>}]}), + save_doc(Db, {[{<<"_id">>, <<"doc3">>}]}), + save_doc(Db, {[{<<"_id">>, <<"doc4">>}]}), + save_doc(Db, {[{<<"_id">>, <<"doc5">>}]}) + ] + ], + Rev = lists:nth(3, Revs), + {ok, Db1} = couch_db:reopen(Db), + {ok, Rev1} = save_doc(Db1, {[{<<"_id">>, <<"doc3">>}, {<<"_rev">>, Rev}]}), + Revs1 = Revs ++ [Rev1], + Revs2 = + Revs1 ++ + [ + R + || {ok, R} <- [ + save_doc(Db1, {[{<<"_id">>, <<"doc6">>}]}), + save_doc(Db1, {[{<<"_id">>, <<"_design/foo">>}]}), + save_doc(Db1, {[{<<"_id">>, <<"doc7">>}]}), + save_doc(Db1, {[{<<"_id">>, <<"doc8">>}]}) + ] + ], + config:set( + "native_query_servers", "erlang", "{couch_native_process, start_link, []}", _Persist = false + ), + {DbName, list_to_tuple(Revs2)}. + +teardown({DbName, _}) -> + config:delete("native_query_servers", "erlang", _Persist = false), + delete_db(DbName), + ok. + update_ddoc(DbName, DDoc) -> {ok, Db} = couch_db:open_int(DbName, [?ADMIN_CTX]), {ok, _} = couch_db:update_doc(Db, DDoc, []), @@ -646,6 +725,19 @@ save_doc(Db, Json) -> {ok, Rev} = couch_db:update_doc(Db, Doc, []), {ok, couch_doc:rev_to_str(Rev)}. +get_heartbeats({Consumer, _}) -> + Ref = make_ref(), + Consumer ! {get_heartbeats, Ref}, + Resp = + receive + {hearthbeats, Ref, HeartBeats} -> + HeartBeats + after ?TIMEOUT -> + timeout + end, + ?assertNotEqual(timeout, Resp), + Resp. + get_rows({Consumer, _}) -> Ref = make_ref(), Consumer ! {get_rows, Ref}, diff --git a/src/fabric/src/fabric_rpc.erl b/src/fabric/src/fabric_rpc.erl index 42544ac40..413758818 100644 --- a/src/fabric/src/fabric_rpc.erl +++ b/src/fabric/src/fabric_rpc.erl @@ -539,76 +539,70 @@ changes_enumerator(DocInfo, Acc) -> db = Db, args = #changes_args{ include_docs = IncludeDocs, - filter_fun = Filter + conflicts = Conflicts, + filter_fun = Filter, + doc_options = DocOptions }, - pending = Pending + pending = Pending, + epochs = Epochs } = Acc, - #doc_info{high_seq = Seq} = DocInfo, - RevsOrDocRevs = couch_changes:filter(Db, DocInfo, Filter, IncludeDocs), - % include_docs = false, call `filter/3`, use `couch_changes:open_revs/3` to read docs. - % include_docs = true, call `filter/4`, it will return [revs] or {docs, revs}. - % - {}: use `couch_changes:open_revs/3` to open docs - % - []: use `fabric_rpc:doc_member/3` to open docs + Opts = + case Conflicts of + true -> [conflicts | DocOptions]; + false -> DocOptions + end, + Seq = DocInfo#doc_info.high_seq, + {Docs, Revs} = couch_changes:filter(Db, DocInfo, Filter, IncludeDocs), + Changes = [X || X <- Revs, X /= null], ChangesRow = - case is_tuple(RevsOrDocRevs) of - true -> - {Docs, Revs} = RevsOrDocRevs, - get_changes_row(Revs, Acc, DocInfo, fun get_json_docs/3, Docs); - false -> - get_changes_row(RevsOrDocRevs, Acc, DocInfo, fun doc_member/3, {Db, DocInfo}) + case {Changes, Docs, IncludeDocs} of + {[], _, _} -> + {no_pass, [ + {pending, Pending - 1}, + {seq, {Seq, uuid(Db), couch_db:owner_of(Epochs, Seq)}} + ]}; + {_, _, false} -> + get_changes_row(Changes, [], DocInfo, Acc); + {_, [], true} -> + % Open docs in `fabric_rpc:doc_member/4` + Docs1 = [doc_member(Db, DocInfo, Opts, Filter)], + get_changes_row(Changes, Docs1, DocInfo, Acc); + {_, [Doc], true} -> + % Open docs in `couch_changes:open_revs/3`, and + % stored in [Doc], so call `get_json_doc/3` directly + Docs1 = [get_json_doc(Doc, Opts, Filter)], + get_changes_row(Changes, Docs1, DocInfo, Acc) end, ok = rexi:stream2(ChangesRow), {ok, Acc#fabric_changes_acc{seq = Seq, pending = Pending - 1}}. -get_changes_row(Revs, Acc, DocInfo, Fun, Args) -> +get_changes_row(Changes, Docs, DocInfo, Acc) -> #fabric_changes_acc{ db = Db, - args = #changes_args{ - include_docs = IncludeDocs, - conflicts = Conflicts, - filter_fun = Filter, - doc_options = DocOptions - }, pending = Pending, epochs = Epochs } = Acc, #doc_info{id = Id, high_seq = Seq, revs = [#rev_info{deleted = Del} | _]} = DocInfo, - case [X || X <- Revs, X /= null] of - [] -> - {no_pass, [ - {pending, Pending - 1}, - {seq, {Seq, uuid(Db), couch_db:owner_of(Epochs, Seq)}} - ]}; - Results -> - Opts = - if - Conflicts -> [conflicts | DocOptions]; - true -> DocOptions - end, - {change, [ - {pending, Pending - 1}, - {seq, {Seq, uuid(Db), couch_db:owner_of(Epochs, Seq)}}, - {id, Id}, - {changes, Results}, - {deleted, Del} - | if - IncludeDocs -> [Fun(Args, Opts, Filter)]; - true -> [] - end - ]} - end. - -get_json_docs([Doc], Opts, Filter) -> - {doc, maybe_filtered_json_doc(Doc, Opts, Filter)}. - -doc_member({Shard, DocInfo}, Opts, Filter) -> + {change, [ + {pending, Pending - 1}, + {seq, {Seq, uuid(Db), couch_db:owner_of(Epochs, Seq)}}, + {id, Id}, + {changes, Changes}, + {deleted, Del} + | Docs + ]}. + +doc_member(Shard, DocInfo, Opts, Filter) -> case couch_db:open_doc(Shard, DocInfo, [deleted | Opts]) of {ok, Doc} -> - {doc, maybe_filtered_json_doc(Doc, Opts, Filter)}; + get_json_doc(Doc, Opts, Filter); Error -> Error end. +get_json_doc(Doc, Opts, Filter) -> + {doc, maybe_filtered_json_doc(Doc, Opts, Filter)}. + maybe_filtered_json_doc(Doc, Opts, {selector, _Style, {_Selector, Fields}}) when Fields =/= nil ->