This is an automated email from the ASF dual-hosted git repository. jiahuili430 pushed a commit to branch fix-changes-stats in repository https://gitbox.apache.org/repos/asf/couchdb.git
commit d31fed22d9c33c764cfb71a424c74090f1d0db8a Author: Jiahui Li <[email protected]> AuthorDate: Mon Nov 20 10:29:26 2023 -0600 Avoid read docs twice when filtered `_changes` is triggered When filtered `_changes` is triggered, `open_rev()` will increment stats. If request parameter has `include_docs=true`, then `doc_member()` will also increase stats. Add `filter/4` to avoid this behavior. --- src/chttpd/test/eunit/chttpd_changes_test.erl | 39 +- src/couch/src/couch_changes.erl | 21 + src/couch/test/eunit/couch_changes_tests.erl | 1229 ++++++++++--------------- src/fabric/src/fabric_rpc.erl | 69 +- 4 files changed, 603 insertions(+), 755 deletions(-) diff --git a/src/chttpd/test/eunit/chttpd_changes_test.erl b/src/chttpd/test/eunit/chttpd_changes_test.erl index d0590a83a..58b871a28 100644 --- a/src/chttpd/test/eunit/chttpd_changes_test.erl +++ b/src/chttpd/test/eunit/chttpd_changes_test.erl @@ -104,11 +104,11 @@ changes_q8_test_() -> ]) }. -% These tests are separate as they create aditional design docs during +% These tests are separate as they create additional design docs during % the test. That ends up bumping the update sequence in the db, so % last_seq and other sequences returned become dependent on the test % order. To avoid that dependence, run them in a separate suite with -% a foreach construct insted of a setup one. This way setup/teardown +% a foreach construct instead of a setup one. This way setup/teardown % happens for each individual test case. % changes_js_filters_test_() -> @@ -125,6 +125,17 @@ changes_js_filters_test_() -> ] }. +changes_open_doc_test_() -> + { + foreach, + fun setup_basic/0, + fun teardown_basic/1, + [ + ?TDEF_FE(t_selector_should_only_open_doc_once), + ?TDEF_FE(t_style_all_docs_open_doc_should_different) + ] + }. 
+ t_basic({_, DbUrl}) -> Res = {Seq, Pending, Rows} = changes(DbUrl), ?assertEqual(7, Seq), @@ -492,6 +503,29 @@ t_view_filter_no_match({_, DbUrl}) -> ?assertEqual({8, 0, []}, changes(DbUrl, Params)), {200, #{}} = req(delete, DDocUrl ++ "?rev=" ++ binary_to_list(Rev)). +t_selector_should_only_open_doc_once({_, DbUrl}) -> + Body = #{<<"selector">> => #{<<"_id">> => ?DOC1}}, + Params = "?filter=_selector", + ParamsIncludeDocs = Params ++ "&include_docs=true", + meck:reset(couch_db), + changes_post(DbUrl, Body, Params), + Called = meck:num_calls(couch_db, open_doc, 3), + meck:reset(couch_db), + changes_post(DbUrl, Body, ParamsIncludeDocs), + CalledIncludeDocs = meck:num_calls(couch_db, open_doc, 3), + ?assertEqual(Called, CalledIncludeDocs). + +t_style_all_docs_open_doc_should_different({_, DbUrl}) -> + Params = "?style=all_docs", + ParamsIncludeDocs = Params ++ "&include_docs=true", + meck:reset(couch_db), + changes(DbUrl, Params), + Called = meck:num_calls(couch_db, open_doc, 3), + meck:reset(couch_db), + changes(DbUrl, ParamsIncludeDocs), + CalledIncludeDocs = meck:num_calls(couch_db, open_doc, 3), + ?assertNotEqual(Called, CalledIncludeDocs). + % Utility functions setup_ctx(DbCreateParams) -> @@ -522,6 +556,7 @@ setup_basic() -> CfgKey = "changes_doc_ids_optimization_threshold", ok = config:set("couchdb", CfgKey, "2", _Persist = false), meck:new(couch_changes, [passthrough]), + meck:new(couch_db, [passthrough]), {Ctx, DbUrl}. teardown_basic({Ctx, DbUrl}) -> diff --git a/src/couch/src/couch_changes.erl b/src/couch/src/couch_changes.erl index e072a2e1c..e20e86b70 100644 --- a/src/couch/src/couch_changes.erl +++ b/src/couch/src/couch_changes.erl @@ -20,6 +20,7 @@ get_rest_updated/1, configure_filter/4, filter/3, + filter/4, handle_db_event/3, handle_view_event/3, send_changes_doc_ids/6, @@ -264,6 +265,26 @@ filter(Db, DocInfo, {custom, Style, Req0, DDoc, FName}) -> {ok, Passes} = couch_query_servers:filter_docs(Req, Db, DDoc, FName, Docs), filter_revs(Passes, Docs). 
+filter(Db, DocInfo, {selector, Style, {Selector, _Fields}}, IncludeDocs) when IncludeDocs -> + Docs = open_revs(Db, DocInfo, Style), + Passes = [mango_selector:match(Selector, couch_doc:to_json_obj(Doc, [])) || Doc <- Docs], + {Docs, filter_revs(Passes, Docs)}; +filter(Db, DocInfo, {view, Style, DDoc, VName}, IncludeDocs) when IncludeDocs -> + Docs = open_revs(Db, DocInfo, Style), + {ok, Passes} = couch_query_servers:filter_view(Db, DDoc, VName, Docs), + {Docs, filter_revs(Passes, Docs)}; +filter(Db, DocInfo, {custom, Style, Req0, DDoc, FName}, IncludeDocs) when IncludeDocs -> + Req = + case Req0 of + {json_req, _} -> Req0; + #httpd{} -> {json_req, chttpd_external:json_req_obj(Req0, Db)} + end, + Docs = open_revs(Db, DocInfo, Style), + {ok, Passes} = couch_query_servers:filter_docs(Req, Db, DDoc, FName, Docs), + {Docs, filter_revs(Passes, Docs)}; +filter(Db, DocInfo, Filter, _IncludeDocs) -> + filter(Db, DocInfo, Filter). + get_view_qs({json_req, {Props}}) -> {Query} = couch_util:get_value(<<"query">>, Props, {[]}), binary_to_list(couch_util:get_value(<<"view">>, Query, "")); diff --git a/src/couch/test/eunit/couch_changes_tests.erl b/src/couch/test/eunit/couch_changes_tests.erl index 02b69f132..293f49e2e 100644 --- a/src/couch/test/eunit/couch_changes_tests.erl +++ b/src/couch/test/eunit/couch_changes_tests.erl @@ -16,7 +16,6 @@ -include_lib("couch/include/couch_db.hrl"). -define(TIMEOUT, 6000). --define(TEST_TIMEOUT, 10000). -record(row, { id, @@ -39,8 +38,8 @@ setup() -> ] ], Rev = lists:nth(3, Revs), - {ok, Db1} = couch_db:reopen(Db), + {ok, Db1} = couch_db:reopen(Db), {ok, Rev1} = save_doc(Db1, {[{<<"_id">>, <<"doc3">>}, {<<"_rev">>, Rev}]}), Revs1 = Revs ++ [Rev1], Revs2 = @@ -64,87 +63,91 @@ teardown({DbName, _}) -> delete_db(DbName), ok. 
-changes_test_() -> +changes_feed_test_() -> + { + "Changes feed", { + setup, + fun test_util:start_couch/0, + fun test_util:stop_couch/1, + {"Continuous Feed", + { + foreach, + fun setup/0, + fun teardown/1, + [ + ?TDEF_FE(t_filter_continuous_feed_by_specific_doc_ids), + ?TDEF_FE(t_end_changes_when_db_deleted) + ] + }} + } + }. + +changes_filter_test_() -> { - "Changes feed", + "Changes filter", { setup, fun test_util:start_couch/0, fun test_util:stop_couch/1, [ - filter_by_selector(), - filter_by_doc_id(), filter_by_design(), - continuous_feed(), - %%filter_by_custom_function() + filter_by_doc_id(), filter_by_filter_function(), + filter_by_selector(), filter_by_view() ] } }. -filter_by_doc_id() -> +changes_style_test_() -> { - "Filter _doc_id", + "Changes style", { - foreach, - fun setup/0, - fun teardown/1, - [ - fun should_filter_by_specific_doc_ids/1, - fun should_filter_by_specific_doc_ids_descending/1, - fun should_filter_by_specific_doc_ids_with_since/1, - fun should_filter_by_specific_doc_ids_no_result/1, - fun should_handle_deleted_docs/1 - ] + setup, + fun test_util:start_couch/0, + fun test_util:stop_couch/1, + { + foreach, + fun setup/0, + fun teardown/1, + [ + ?TDEF_FE(t_style_main_only), + ?TDEF_FE(t_style_main_only_with_include_docs), + ?TDEF_FE(t_style_all_docs), + ?TDEF_FE(t_style_all_docs_with_include_docs) + ] + } } }. -filter_by_selector() -> +filter_by_design() -> { - "Filter _selector", + "Filter _design", { foreach, fun setup/0, fun teardown/1, - [ - fun should_select_basic/1, - fun should_select_with_since/1, - fun should_select_when_no_result/1, - fun should_select_with_deleted_docs/1, - fun should_select_with_continuous/1, - fun should_stop_selector_when_db_deleted/1, - fun should_select_with_empty_fields/1, - fun should_select_with_fields/1 - ] + [?TDEF_FE(t_emit_only_design_documents)] } }. 
-filter_by_design() -> +filter_by_doc_id() -> { - "Filter _design", + "Filter _doc_id", { foreach, fun setup/0, fun teardown/1, [ - fun should_emit_only_design_documents/1 + ?TDEF_FE(t_filter_by_specific_doc_ids), + ?TDEF_FE(t_filter_by_specific_doc_ids_descending), + ?TDEF_FE(t_filter_by_specific_doc_ids_no_result), + ?TDEF_FE(t_filter_by_specific_doc_ids_with_since), + ?TDEF_FE(t_handle_deleted_docs) ] } }. -%% filter_by_custom_function() -> -%% { -%% "Filter function", -%% { -%% foreach, -%% fun setup/0, fun teardown/1, -%% [ -%% fun should_receive_heartbeats/1 -%% ] -%% } -%% }. - filter_by_filter_function() -> { "Filter by filters", @@ -153,702 +156,477 @@ filter_by_filter_function() -> fun setup/0, fun teardown/1, [ - fun should_filter_by_doc_attribute/1, - fun should_filter_by_user_ctx/1 + ?TDEF_FE(t_filter_by_doc_attribute), + ?TDEF_FE(t_filter_by_user_ctx) ] } }. -filter_by_view() -> +filter_by_selector() -> { - "Filter _view", + "Filter _selector", { foreach, fun setup/0, fun teardown/1, [ - fun should_filter_by_view/1, - fun should_filter_by_erlang_view/1 + ?TDEF_FE(t_select_basic), + ?TDEF_FE(t_select_with_since), + ?TDEF_FE(t_select_when_no_result), + ?TDEF_FE(t_select_with_deleted_docs), + ?TDEF_FE(t_select_with_continuous), + ?TDEF_FE(t_stop_selector_when_db_deleted), + ?TDEF_FE(t_select_with_empty_fields), + ?TDEF_FE(t_select_with_fields) ] } }. -continuous_feed() -> +filter_by_view() -> { - "Continuous Feed", + "Filter _view", { foreach, fun setup/0, fun teardown/1, [ - fun should_filter_continuous_feed_by_specific_doc_ids/1, - fun should_end_changes_when_db_deleted/1 + ?TDEF_FE(t_filter_by_view), + ?TDEF_FE(t_filter_by_erlang_view) ] } }. 
-should_filter_by_specific_doc_ids({DbName, _}) -> - ?_test( - begin - ChArgs = #changes_args{ - filter = "_doc_ids" - }, - DocIds = [<<"doc3">>, <<"doc4">>, <<"doc9999">>], - Req = {json_req, {[{<<"doc_ids">>, DocIds}]}}, - {Rows, LastSeq, UpSeq} = run_changes_query(DbName, ChArgs, Req), - - ?assertEqual(2, length(Rows)), - [#row{seq = Seq1, id = Id1}, #row{seq = Seq2, id = Id2}] = Rows, - ?assertEqual(<<"doc4">>, Id1), - ?assertEqual(4, Seq1), - ?assertEqual(<<"doc3">>, Id2), - ?assertEqual(6, Seq2), - ?assertEqual(UpSeq, LastSeq) - end - ). - -should_filter_by_specific_doc_ids_descending({DbName, _}) -> - ?_test( - begin - ChArgs = #changes_args{ - filter = "_doc_ids", - dir = rev - }, - DocIds = [<<"doc3">>, <<"doc4">>, <<"doc9999">>], - Req = {json_req, {[{<<"doc_ids">>, DocIds}]}}, - {Rows, LastSeq, _} = run_changes_query(DbName, ChArgs, Req), - - ?assertEqual(2, length(Rows)), - [#row{seq = Seq1, id = Id1}, #row{seq = Seq2, id = Id2}] = Rows, - ?assertEqual(<<"doc3">>, Id1), - ?assertEqual(6, Seq1), - ?assertEqual(<<"doc4">>, Id2), - ?assertEqual(4, Seq2), - ?assertEqual(4, LastSeq) - end - ). - -should_filter_by_specific_doc_ids_with_since({DbName, _}) -> - ?_test( - begin - ChArgs = #changes_args{ - filter = "_doc_ids", - since = 5 - }, - DocIds = [<<"doc3">>, <<"doc4">>, <<"doc9999">>], - Req = {json_req, {[{<<"doc_ids">>, DocIds}]}}, - {Rows, LastSeq, UpSeq} = run_changes_query(DbName, ChArgs, Req), - - ?assertEqual(1, length(Rows)), - [#row{seq = Seq1, id = Id1}] = Rows, - ?assertEqual(<<"doc3">>, Id1), - ?assertEqual(6, Seq1), - ?assertEqual(UpSeq, LastSeq) - end - ). - -should_filter_by_specific_doc_ids_no_result({DbName, _}) -> - ?_test( - begin - ChArgs = #changes_args{ - filter = "_doc_ids", - since = 6 - }, - DocIds = [<<"doc3">>, <<"doc4">>, <<"doc9999">>], - Req = {json_req, {[{<<"doc_ids">>, DocIds}]}}, - {Rows, LastSeq, UpSeq} = run_changes_query(DbName, ChArgs, Req), - - ?assertEqual(0, length(Rows)), - ?assertEqual(UpSeq, LastSeq) - end - ). 
- -should_handle_deleted_docs({DbName, Revs}) -> - ?_test( - begin - Rev3_2 = element(6, Revs), - {ok, Db} = couch_db:open_int(DbName, []), - {ok, _} = save_doc( - Db, - {[ - {<<"_id">>, <<"doc3">>}, - {<<"_deleted">>, true}, - {<<"_rev">>, Rev3_2} - ]} - ), - - ChArgs = #changes_args{ - filter = "_doc_ids", - since = 9 - }, - DocIds = [<<"doc3">>, <<"doc4">>, <<"doc9999">>], - Req = {json_req, {[{<<"doc_ids">>, DocIds}]}}, - {Rows, LastSeq, _} = run_changes_query(DbName, ChArgs, Req), - - ?assertEqual(1, length(Rows)), - ?assertMatch( - [#row{seq = LastSeq, id = <<"doc3">>, deleted = true}], - Rows - ), - ?assertEqual(11, LastSeq) - end - ). - -should_filter_continuous_feed_by_specific_doc_ids({DbName, Revs}) -> - ?_test( - begin - {ok, Db} = couch_db:open_int(DbName, []), - ChangesArgs = #changes_args{ - filter = "_doc_ids", - feed = "continuous" - }, - DocIds = [<<"doc3">>, <<"doc4">>, <<"doc9999">>], - Req = {json_req, {[{<<"doc_ids">>, DocIds}]}}, - reset_row_notifications(), - Consumer = spawn_consumer(DbName, ChangesArgs, Req), - ?assertEqual(ok, wait_row_notifications(2)), - ok = pause(Consumer), - - Rows = get_rows(Consumer), - ?assertEqual(2, length(Rows)), - [#row{seq = Seq1, id = Id1}, #row{seq = Seq2, id = Id2}] = Rows, - ?assertEqual(<<"doc4">>, Id1), - ?assertEqual(4, Seq1), - ?assertEqual(<<"doc3">>, Id2), - ?assertEqual(6, Seq2), - - clear_rows(Consumer), - {ok, _Rev9} = save_doc(Db, {[{<<"_id">>, <<"doc9">>}]}), - {ok, _Rev10} = save_doc(Db, {[{<<"_id">>, <<"doc10">>}]}), - ok = unpause(Consumer), - timer:sleep(100), - ok = pause(Consumer), - ?assertEqual([], get_rows(Consumer)), - - Rev4 = element(4, Revs), - Rev3_2 = element(6, Revs), - {ok, Rev4_2} = save_doc( - Db, - {[ - {<<"_id">>, <<"doc4">>}, - {<<"_rev">>, Rev4} - ]} - ), - {ok, _} = save_doc(Db, {[{<<"_id">>, <<"doc11">>}]}), - {ok, _} = save_doc( - Db, - {[ - {<<"_id">>, <<"doc4">>}, - {<<"_rev">>, Rev4_2} - ]} - ), - {ok, _} = save_doc(Db, {[{<<"_id">>, <<"doc12">>}]}), - {ok, Rev3_3} 
= save_doc( - Db, - {[ - {<<"_id">>, <<"doc3">>}, - {<<"_rev">>, Rev3_2} - ]} - ), - reset_row_notifications(), - ok = unpause(Consumer), - ?assertEqual(ok, wait_row_notifications(2)), - ok = pause(Consumer), - - NewRows = get_rows(Consumer), - ?assertEqual(2, length(NewRows)), - [Row14, Row16] = NewRows, - ?assertEqual(<<"doc4">>, Row14#row.id), - ?assertEqual(15, Row14#row.seq), - ?assertEqual(<<"doc3">>, Row16#row.id), - ?assertEqual(17, Row16#row.seq), - - clear_rows(Consumer), - {ok, _Rev3_4} = save_doc( - Db, - {[ - {<<"_id">>, <<"doc3">>}, - {<<"_rev">>, Rev3_3} - ]} - ), - reset_row_notifications(), - ok = unpause(Consumer), - ?assertEqual(ok, wait_row_notifications(1)), - ok = pause(Consumer), - - FinalRows = get_rows(Consumer), - - ok = unpause(Consumer), - stop_consumer(Consumer), - - ?assertMatch([#row{seq = 18, id = <<"doc3">>}], FinalRows) - end - ). - -should_end_changes_when_db_deleted({DbName, _Revs}) -> - ?_test(begin - {ok, _Db} = couch_db:open_int(DbName, []), - ChangesArgs = #changes_args{ - filter = "_doc_ids", - feed = "continuous" - }, - DocIds = [<<"doc3">>, <<"doc4">>, <<"doc9999">>], - Req = {json_req, {[{<<"doc_ids">>, DocIds}]}}, - Consumer = spawn_consumer(DbName, ChangesArgs, Req), - ok = pause(Consumer), - ok = couch_server:delete(DbName, [?ADMIN_CTX]), - ok = unpause(Consumer), - {_Rows, _LastSeq} = wait_finished(Consumer), - stop_consumer(Consumer), - ok - end). - -should_select_basic({DbName, _}) -> - ?_test( - begin - ChArgs = #changes_args{filter = "_selector"}, - Selector = {[{<<"_id">>, <<"doc3">>}]}, - Req = {json_req, {[{<<"selector">>, Selector}]}}, - {Rows, LastSeq, UpSeq} = run_changes_query(DbName, ChArgs, Req), - ?assertEqual(1, length(Rows)), - [#row{seq = Seq, id = Id}] = Rows, - ?assertEqual(<<"doc3">>, Id), - ?assertEqual(6, Seq), - ?assertEqual(UpSeq, LastSeq) - end - ). 
- -should_select_with_since({DbName, _}) -> - ?_test( - begin - ChArgs = #changes_args{filter = "_selector", since = 9}, - GteDoc2 = {[{<<"$gte">>, <<"doc1">>}]}, - Selector = {[{<<"_id">>, GteDoc2}]}, - Req = {json_req, {[{<<"selector">>, Selector}]}}, - {Rows, LastSeq, UpSeq} = run_changes_query(DbName, ChArgs, Req), - ?assertEqual(1, length(Rows)), - [#row{seq = Seq, id = Id}] = Rows, - ?assertEqual(<<"doc8">>, Id), - ?assertEqual(10, Seq), - ?assertEqual(UpSeq, LastSeq) - end - ). - -should_select_when_no_result({DbName, _}) -> - ?_test( - begin - ChArgs = #changes_args{filter = "_selector"}, - Selector = {[{<<"_id">>, <<"nopers">>}]}, - Req = {json_req, {[{<<"selector">>, Selector}]}}, - {Rows, LastSeq, UpSeq} = run_changes_query(DbName, ChArgs, Req), - ?assertEqual(0, length(Rows)), - ?assertEqual(UpSeq, LastSeq) - end - ). - -should_select_with_deleted_docs({DbName, Revs}) -> - ?_test( - begin - Rev3_2 = element(6, Revs), - {ok, Db} = couch_db:open_int(DbName, []), - {ok, _} = save_doc( - Db, - {[ - {<<"_id">>, <<"doc3">>}, - {<<"_deleted">>, true}, - {<<"_rev">>, Rev3_2} - ]} - ), - ChArgs = #changes_args{filter = "_selector"}, - Selector = {[{<<"_id">>, <<"doc3">>}]}, - Req = {json_req, {[{<<"selector">>, Selector}]}}, - {Rows, LastSeq, _} = run_changes_query(DbName, ChArgs, Req), - ?assertMatch( - [#row{seq = LastSeq, id = <<"doc3">>, deleted = true}], - Rows - ), - ?assertEqual(11, LastSeq) - end - ). 
+t_filter_continuous_feed_by_specific_doc_ids({DbName, Revs}) -> + {ok, Db} = couch_db:open_int(DbName, []), + ChArgs = #changes_args{feed = "continuous", filter = "_doc_ids"}, + DocIds = [<<"doc3">>, <<"doc4">>, <<"doc9999">>], + Req = {json_req, {[{<<"doc_ids">>, DocIds}]}}, + reset_row_notifications(), + Consumer = spawn_consumer(DbName, ChArgs, Req), + ?assertEqual(ok, wait_row_notifications(2)), + ok = pause(Consumer), + + Rows = get_rows(Consumer), + ?assertEqual(2, length(Rows)), + ?assertEqual([#row{seq = 4, id = <<"doc4">>}, #row{seq = 6, id = <<"doc3">>}], Rows), + + clear_rows(Consumer), + {ok, _Rev9} = save_doc(Db, {[{<<"_id">>, <<"doc9">>}]}), + {ok, _Rev10} = save_doc(Db, {[{<<"_id">>, <<"doc10">>}]}), + ok = unpause(Consumer), + timer:sleep(100), + ok = pause(Consumer), + ?assertEqual([], get_rows(Consumer)), + + Rev4 = element(4, Revs), + Rev3_2 = element(6, Revs), + {ok, Rev4_2} = save_doc(Db, {[{<<"_id">>, <<"doc4">>}, {<<"_rev">>, Rev4}]}), + {ok, _} = save_doc(Db, {[{<<"_id">>, <<"doc11">>}]}), + {ok, _} = save_doc(Db, {[{<<"_id">>, <<"doc4">>}, {<<"_rev">>, Rev4_2}]}), + {ok, _} = save_doc(Db, {[{<<"_id">>, <<"doc12">>}]}), + {ok, Rev3_3} = save_doc(Db, {[{<<"_id">>, <<"doc3">>}, {<<"_rev">>, Rev3_2}]}), + reset_row_notifications(), + ok = unpause(Consumer), + ?assertEqual(ok, wait_row_notifications(2)), + ok = pause(Consumer), + + NewRows = get_rows(Consumer), + ?assertEqual(2, length(NewRows)), + ?assertEqual([#row{seq = 15, id = <<"doc4">>}, #row{seq = 17, id = <<"doc3">>}], NewRows), + + clear_rows(Consumer), + {ok, _Rev3_4} = save_doc(Db, {[{<<"_id">>, <<"doc3">>}, {<<"_rev">>, Rev3_3}]}), + reset_row_notifications(), + ok = unpause(Consumer), + ?assertEqual(ok, wait_row_notifications(1)), + ok = pause(Consumer), + + FinalRows = get_rows(Consumer), + ok = unpause(Consumer), + stop_consumer(Consumer), + couch_db:close(Db), + ?assertEqual([#row{seq = 18, id = <<"doc3">>}], FinalRows). 
+ +t_end_changes_when_db_deleted({DbName, _Revs}) -> + {ok, _Db} = couch_db:open_int(DbName, []), + ChArgs = #changes_args{feed = "continuous", filter = "_doc_ids"}, + DocIds = [<<"doc3">>, <<"doc4">>, <<"doc9999">>], + Req = {json_req, {[{<<"doc_ids">>, DocIds}]}}, + Consumer = spawn_consumer(DbName, ChArgs, Req), + ok = pause(Consumer), + ok = delete_db(DbName), + ok = unpause(Consumer), + {_Rows, _LastSeq} = wait_finished(Consumer), + ok = stop_consumer(Consumer). + +t_emit_only_design_documents({DbName, Revs}) -> + ChArgs = #changes_args{filter = "_design"}, + Req = {json_req, null}, + {Rows, LastSeq, UpSeq} = run_changes_query(DbName, ChArgs, Req), + ?assertEqual(1, length(Rows)), + ?assertEqual(UpSeq, LastSeq), + ?assertEqual([#row{seq = 8, id = <<"_design/foo">>}], Rows), -should_select_with_continuous({DbName, Revs}) -> - ?_test( - begin - {ok, Db} = couch_db:open_int(DbName, []), - ChArgs = #changes_args{filter = "_selector", feed = "continuous"}, - GteDoc8 = {[{<<"$gte">>, <<"doc8">>}]}, - Selector = {[{<<"_id">>, GteDoc8}]}, - Req = {json_req, {[{<<"selector">>, Selector}]}}, - reset_row_notifications(), - Consumer = spawn_consumer(DbName, ChArgs, Req), - ?assertEqual(ok, wait_row_notifications(1)), - ok = pause(Consumer), - Rows = get_rows(Consumer), - ?assertMatch( - [#row{seq = 10, id = <<"doc8">>, deleted = false}], - Rows - ), - clear_rows(Consumer), - {ok, _} = save_doc(Db, {[{<<"_id">>, <<"doc01">>}]}), - ok = unpause(Consumer), - timer:sleep(100), - ok = pause(Consumer), - ?assertEqual([], get_rows(Consumer)), - Rev4 = element(4, Revs), - Rev8 = element(10, Revs), - {ok, _} = save_doc( - Db, - {[ - {<<"_id">>, <<"doc8">>}, - {<<"_rev">>, Rev8} - ]} - ), - {ok, _} = save_doc( - Db, + {ok, Db} = couch_db:open_int(DbName, [?ADMIN_CTX]), + {ok, _} = save_doc( + Db, + {[ + {<<"_id">>, <<"_design/foo">>}, + {<<"_rev">>, element(8, Revs)}, + {<<"_deleted">>, true} + ]} + ), + couch_db:close(Db), + {Rows2, LastSeq2, UpSeq2} = run_changes_query(DbName, 
ChArgs, Req), + ?assertEqual(1, length(Rows2)), + ?assertEqual(UpSeq2, LastSeq2), + ?assertEqual([#row{seq = 11, id = <<"_design/foo">>, deleted = true}], Rows2). + +t_filter_by_specific_doc_ids({DbName, _}) -> + ChArgs = #changes_args{filter = "_doc_ids"}, + DocIds = [<<"doc3">>, <<"doc4">>, <<"doc9999">>], + Req = {json_req, {[{<<"doc_ids">>, DocIds}]}}, + {Rows, LastSeq, UpSeq} = run_changes_query(DbName, ChArgs, Req), + ?assertEqual(2, length(Rows)), + ?assertEqual([#row{seq = 4, id = <<"doc4">>}, #row{seq = 6, id = <<"doc3">>}], Rows), + ?assertEqual(UpSeq, LastSeq). + +t_filter_by_specific_doc_ids_descending({DbName, _}) -> + ChArgs = #changes_args{filter = "_doc_ids", dir = rev}, + DocIds = [<<"doc3">>, <<"doc4">>, <<"doc9999">>], + Req = {json_req, {[{<<"doc_ids">>, DocIds}]}}, + {Rows, LastSeq, _} = run_changes_query(DbName, ChArgs, Req), + ?assertEqual(2, length(Rows)), + ?assertEqual([#row{seq = 6, id = <<"doc3">>}, #row{seq = 4, id = <<"doc4">>}], Rows), + ?assertEqual(4, LastSeq). + +t_filter_by_specific_doc_ids_no_result({DbName, _}) -> + ChArgs = #changes_args{filter = "_doc_ids", since = 6}, + DocIds = [<<"doc3">>, <<"doc4">>, <<"doc9999">>], + Req = {json_req, {[{<<"doc_ids">>, DocIds}]}}, + {Rows, LastSeq, UpSeq} = run_changes_query(DbName, ChArgs, Req), + ?assertEqual(0, length(Rows)), + ?assertEqual(UpSeq, LastSeq). + +t_filter_by_specific_doc_ids_with_since({DbName, _}) -> + ChArgs = #changes_args{filter = "_doc_ids", since = 5}, + DocIds = [<<"doc3">>, <<"doc4">>, <<"doc9999">>], + Req = {json_req, {[{<<"doc_ids">>, DocIds}]}}, + {Rows, LastSeq, UpSeq} = run_changes_query(DbName, ChArgs, Req), + ?assertEqual(1, length(Rows)), + ?assertEqual([#row{seq = 6, id = <<"doc3">>}], Rows), + ?assertEqual(UpSeq, LastSeq). 
+ +t_handle_deleted_docs({DbName, Revs}) -> + Rev3_2 = element(6, Revs), + {ok, Db} = couch_db:open_int(DbName, []), + {ok, _} = save_doc( + Db, + {[ + {<<"_id">>, <<"doc3">>}, + {<<"_rev">>, Rev3_2}, + {<<"_deleted">>, true} + ]} + ), + ChArgs = #changes_args{filter = "_doc_ids", since = 9}, + DocIds = [<<"doc3">>, <<"doc4">>, <<"doc9999">>], + Req = {json_req, {[{<<"doc_ids">>, DocIds}]}}, + {Rows, LastSeq, _} = run_changes_query(DbName, ChArgs, Req), + ?assertEqual(1, length(Rows)), + ?assertEqual([#row{seq = LastSeq, id = <<"doc3">>, deleted = true}], Rows), + ?assertEqual(11, LastSeq). + +t_filter_by_doc_attribute({DbName, _}) -> + DDocId = <<"_design/app">>, + DDoc = couch_doc:from_json_obj( + {[ + {<<"_id">>, DDocId}, + {<<"language">>, <<"javascript">>}, + {<<"filters">>, {[ - {<<"_id">>, <<"doc4">>}, - {<<"_rev">>, Rev4} - ]} - ), - reset_row_notifications(), - ok = unpause(Consumer), - ?assertEqual(ok, wait_row_notifications(1)), - ok = pause(Consumer), - NewRows = get_rows(Consumer), - ?assertMatch( - [#row{seq = _, id = <<"doc8">>, deleted = false}], - NewRows - ) - end - ). - -should_stop_selector_when_db_deleted({DbName, _Revs}) -> - ?_test( - begin - {ok, _Db} = couch_db:open_int(DbName, []), - ChArgs = #changes_args{filter = "_selector", feed = "continuous"}, - Selector = {[{<<"_id">>, <<"doc3">>}]}, - Req = {json_req, {[{<<"selector">>, Selector}]}}, - Consumer = spawn_consumer(DbName, ChArgs, Req), - ok = pause(Consumer), - ok = couch_server:delete(DbName, [?ADMIN_CTX]), - ok = unpause(Consumer), - {_Rows, _LastSeq} = wait_finished(Consumer), - stop_consumer(Consumer), - ok - end - ). 
- -should_select_with_empty_fields({DbName, _}) -> - ?_test( - begin - ChArgs = #changes_args{filter = "_selector", include_docs = true}, - Selector = {[{<<"_id">>, <<"doc3">>}]}, - Req = - {json_req, - {[ - {<<"selector">>, Selector}, - {<<"fields">>, []} - ]}}, - {Rows, LastSeq, UpSeq} = run_changes_query(DbName, ChArgs, Req), - ?assertEqual(1, length(Rows)), - [#row{seq = Seq, id = Id, doc = Doc}] = Rows, - ?assertEqual(<<"doc3">>, Id), - ?assertEqual(6, Seq), - ?assertEqual(UpSeq, LastSeq), - ?assertMatch({[{_K1, _V1}, {_K2, _V2}]}, Doc) - end - ). - -should_select_with_fields({DbName, _}) -> - ?_test( - begin - ChArgs = #changes_args{filter = "_selector", include_docs = true}, - Selector = {[{<<"_id">>, <<"doc3">>}]}, - Req = - {json_req, - {[ - {<<"selector">>, Selector}, - {<<"fields">>, [<<"_id">>, <<"nope">>]} - ]}}, - {Rows, LastSeq, UpSeq} = run_changes_query(DbName, ChArgs, Req), - ?assertEqual(1, length(Rows)), - [#row{seq = Seq, id = Id, doc = Doc}] = Rows, - ?assertEqual(<<"doc3">>, Id), - ?assertEqual(6, Seq), - ?assertEqual(UpSeq, LastSeq), - ?assertMatch(Doc, {[{<<"_id">>, <<"doc3">>}]}) - end - ). - -should_emit_only_design_documents({DbName, Revs}) -> - ?_test( - begin - ChArgs = #changes_args{ - filter = "_design" - }, - Req = {json_req, null}, - {Rows, LastSeq, UpSeq} = run_changes_query(DbName, ChArgs, Req), - - ?assertEqual(1, length(Rows)), - ?assertEqual(UpSeq, LastSeq), - ?assertEqual([#row{seq = 8, id = <<"_design/foo">>}], Rows), - - {ok, Db} = couch_db:open_int(DbName, [?ADMIN_CTX]), - {ok, _} = save_doc( - Db, + {<<"valid">>, << + "function(doc, req) {" + " if (doc._id == 'doc3') {" + " return true; " + "} }" + >>} + ]}} + ]} + ), + ChArgs = #changes_args{filter = "app/valid"}, + Req = {json_req, null}, + ok = update_ddoc(DbName, DDoc), + {Rows, LastSeq, UpSeq} = run_changes_query(DbName, ChArgs, Req), + ?assertEqual(1, length(Rows)), + ?assertEqual([#row{seq = 6, id = <<"doc3">>}], Rows), + ?assertEqual(UpSeq, LastSeq). 
+ +t_filter_by_user_ctx({DbName, _}) -> + DDocId = <<"_design/app">>, + DDoc = couch_doc:from_json_obj( + {[ + {<<"_id">>, DDocId}, + {<<"language">>, <<"javascript">>}, + {<<"filters">>, {[ - {<<"_id">>, <<"_design/foo">>}, - {<<"_rev">>, element(8, Revs)}, - {<<"_deleted">>, true} - ]} - ), - - couch_db:close(Db), - {Rows2, LastSeq2, _} = run_changes_query(DbName, ChArgs, Req), - - UpSeq2 = UpSeq + 1, - - ?assertEqual(1, length(Rows2)), - ?assertEqual(UpSeq2, LastSeq2), - ?assertEqual( - [ - #row{ - seq = 11, - id = <<"_design/foo">>, - deleted = true - } - ], - Rows2 - ) - end - ). - -%% should_receive_heartbeats(_) -> -%% {timeout, ?TEST_TIMEOUT div 1000, -%% ?_test( -%% begin -%% DbName = ?tempdb(), -%% Timeout = 100, -%% {ok, Db} = create_db(DbName), - -%% {ok, _} = save_doc(Db, {[ -%% {<<"_id">>, <<"_design/filtered">>}, -%% {<<"language">>, <<"javascript">>}, -%% {<<"filters">>, {[ -%% {<<"foo">>, <<"function(doc) { -%% return ['doc10', 'doc11', 'doc12'].indexOf(doc._id) != -1;}">> -%% }]}} -%% ]}), - -%% ChangesArgs = #changes_args{ -%% filter = "filtered/foo", -%% feed = "continuous", -%% timeout = 10000, -%% heartbeat = 1000 -%% }, -%% Consumer = spawn_consumer(DbName, ChangesArgs, {json_req, null}), - -%% {ok, _Rev1} = save_doc(Db, {[{<<"_id">>, <<"doc1">>}]}), -%% timer:sleep(Timeout), -%% {ok, _Rev2} = save_doc(Db, {[{<<"_id">>, <<"doc2">>}]}), -%% timer:sleep(Timeout), -%% {ok, _Rev3} = save_doc(Db, {[{<<"_id">>, <<"doc3">>}]}), -%% timer:sleep(Timeout), -%% {ok, _Rev4} = save_doc(Db, {[{<<"_id">>, <<"doc4">>}]}), -%% timer:sleep(Timeout), -%% {ok, _Rev5} = save_doc(Db, {[{<<"_id">>, <<"doc5">>}]}), -%% timer:sleep(Timeout), -%% {ok, _Rev6} = save_doc(Db, {[{<<"_id">>, <<"doc6">>}]}), -%% timer:sleep(Timeout), -%% {ok, _Rev7} = save_doc(Db, {[{<<"_id">>, <<"doc7">>}]}), -%% timer:sleep(Timeout), -%% {ok, _Rev8} = save_doc(Db, {[{<<"_id">>, <<"doc8">>}]}), -%% timer:sleep(Timeout), -%% {ok, _Rev9} = save_doc(Db, {[{<<"_id">>, <<"doc9">>}]}), - -%% 
Heartbeats = get_heartbeats(Consumer), -%% ?assert(Heartbeats > 0), - -%% {ok, _Rev10} = save_doc(Db, {[{<<"_id">>, <<"doc10">>}]}), -%% timer:sleep(Timeout), -%% {ok, _Rev11} = save_doc(Db, {[{<<"_id">>, <<"doc11">>}]}), -%% timer:sleep(Timeout), -%% {ok, _Rev12} = save_doc(Db, {[{<<"_id">>, <<"doc12">>}]}), - -%% Heartbeats2 = get_heartbeats(Consumer), -%% ?assert(Heartbeats2 > Heartbeats), - -%% Rows = get_rows(Consumer), -%% ?assertEqual(3, length(Rows)), - -%% {ok, _Rev13} = save_doc(Db, {[{<<"_id">>, <<"doc13">>}]}), -%% timer:sleep(Timeout), -%% {ok, _Rev14} = save_doc(Db, {[{<<"_id">>, <<"doc14">>}]}), -%% timer:sleep(Timeout), - -%% Heartbeats3 = get_heartbeats(Consumer), -%% ?assert(Heartbeats3 > Heartbeats2) -%% end)}. - -should_filter_by_doc_attribute({DbName, _}) -> - ?_test( - begin - DDocId = <<"_design/app">>, - DDoc = couch_doc:from_json_obj( + {<<"valid">>, << + "function(doc, req) {" + " if (req.userCtx.name == doc._id) {" + " return true; " + "} }" + >>} + ]}} + ]} + ), + ChArgs = #changes_args{filter = "app/valid"}, + UserCtx = #user_ctx{name = <<"doc3">>, roles = []}, + {ok, DbRec} = couch_db:clustered_db(DbName, UserCtx), + Req = {json_req, {[{<<"userCtx">>, couch_util:json_user_ctx(DbRec)}]}}, + ok = update_ddoc(DbName, DDoc), + {Rows, LastSeq, UpSeq} = run_changes_query(DbName, ChArgs, Req), + ?assertEqual(1, length(Rows)), + ?assertEqual([#row{seq = 6, id = <<"doc3">>}], Rows), + ?assertEqual(UpSeq, LastSeq). + +t_select_basic({DbName, _}) -> + ChArgs = #changes_args{filter = "_selector"}, + Selector = {[{<<"_id">>, <<"doc3">>}]}, + Req = {json_req, {[{<<"selector">>, Selector}]}}, + {Rows, LastSeq, UpSeq} = run_changes_query(DbName, ChArgs, Req), + ?assertEqual(1, length(Rows)), + ?assertEqual([#row{seq = 6, id = <<"doc3">>}], Rows), + ?assertEqual(UpSeq, LastSeq). 
+ +t_select_with_since({DbName, _}) -> + ChArgs = #changes_args{filter = "_selector", since = 9}, + GteDoc2 = {[{<<"$gte">>, <<"doc1">>}]}, + Selector = {[{<<"_id">>, GteDoc2}]}, + Req = {json_req, {[{<<"selector">>, Selector}]}}, + {Rows, LastSeq, UpSeq} = run_changes_query(DbName, ChArgs, Req), + ?assertEqual(1, length(Rows)), + ?assertEqual([#row{seq = 10, id = <<"doc8">>}], Rows), + ?assertEqual(UpSeq, LastSeq). + +t_select_when_no_result({DbName, _}) -> + ChArgs = #changes_args{filter = "_selector"}, + Selector = {[{<<"_id">>, <<"nopers">>}]}, + Req = {json_req, {[{<<"selector">>, Selector}]}}, + {Rows, LastSeq, UpSeq} = run_changes_query(DbName, ChArgs, Req), + ?assertEqual(0, length(Rows)), + ?assertEqual(UpSeq, LastSeq). + +t_select_with_deleted_docs({DbName, Revs}) -> + Rev3_2 = element(6, Revs), + {ok, Db} = couch_db:open_int(DbName, []), + {ok, _} = save_doc( + Db, + {[ + {<<"_id">>, <<"doc3">>}, + {<<"_deleted">>, true}, + {<<"_rev">>, Rev3_2} + ]} + ), + ChArgs = #changes_args{filter = "_selector"}, + Selector = {[{<<"_id">>, <<"doc3">>}]}, + Req = {json_req, {[{<<"selector">>, Selector}]}}, + {Rows, LastSeq, _} = run_changes_query(DbName, ChArgs, Req), + ?assertEqual([#row{seq = LastSeq, id = <<"doc3">>, deleted = true}], Rows), + ?assertEqual(11, LastSeq). 
+ +t_select_with_continuous({DbName, Revs}) -> + {ok, Db} = couch_db:open_int(DbName, []), + ChArgs = #changes_args{filter = "_selector", feed = "continuous"}, + GteDoc8 = {[{<<"$gte">>, <<"doc8">>}]}, + Selector = {[{<<"_id">>, GteDoc8}]}, + Req = {json_req, {[{<<"selector">>, Selector}]}}, + reset_row_notifications(), + Consumer = spawn_consumer(DbName, ChArgs, Req), + ?assertEqual(ok, wait_row_notifications(1)), + ok = pause(Consumer), + Rows = get_rows(Consumer), + ?assertEqual([#row{seq = 10, id = <<"doc8">>, deleted = false}], Rows), + clear_rows(Consumer), + {ok, _} = save_doc(Db, {[{<<"_id">>, <<"doc01">>}]}), + ok = unpause(Consumer), + timer:sleep(100), + ok = pause(Consumer), + ?assertEqual([], get_rows(Consumer)), + Rev4 = element(4, Revs), + Rev8 = element(10, Revs), + {ok, _} = save_doc(Db, {[{<<"_id">>, <<"doc8">>}, {<<"_rev">>, Rev8}]}), + {ok, _} = save_doc(Db, {[{<<"_id">>, <<"doc4">>}, {<<"_rev">>, Rev4}]}), + reset_row_notifications(), + ok = unpause(Consumer), + ?assertEqual(ok, wait_row_notifications(1)), + ok = pause(Consumer), + NewRows = get_rows(Consumer), + ?debugVal(NewRows), + ?assertMatch([#row{seq = _, id = <<"doc8">>, deleted = false}], NewRows), + ?assertEqual([#row{seq = 12, id = <<"doc8">>, deleted = false}], NewRows). + +t_stop_selector_when_db_deleted({DbName, _Revs}) -> + {ok, _Db} = couch_db:open_int(DbName, []), + ChArgs = #changes_args{filter = "_selector", feed = "continuous"}, + Selector = {[{<<"_id">>, <<"doc3">>}]}, + Req = {json_req, {[{<<"selector">>, Selector}]}}, + Consumer = spawn_consumer(DbName, ChArgs, Req), + ok = pause(Consumer), + ok = delete_db(DbName), + ok = unpause(Consumer), + {_Rows, _LastSeq} = wait_finished(Consumer), + ok = stop_consumer(Consumer). 

%% `include_docs = true` with an empty `fields` list: the row for doc3 is
%% expected to carry a doc body holding `_id` and `_rev` only.
t_select_with_empty_fields({DbName, Revs}) ->
    Body = {[
        {<<"selector">>, {[{<<"_id">>, <<"doc3">>}]}},
        {<<"fields">>, []}
    ]},
    Args = #changes_args{filter = "_selector", include_docs = true},
    {Rows, LastSeq, UpSeq} = run_changes_query(DbName, Args, {json_req, Body}),
    ?assertEqual(1, length(Rows)),
    ExpectedDoc = {[{<<"_id">>, <<"doc3">>}, {<<"_rev">>, element(6, Revs)}]},
    ?assertEqual([#row{seq = 6, id = <<"doc3">>, doc = ExpectedDoc}], Rows),
    ?assertEqual(UpSeq, LastSeq).

%% `fields` naming `_id` and a field ("nope") that the expected doc body
%% does not contain: the returned doc is expected to hold `_id` only.
t_select_with_fields({DbName, _}) ->
    Body = {[
        {<<"selector">>, {[{<<"_id">>, <<"doc3">>}]}},
        {<<"fields">>, [<<"_id">>, <<"nope">>]}
    ]},
    Args = #changes_args{filter = "_selector", include_docs = true},
    {Rows, LastSeq, UpSeq} = run_changes_query(DbName, Args, {json_req, Body}),
    ?assertEqual(1, length(Rows)),
    ExpectedDoc = {[{<<"_id">>, <<"doc3">>}]},
    ?assertEqual([#row{seq = 6, id = <<"doc3">>, doc = ExpectedDoc}], Rows),
    ?assertEqual(UpSeq, LastSeq).

%% `_view` filter backed by a JavaScript map function that emits only for
%% doc3; the feed is expected to return that single row at seq 6.
t_filter_by_view({DbName, _}) ->
    MapSource = <<
        "function(doc) {"
        " if (doc._id == 'doc3') {"
        " emit(doc); "
        "} }"
    >>,
    Views = {[{<<"valid">>, {[{<<"map">>, MapSource}]}}]},
    DDoc = couch_doc:from_json_obj(
        {[
            {<<"_id">>, <<"_design/app">>},
            {<<"language">>, <<"javascript">>},
            {<<"views">>, Views}
        ]}
    ),
    Args = #changes_args{filter = "_view"},
    Query = {[{<<"view">>, <<"app/valid">>}]},
    Request = {json_req, {[{<<"query">>, Query}]}},
    ok = update_ddoc(DbName, DDoc),
    {Rows, LastSeq, UpSeq} = run_changes_query(DbName, Args, Request),
    ?assertEqual(1, length(Rows)),
    ?assertEqual([#row{seq = 6, id = <<"doc3">>}], Rows),
    ?assertEqual(UpSeq, LastSeq).

%% Same scenario as the JavaScript view filter, but the design document
%% declares `language = erlang` and the map function is Erlang source.
t_filter_by_erlang_view({DbName, _}) ->
    MapSource = <<
        "fun({Doc}) ->"
        " case lists:keyfind(<<\"_id\">>, 1, Doc) of"
        " {<<\"_id\">>, <<\"doc3\">>} -> Emit(Doc, null);"
        " false -> ok"
        " end "
        "end."
    >>,
    Views = {[{<<"valid">>, {[{<<"map">>, MapSource}]}}]},
    DDoc = couch_doc:from_json_obj(
        {[
            {<<"_id">>, <<"_design/app">>},
            {<<"language">>, <<"erlang">>},
            {<<"views">>, Views}
        ]}
    ),
    Args = #changes_args{filter = "_view"},
    Query = {[{<<"view">>, <<"app/valid">>}]},
    Request = {json_req, {[{<<"query">>, Query}]}},
    ok = update_ddoc(DbName, DDoc),
    {Rows, LastSeq, UpSeq} = run_changes_query(DbName, Args, Request),
    ?assertEqual(1, length(Rows)),
    ?assertEqual([#row{seq = 6, id = <<"doc3">>}], Rows),
    ?assertEqual(UpSeq, LastSeq).

%% `style = main_only` without a filter: expects the nine rows below, in
%% this order, and a last sequence equal to the update sequence.
t_style_main_only({DbName, _}) ->
    Args = #changes_args{style = main_only},
    {Rows, LastSeq, UpSeq} = run_changes_query(DbName, Args, {json_req, null}),
    ?assertEqual(9, length(Rows)),
    ?assertEqual(UpSeq, LastSeq),
    SeqsAndIds = [
        {1, <<"doc1">>},
        {2, <<"doc2">>},
        {4, <<"doc4">>},
        {5, <<"doc5">>},
        {6, <<"doc3">>},
        {7, <<"doc6">>},
        {8, <<"_design/foo">>},
        {9, <<"doc7">>},
        {10, <<"doc8">>}
    ],
    Expected = [#row{seq = Seq, id = Id} || {Seq, Id} <- SeqsAndIds],
    ?assertEqual(Expected, Rows).

%% `style = main_only` with `include_docs = true`: still nine rows, and the
%% first one is expected to carry doc1's body (`_id` plus the revision
%% stored at position 1 of `Revs`).
t_style_main_only_with_include_docs({DbName, Revs}) ->
    Args = #changes_args{style = main_only, include_docs = true},
    {Rows, LastSeq, UpSeq} = run_changes_query(DbName, Args, {json_req, null}),
    ?assertEqual(9, length(Rows)),
    ?assertEqual(UpSeq, LastSeq),
    Doc1 = {[{<<"_id">>, <<"doc1">>}, {<<"_rev">>, element(1, Revs)}]},
    Expected = #row{seq = 1, id = <<"doc1">>, doc = Doc1},
    ?assertEqual(Expected, hd(Rows)).

%% `style = all_docs` without a filter: expects the nine rows below, in
%% this order, and a last sequence equal to the update sequence.
t_style_all_docs({DbName, _}) ->
    Args = #changes_args{style = all_docs},
    {Rows, LastSeq, UpSeq} = run_changes_query(DbName, Args, {json_req, null}),
    ?assertEqual(9, length(Rows)),
    ?assertEqual(UpSeq, LastSeq),
    SeqsAndIds = [
        {1, <<"doc1">>},
        {2, <<"doc2">>},
        {4, <<"doc4">>},
        {5, <<"doc5">>},
        {6, <<"doc3">>},
        {7, <<"doc6">>},
        {8, <<"_design/foo">>},
        {9, <<"doc7">>},
        {10, <<"doc8">>}
    ],
    Expected = [#row{seq = Seq, id = Id} || {Seq, Id} <- SeqsAndIds],
    ?assertEqual(Expected, Rows).

%% `style = all_docs` with `include_docs = true`: still nine rows, and the
%% first one is expected to carry doc1's body (`_id` plus the revision
%% stored at position 1 of `Revs`).
t_style_all_docs_with_include_docs({DbName, Revs}) ->
    Args = #changes_args{style = all_docs, include_docs = true},
    {Rows, LastSeq, UpSeq} = run_changes_query(DbName, Args, {json_req, null}),
    ?assertEqual(9, length(Rows)),
    ?assertEqual(UpSeq, LastSeq),
    Doc1 = {[{<<"_id">>, <<"doc1">>}, {<<"_rev">>, element(1, Revs)}]},
    Expected = #row{seq = 1, id = <<"doc1">>, doc = Doc1},
    ?assertEqual(Expected, hd(Rows)).
+ +%%%%%%%%%%%%%%%%%%%% Utility Functions %%%%%%%%%%%%%%%%%%%% update_ddoc(DbName, DDoc) -> {ok, Db} = couch_db:open_int(DbName, [?ADMIN_CTX]), {ok, _} = couch_db:update_doc(Db, DDoc, []), @@ -881,18 +659,6 @@ get_rows({Consumer, _}) -> ?assertNotEqual(timeout, Resp), Resp. -%% get_heartbeats({Consumer, _}) -> -%% Ref = make_ref(), -%% Consumer ! {get_heartbeats, Ref}, -%% Resp = receive -%% {hearthbeats, Ref, HeartBeats} -> -%% HeartBeats -%% after ?TIMEOUT -> -%% timeout -%% end, -%% ?assertNotEqual(timeout, Resp), -%% Resp. - clear_rows({Consumer, _}) -> Ref = make_ref(), Consumer ! {reset, Ref}, @@ -991,28 +757,29 @@ spawn_consumer(DbName, ChangesArgs0, Req) -> Parent = self(), spawn_monitor(fun() -> put(heartbeat_count, 0), - Callback = fun - ({change, {Change}, _}, _, Acc) -> - Id = couch_util:get_value(<<"id">>, Change), - Seq = couch_util:get_value(<<"seq">>, Change), - Del = couch_util:get_value(<<"deleted">>, Change, false), - Doc = couch_util:get_value(doc, Change, nil), - Parent ! row, - [#row{id = Id, seq = Seq, deleted = Del, doc = Doc} | Acc]; - ({stop, LastSeq}, _, Acc) -> - Parent ! {consumer_finished, lists:reverse(Acc), LastSeq}, - stop_loop(Parent, Acc); - (timeout, _, Acc) -> - put(heartbeat_count, get(heartbeat_count) + 1), - maybe_pause(Parent, Acc); - (_, _, Acc) -> - maybe_pause(Parent, Acc) - end, + Callback = + fun + ({change, {Change}, _}, _, Acc) -> + Id = couch_util:get_value(<<"id">>, Change), + Seq = couch_util:get_value(<<"seq">>, Change), + Del = couch_util:get_value(<<"deleted">>, Change, false), + Doc = couch_util:get_value(doc, Change, nil), + Parent ! row, + [#row{id = Id, seq = Seq, deleted = Del, doc = Doc} | Acc]; + ({stop, LastSeq}, _, Acc) -> + Parent ! 
{consumer_finished, lists:reverse(Acc), LastSeq}, + stop_loop(Parent, Acc); + (timeout, _, Acc) -> + put(heartbeat_count, get(heartbeat_count) + 1), + maybe_pause(Parent, Acc); + (_, _, Acc) -> + maybe_pause(Parent, Acc) + end, {ok, Db} = couch_db:open_int(DbName, []), ChangesArgs = case - (ChangesArgs0#changes_args.timeout =:= undefined) andalso - (ChangesArgs0#changes_args.heartbeat =:= undefined) + ChangesArgs0#changes_args.timeout =:= undefined andalso + ChangesArgs0#changes_args.heartbeat =:= undefined of true -> ChangesArgs0#changes_args{timeout = 1000, heartbeat = 100}; diff --git a/src/fabric/src/fabric_rpc.erl b/src/fabric/src/fabric_rpc.erl index d01f1f5a7..42544ac40 100644 --- a/src/fabric/src/fabric_rpc.erl +++ b/src/fabric/src/fabric_rpc.erl @@ -535,6 +535,32 @@ changes_enumerator(#full_doc_info{} = FDI, Acc) -> changes_enumerator(#doc_info{id = <<"_local/", _/binary>>, high_seq = Seq}, Acc) -> {ok, Acc#fabric_changes_acc{seq = Seq, pending = Acc#fabric_changes_acc.pending - 1}}; changes_enumerator(DocInfo, Acc) -> + #fabric_changes_acc{ + db = Db, + args = #changes_args{ + include_docs = IncludeDocs, + filter_fun = Filter + }, + pending = Pending + } = Acc, + #doc_info{high_seq = Seq} = DocInfo, + RevsOrDocRevs = couch_changes:filter(Db, DocInfo, Filter, IncludeDocs), + % include_docs = false, call `filter/3`, use `couch_changes:open_revs/3` to read docs. + % include_docs = true, call `filter/4`, it will return [revs] or {docs, revs}. + % - {}: use `couch_changes:open_revs/3` to open docs + % - []: use `fabric_rpc:doc_member/3` to open docs + ChangesRow = + case is_tuple(RevsOrDocRevs) of + true -> + {Docs, Revs} = RevsOrDocRevs, + get_changes_row(Revs, Acc, DocInfo, fun get_json_docs/3, Docs); + false -> + get_changes_row(RevsOrDocRevs, Acc, DocInfo, fun doc_member/3, {Db, DocInfo}) + end, + ok = rexi:stream2(ChangesRow), + {ok, Acc#fabric_changes_acc{seq = Seq, pending = Pending - 1}}. 
+ +get_changes_row(Revs, Acc, DocInfo, Fun, Args) -> #fabric_changes_acc{ db = Db, args = #changes_args{ @@ -547,36 +573,35 @@ changes_enumerator(DocInfo, Acc) -> epochs = Epochs } = Acc, #doc_info{id = Id, high_seq = Seq, revs = [#rev_info{deleted = Del} | _]} = DocInfo, - case [X || X <- couch_changes:filter(Db, DocInfo, Filter), X /= null] of + case [X || X <- Revs, X /= null] of [] -> - ChangesRow = - {no_pass, [ - {pending, Pending - 1}, - {seq, {Seq, uuid(Db), couch_db:owner_of(Epochs, Seq)}} - ]}; + {no_pass, [ + {pending, Pending - 1}, + {seq, {Seq, uuid(Db), couch_db:owner_of(Epochs, Seq)}} + ]}; Results -> Opts = if Conflicts -> [conflicts | DocOptions]; true -> DocOptions end, - ChangesRow = - {change, [ - {pending, Pending - 1}, - {seq, {Seq, uuid(Db), couch_db:owner_of(Epochs, Seq)}}, - {id, Id}, - {changes, Results}, - {deleted, Del} - | if - IncludeDocs -> [doc_member(Db, DocInfo, Opts, Filter)]; - true -> [] - end - ]} - end, - ok = rexi:stream2(ChangesRow), - {ok, Acc#fabric_changes_acc{seq = Seq, pending = Pending - 1}}. + {change, [ + {pending, Pending - 1}, + {seq, {Seq, uuid(Db), couch_db:owner_of(Epochs, Seq)}}, + {id, Id}, + {changes, Results}, + {deleted, Del} + | if + IncludeDocs -> [Fun(Args, Opts, Filter)]; + true -> [] + end + ]} + end. + +get_json_docs([Doc], Opts, Filter) -> + {doc, maybe_filtered_json_doc(Doc, Opts, Filter)}. -doc_member(Shard, DocInfo, Opts, Filter) -> +doc_member({Shard, DocInfo}, Opts, Filter) -> case couch_db:open_doc(Shard, DocInfo, [deleted | Opts]) of {ok, Doc} -> {doc, maybe_filtered_json_doc(Doc, Opts, Filter)};
