This is an automated email from the ASF dual-hosted git repository. jiahuili430 pushed a commit to branch fix-changes-stats in repository https://gitbox.apache.org/repos/asf/couchdb.git
commit 20577c2585374a1675e805d598e2b9733904c476 Author: Jiahui Li <[email protected]> AuthorDate: Sat Dec 2 20:23:37 2023 -0600 refactor `get_changes_row` --- src/couch/src/couch_changes.erl | 3 +- src/couch/test/eunit/couch_changes_tests.erl | 172 ++++++++++++++++++++------- src/fabric/src/fabric_rpc.erl | 99 +++++++-------- 3 files changed, 176 insertions(+), 98 deletions(-) diff --git a/src/couch/src/couch_changes.erl b/src/couch/src/couch_changes.erl index e20e86b70..cd6d55eb7 100644 --- a/src/couch/src/couch_changes.erl +++ b/src/couch/src/couch_changes.erl @@ -19,7 +19,6 @@ wait_updated/3, get_rest_updated/1, configure_filter/4, - filter/3, filter/4, handle_db_event/3, handle_view_event/3, @@ -283,7 +282,7 @@ filter(Db, DocInfo, {custom, Style, Req0, DDoc, FName}, IncludeDocs) when Includ {ok, Passes} = couch_query_servers:filter_docs(Req, Db, DDoc, FName, Docs), {Docs, filter_revs(Passes, Docs)}; filter(Db, DocInfo, Filter, _IncludeDocs) -> - filter(Db, DocInfo, Filter). + {[], filter(Db, DocInfo, Filter)}. get_view_qs({json_req, {Props}}) -> {Query} = couch_util:get_value(<<"query">>, Props, {[]}), diff --git a/src/couch/test/eunit/couch_changes_tests.erl b/src/couch/test/eunit/couch_changes_tests.erl index 293f49e2e..a36ca0df1 100644 --- a/src/couch/test/eunit/couch_changes_tests.erl +++ b/src/couch/test/eunit/couch_changes_tests.erl @@ -16,6 +16,7 @@ -include_lib("couch/include/couch_db.hrl"). -define(TIMEOUT, 6000). +%%-define(TEST_TIMEOUT, 10000). -record(row, { id, @@ -24,45 +25,6 @@ doc = nil }). -setup() -> - DbName = ?tempdb(), - {ok, Db} = create_db(DbName), - Revs = [ - R - || {ok, R} <- [ - save_doc(Db, {[{<<"_id">>, <<"doc1">>}]}), - save_doc(Db, {[{<<"_id">>, <<"doc2">>}]}), - save_doc(Db, {[{<<"_id">>, <<"doc3">>}]}), - save_doc(Db, {[{<<"_id">>, <<"doc4">>}]}), - save_doc(Db, {[{<<"_id">>, <<"doc5">>}]}) - ] - ], - Rev = lists:nth(3, Revs), - - {ok, Db1} = couch_db:reopen(Db), - {ok, Rev1} = save_doc(Db1, {[{<<"_id">>, <<"doc3">>}, {<<"_rev">>, Rev}]}), - Revs1 = Revs ++ [Rev1], - Revs2 = - Revs1 ++ - [ - R - || {ok, R} <- [ - save_doc(Db1, {[{<<"_id">>, <<"doc6">>}]}), - save_doc(Db1, {[{<<"_id">>, <<"_design/foo">>}]}), - save_doc(Db1, {[{<<"_id">>, <<"doc7">>}]}), - save_doc(Db1, {[{<<"_id">>, <<"doc8">>}]}) - ] - ], - config:set( - "native_query_servers", "erlang", "{couch_native_process, start_link, []}", _Persist = false - ), - {DbName, list_to_tuple(Revs2)}. - -teardown({DbName, _}) -> - config:delete("native_query_servers", "erlang", _Persist = false), - delete_db(DbName), - ok. - changes_feed_test_() -> { "Changes feed", { @@ -90,6 +52,7 @@ changes_filter_test_() -> fun test_util:start_couch/0, fun test_util:stop_couch/1, [ + %%filter_by_custom_function(), filter_by_design(), filter_by_doc_id(), filter_by_filter_function(), @@ -120,6 +83,18 @@ changes_style_test_() -> } }. +%% filter_by_custom_function() -> +%% { +%% "Filter function", +%% { +%% foreach, +%% fun setup/0, fun teardown/1, +%% [ +%% fun should_receive_heartbeats/1 +%% ] +%% } +%% }. + filter_by_design() -> { "Filter _design", @@ -259,6 +234,73 @@ t_end_changes_when_db_deleted({DbName, _Revs}) -> {_Rows, _LastSeq} = wait_finished(Consumer), ok = stop_consumer(Consumer). +%% should_receive_heartbeats(_) -> +%% {timeout, ?TEST_TIMEOUT div 1000, +%% ?_test( +%% begin +%% DbName = ?tempdb(), +%% Timeout = 100, +%% {ok, Db} = create_db(DbName), + +%% {ok, _} = save_doc(Db, {[ +%% {<<"_id">>, <<"_design/filtered">>}, +%% {<<"language">>, <<"javascript">>}, +%% {<<"filters">>, {[ +%% {<<"foo">>, <<"function(doc) { +%% return ['doc10', 'doc11', 'doc12'].indexOf(doc._id) != -1;}">> +%% }]}} +%% ]}), + +%% ChangesArgs = #changes_args{ +%% filter = "filtered/foo", +%% feed = "continuous", +%% timeout = 10000, +%% heartbeat = 1000 +%% }, +%% Consumer = spawn_consumer(DbName, ChangesArgs, {json_req, null}), + +%% {ok, _Rev1} = save_doc(Db, {[{<<"_id">>, <<"doc1">>}]}), +%% timer:sleep(Timeout), +%% {ok, _Rev2} = save_doc(Db, {[{<<"_id">>, <<"doc2">>}]}), +%% timer:sleep(Timeout), +%% {ok, _Rev3} = save_doc(Db, {[{<<"_id">>, <<"doc3">>}]}), +%% timer:sleep(Timeout), +%% {ok, _Rev4} = save_doc(Db, {[{<<"_id">>, <<"doc4">>}]}), +%% timer:sleep(Timeout), +%% {ok, _Rev5} = save_doc(Db, {[{<<"_id">>, <<"doc5">>}]}), +%% timer:sleep(Timeout), +%% {ok, _Rev6} = save_doc(Db, {[{<<"_id">>, <<"doc6">>}]}), +%% timer:sleep(Timeout), +%% {ok, _Rev7} = save_doc(Db, {[{<<"_id">>, <<"doc7">>}]}), +%% timer:sleep(Timeout), +%% {ok, _Rev8} = save_doc(Db, {[{<<"_id">>, <<"doc8">>}]}), +%% timer:sleep(Timeout), +%% {ok, _Rev9} = save_doc(Db, {[{<<"_id">>, <<"doc9">>}]}), + +%% Heartbeats = get_heartbeats(Consumer), +%% ?assert(Heartbeats > 0), + +%% {ok, _Rev10} = save_doc(Db, {[{<<"_id">>, <<"doc10">>}]}), +%% timer:sleep(Timeout), +%% {ok, _Rev11} = save_doc(Db, {[{<<"_id">>, <<"doc11">>}]}), +%% timer:sleep(Timeout), +%% {ok, _Rev12} = save_doc(Db, {[{<<"_id">>, <<"doc12">>}]}), + +%% Heartbeats2 = get_heartbeats(Consumer), +%% ?assert(Heartbeats2 > Heartbeats), + +%% Rows = get_rows(Consumer), +%% ?assertEqual(3, length(Rows)), + +%% {ok, _Rev13} = save_doc(Db, {[{<<"_id">>, <<"doc13">>}]}), +%% timer:sleep(Timeout), +%% {ok, _Rev14} = save_doc(Db, {[{<<"_id">>, <<"doc14">>}]}), +%% timer:sleep(Timeout), + +%% Heartbeats3 = get_heartbeats(Consumer), +%% ?assert(Heartbeats3 > Heartbeats2) +%% end)}. + t_emit_only_design_documents({DbName, Revs}) -> ChArgs = #changes_args{filter = "_design"}, Req = {json_req, null}, @@ -460,7 +502,6 @@ t_select_with_continuous({DbName, Revs}) -> ?assertEqual(ok, wait_row_notifications(1)), ok = pause(Consumer), NewRows = get_rows(Consumer), - ?debugVal(NewRows), ?assertMatch([#row{seq = _, id = <<"doc8">>, deleted = false}], NewRows), ?assertEqual([#row{seq = 12, id = <<"doc8">>, deleted = false}], NewRows). @@ -627,6 +668,44 @@ t_style_all_docs_with_include_docs({DbName, Revs}) -> ). %%%%%%%%%%%%%%%%%%%% Utility Functions %%%%%%%%%%%%%%%%%%%% +setup() -> + DbName = ?tempdb(), + {ok, Db} = create_db(DbName), + Revs = [ + R + || {ok, R} <- [ + save_doc(Db, {[{<<"_id">>, <<"doc1">>}]}), + save_doc(Db, {[{<<"_id">>, <<"doc2">>}]}), + save_doc(Db, {[{<<"_id">>, <<"doc3">>}]}), + save_doc(Db, {[{<<"_id">>, <<"doc4">>}]}), + save_doc(Db, {[{<<"_id">>, <<"doc5">>}]}) + ] + ], + Rev = lists:nth(3, Revs), + {ok, Db1} = couch_db:reopen(Db), + {ok, Rev1} = save_doc(Db1, {[{<<"_id">>, <<"doc3">>}, {<<"_rev">>, Rev}]}), + Revs1 = Revs ++ [Rev1], + Revs2 = + Revs1 ++ + [ + R + || {ok, R} <- [ + save_doc(Db1, {[{<<"_id">>, <<"doc6">>}]}), + save_doc(Db1, {[{<<"_id">>, <<"_design/foo">>}]}), + save_doc(Db1, {[{<<"_id">>, <<"doc7">>}]}), + save_doc(Db1, {[{<<"_id">>, <<"doc8">>}]}) + ] + ], + config:set( + "native_query_servers", "erlang", "{couch_native_process, start_link, []}", _Persist = false + ), + {DbName, list_to_tuple(Revs2)}. + +teardown({DbName, _}) -> + config:delete("native_query_servers", "erlang", _Persist = false), + delete_db(DbName), + ok. + update_ddoc(DbName, DDoc) -> {ok, Db} = couch_db:open_int(DbName, [?ADMIN_CTX]), {ok, _} = couch_db:update_doc(Db, DDoc, []), @@ -646,6 +725,19 @@ save_doc(Db, Json) -> {ok, Rev} = couch_db:update_doc(Db, Doc, []), {ok, couch_doc:rev_to_str(Rev)}. +%% get_heartbeats({Consumer, _}) -> +%% Ref = make_ref(), +%% Consumer ! {get_heartbeats, Ref}, +%% Resp = +%% receive +%% {hearthbeats, Ref, HeartBeats} -> +%% HeartBeats +%% after ?TIMEOUT -> +%% timeout +%% end, +%% ?assertNotEqual(timeout, Resp), +%% Resp. + get_rows({Consumer, _}) -> Ref = make_ref(), Consumer ! {get_rows, Ref}, diff --git a/src/fabric/src/fabric_rpc.erl b/src/fabric/src/fabric_rpc.erl index 42544ac40..52eeee893 100644 --- a/src/fabric/src/fabric_rpc.erl +++ b/src/fabric/src/fabric_rpc.erl @@ -535,32 +535,6 @@ changes_enumerator(#full_doc_info{} = FDI, Acc) -> changes_enumerator(#doc_info{id = <<"_local/", _/binary>>, high_seq = Seq}, Acc) -> {ok, Acc#fabric_changes_acc{seq = Seq, pending = Acc#fabric_changes_acc.pending - 1}}; changes_enumerator(DocInfo, Acc) -> - #fabric_changes_acc{ - db = Db, - args = #changes_args{ - include_docs = IncludeDocs, - filter_fun = Filter - }, - pending = Pending - } = Acc, - #doc_info{high_seq = Seq} = DocInfo, - RevsOrDocRevs = couch_changes:filter(Db, DocInfo, Filter, IncludeDocs), - % include_docs = false, call `filter/3`, use `couch_changes:open_revs/3` to read docs. - % include_docs = true, call `filter/4`, it will return [revs] or {docs, revs}. - % - {}: use `couch_changes:open_revs/3` to open docs - % - []: use `fabric_rpc:doc_member/3` to open docs - ChangesRow = - case is_tuple(RevsOrDocRevs) of - true -> - {Docs, Revs} = RevsOrDocRevs, - get_changes_row(Revs, Acc, DocInfo, fun get_json_docs/3, Docs); - false -> - get_changes_row(RevsOrDocRevs, Acc, DocInfo, fun doc_member/3, {Db, DocInfo}) - end, - ok = rexi:stream2(ChangesRow), - {ok, Acc#fabric_changes_acc{seq = Seq, pending = Pending - 1}}. - -get_changes_row(Revs, Acc, DocInfo, Fun, Args) -> #fabric_changes_acc{ db = Db, args = #changes_args{ @@ -572,43 +546,56 @@ get_changes_row(Revs, Acc, DocInfo, Fun, Args) -> pending = Pending, epochs = Epochs } = Acc, - #doc_info{id = Id, high_seq = Seq, revs = [#rev_info{deleted = Del} | _]} = DocInfo, - case [X || X <- Revs, X /= null] of - [] -> - {no_pass, [ - {pending, Pending - 1}, - {seq, {Seq, uuid(Db), couch_db:owner_of(Epochs, Seq)}} - ]}; - Results -> - Opts = - if - Conflicts -> [conflicts | DocOptions]; - true -> DocOptions - end, - {change, [ - {pending, Pending - 1}, - {seq, {Seq, uuid(Db), couch_db:owner_of(Epochs, Seq)}}, - {id, Id}, - {changes, Results}, - {deleted, Del} - | if - IncludeDocs -> [Fun(Args, Opts, Filter)]; - true -> [] - end - ]} - end. - -get_json_docs([Doc], Opts, Filter) -> - {doc, maybe_filtered_json_doc(Doc, Opts, Filter)}. + Opts = + case Conflicts of + true -> [conflicts | DocOptions]; + false -> DocOptions + end, + Seq = DocInfo#doc_info.high_seq, + {Docs, Revs} = couch_changes:filter(Db, DocInfo, Filter, IncludeDocs), + Changes = [X || X <- Revs, X /= null], + ChangesRow = + case {Changes, Docs, IncludeDocs} of + {[], _, _} -> + {no_pass, [ + {pending, Pending - 1}, + {seq, {Seq, uuid(Db), couch_db:owner_of(Epochs, Seq)}} + ]}; + {_, _, false} -> + get_changes_row(Changes, [], DocInfo, Acc); + {_, [], true} -> + Docs1 = [doc_member(Db, DocInfo, Opts, Filter)], + get_changes_row(Changes, Docs1, DocInfo, Acc); + {_, [Doc], true} -> + Docs1 = [get_json_doc(Doc, Opts, Filter)], + get_changes_row(Changes, Docs1, DocInfo, Acc) + end, + ok = rexi:stream2(ChangesRow), + {ok, Acc#fabric_changes_acc{seq = Seq, pending = Pending - 1}}. -doc_member({Shard, DocInfo}, Opts, Filter) -> +get_changes_row(Changes, Docs, DocInfo, Acc) -> + #fabric_changes_acc{db = Db, pending = Pending, epochs = Epochs} = Acc, + #doc_info{id = Id, high_seq = Seq, revs = [#rev_info{deleted = Del} | _]} = DocInfo, + {change, [ + {pending, Pending - 1}, + {seq, {Seq, uuid(Db), couch_db:owner_of(Epochs, Seq)}}, + {id, Id}, + {changes, Changes}, + {deleted, Del} + | Docs + ]}. + +doc_member(Shard, DocInfo, Opts, Filter) -> case couch_db:open_doc(Shard, DocInfo, [deleted | Opts]) of {ok, Doc} -> - {doc, maybe_filtered_json_doc(Doc, Opts, Filter)}; + get_json_doc(Doc, Opts, Filter); Error -> Error end. +get_json_doc(Doc, Opts, Filter) -> + {doc, maybe_filtered_json_doc(Doc, Opts, Filter)}. + maybe_filtered_json_doc(Doc, Opts, {selector, _Style, {_Selector, Fields}}) when Fields =/= nil ->
