This is an automated email from the ASF dual-hosted git repository. iilyak pushed a commit to branch couch-stats-resource-tracker-v3-rebase-http-2 in repository https://gitbox.apache.org/repos/asf/couchdb.git
commit ff8da9c60dff57fce5ab86b929b42139da87cfc5 Author: ILYA Khlopotov <[email protected]> AuthorDate: Wed Jul 2 09:28:58 2025 -0700 fixup! Add csrt_httpd_tests.erl --- src/couch_stats/test/eunit/csrt_httpd_tests.erl | 570 ++++++++++++++---------- 1 file changed, 338 insertions(+), 232 deletions(-) diff --git a/src/couch_stats/test/eunit/csrt_httpd_tests.erl b/src/couch_stats/test/eunit/csrt_httpd_tests.erl index 9dd2c1139..5b32f564f 100644 --- a/src/couch_stats/test/eunit/csrt_httpd_tests.erl +++ b/src/couch_stats/test/eunit/csrt_httpd_tests.erl @@ -14,19 +14,7 @@ -include_lib("stdlib/include/ms_transform.hrl"). --import( - csrt_test_helper, - [ - rctx_gen/0, - rctx_gen/1, - rctxs/0, - jrctx/1 - ] -). - --include_lib("couch/include/couch_db.hrl"). -include_lib("couch/include/couch_eunit.hrl"). --include_lib("couch_mrview/include/couch_mrview.hrl"). -include("../../src/couch_stats_resource_tracker.hrl"). -define(USER, ?MODULE_STRING ++ "_admin"). @@ -37,26 +25,24 @@ -define(JSON_CT, {"Content-Type", ?JSON}). -define(ACCEPT_JSON, {"Accept", ?JSON}). -%% Use different values than default configs to ensure they're picked up --define(THRESHOLD_DBNAME_IO, 91). --define(THRESHOLD_DOCS_READ, 123). --define(THRESHOLD_DOCS_WRITTEN, 12). --define(THRESHOLD_IOQ_CALLS, 439). --define(THRESHOLD_ROWS_READ, 43). --define(THRESHOLD_CHANGES, 79). --define(THRESHOLD_LONG_REQS, 432). - --define(TEST_QUERY_LIMIT, 98). - csrt_httpd_test_() -> { foreach, fun setup/0, fun teardown/1, [ - ?TDEF_FE(t_query_group_by), - ?TDEF_FE(t_query_count_by), - ?TDEF_FE(t_query_sort_by) + ?TDEF_FE(t_query_group_by_multiple_keys), + ?TDEF_FE(t_query_group_by_single_key), + ?TDEF_FE(t_query_group_by_binary_key), + ?TDEF_FE(t_query_group_by_bad_request), + ?TDEF_FE(t_query_count_by_multiple_keys), + ?TDEF_FE(t_query_count_by_single_key), + ?TDEF_FE(t_query_count_by_binary_key), + ?TDEF_FE(t_query_count_by_bad_request), + ?TDEF_FE(t_query_sort_by_multiple_keys), + ?TDEF_FE(t_query_sort_by_single_key), + ?TDEF_FE(t_query_sort_by_binary_key), + ?TDEF_FE(t_query_sort_by_bad_request) ] }. @@ -66,310 +52,430 @@ setup_ctx() -> HashedList = binary_to_list(Hashed), ok = config:set("admins", ?USER, HashedList, false), Addr = config:get("chttpd", "bind_address", "127.0.0.1"), - DbName = binary_to_list(?tempdb()), Port = mochiweb_socket_server:get(chttpd, port), Url = lists:concat(["http://", Addr, ":", Port, "/"]), - {Ctx, Url, DbName}. + {Ctx, Url}. + +setup() -> + {Ctx, Url} = setup_ctx(), + Rctxs = [ + rctx(#{dbname => <<"db1">>, ioq_calls => 123, username => <<"user_foo">>}), + rctx(#{dbname => <<"db1">>, ioq_calls => 321, username => <<"user_foo">>}), + rctx(#{dbname => <<"db2">>, ioq_calls => 345, username => <<"user_bar">>}), + rctx(#{dbname => <<"db2">>, ioq_calls => 543, username => <<"user_bar">>}), + rctx(#{dbname => <<"db1">>, ioq_calls => 678, username => <<"user_bar">>}), + rctx(#{dbname => <<"db2">>, ioq_calls => 987, username => <<"user_foo">>}) + ], + ets:insert(?CSRT_ETS, Rctxs), + #{ctx => Ctx, url => Url, rctxs => Rctxs}. -teardown(#{dbname := DbName, ctx := Ctx}) -> - meck:unload(ioq), - ok = fabric:delete_db(DbName, [?ADMIN_CTX]), +teardown(#{ctx := Ctx}) -> Persist = false, ok = config:delete("admins", ?USER, Persist), test_util:stop_couch(Ctx). -create_db(Top, Db) -> - case req(put, Top ++ Db) of - {201, #{}} -> ok; - Error -> error({failed_to_create_test_db, Db, Error}) - end. - -req(Method, Url) -> - Headers = [?JSON_CT, ?AUTH, ?ACCEPT_JSON], - {ok, Code, _, Res} = test_request:request(Method, Url, Headers), - {Code, json_decode(Res)}. - -req(Method, Url, #{} = Body) -> - req(Method, Url, jiffy:encode(Body)); -req(Method, Url, Body) -> - Headers = [?JSON_CT, ?AUTH, ?ACCEPT_JSON], - {ok, Code, _, Res} = test_request:request(Method, Url, Headers, Body), - {Code, json_decode(Res)}. - -json_decode(Bin) when is_binary(Bin) -> - jiffy:decode(Bin, [return_maps]). - -setup() -> - {Ctx, Url, DbName} = setup_ctx(), - configure(), - ok = create_db(Url, DbName), - create_docs(DbName), - PidRef = mock_all_docs_req(DbName), - MArgs = #mrargs{include_docs = false}, - _Res = fabric:all_docs(DbName, [?ADMIN_CTX], fun view_cb/2, [], MArgs), - Rctx = load_rctx(PidRef), - Rctxs = rctxs(), - #{ctx => Ctx, dbname => DbName, rctx => Rctx, rctxs => Rctxs, url => Url}. - - -create_docs(DbName) -> - Docs = make_docs(100), - Opts = [], - {ok, _} = fabric:update_docs(DbName, Docs, Opts), - ok. - -configure() -> - config:set_boolean(?CSRT, "randomize_testing", false, false), - config:set_boolean(?CSRT, "enable_reporting", true, false), - config:set_boolean(?CSRT, "enable_rpc_reporting", true, false), +active_resources_group_by(Url, AggregationKeys, CounterKey) -> + active_resources_group_by("docs_read", Url, AggregationKeys, CounterKey). - ok = meck:new(ioq, [passthrough]), - ok = meck:expect(ioq, bypass, fun(_, _) -> false end), +active_resources_group_by(MatcherName, Url, AggregationKeys, CounterKey) -> + Body = #{ + <<"group_by">> => #{ + <<"aggregate_keys">> => AggregationKeys, + <<"counter_key">> => CounterKey + } + }, + active_resources(Url, MatcherName, Body). - ok = config:set( - "csrt_logger.matchers_threshold", "docs_read", integer_to_list(?THRESHOLD_DOCS_READ), false - ), - ok = config:set( - "csrt_logger.matchers_threshold", - "docs_written", - integer_to_list(?THRESHOLD_DOCS_WRITTEN), - false - ), - ok = config:set( - "csrt_logger.matchers_threshold", "ioq_calls", integer_to_list(?THRESHOLD_IOQ_CALLS), false - ), - ok = config:set( - "csrt_logger.matchers_threshold", "rows_read", integer_to_list(?THRESHOLD_ROWS_READ), false - ), - ok = config:set( - "csrt_logger.matchers_threshold", - "changes_processed", - integer_to_list(?THRESHOLD_CHANGES), - false - ), - ok = config:set( - "csrt_logger.matchers_threshold", "long_reqs", integer_to_list(?THRESHOLD_LONG_REQS), false - ), - ok = config:set("csrt_logger.dbnames_io", "foo", integer_to_list(?THRESHOLD_DBNAME_IO), false), - ok = config:set("csrt_logger.dbnames_io", "bar", integer_to_list(?THRESHOLD_DBNAME_IO), false), - ok = config:set( - "csrt_logger.dbnames_io", "foo/bar", integer_to_list(?THRESHOLD_DBNAME_IO), false +t_query_group_by_multiple_keys(#{rctxs := Rctxs, url := Url}) -> + Aggregated = aggregate([username, dbname], ioq_calls, Rctxs), + Grouped = group(Aggregated), + {RC, Result} = active_resources_group_by(Url, [<<"username">>, <<"dbname">>], <<"ioq_calls">>), + ?assertEqual(200, RC, format("Should have '200' return code, got ~p~n ~p~n", [RC, Result])), + ?assert(is_list(Result), format("Expected list of entries, got ~p~n", [Result])), + ?assertEqual(4, length(Result), format("Expected four entries, got ~p~n ~p~n", [length(Result), Result])), + ?assertMatch([ + #{<<"key">> := #{<<"username">> := _, <<"dbname">> := _}, <<"value">> := _}, + #{<<"key">> := #{<<"username">> := _, <<"dbname">> := _}, <<"value">> := _}, + #{<<"key">> := #{<<"username">> := _, <<"dbname">> := _}, <<"value">> := _}, + #{<<"key">> := #{<<"username">> := _, <<"dbname">> := _}, <<"value">> := _} + ], Result, "Unexpected shape of the result"), + OrderedByKey = order_by_key([username, dbname], Result), + V1 = maps:get({<<"user_bar">>, <<"db1">>}, Grouped), + V2 = maps:get({<<"user_bar">>, <<"db2">>}, Grouped), + V3 = maps:get({<<"user_foo">>, <<"db1">>}, Grouped), + V4 = maps:get({<<"user_foo">>, <<"db2">>}, Grouped), + ?assertMatch( + [ + #{<<"key">> := #{<<"username">> := <<"user_bar">>, <<"dbname">> := <<"db1">>}, <<"value">> := V1}, + #{<<"key">> := #{<<"username">> := <<"user_bar">>, <<"dbname">> := <<"db2">>}, <<"value">> := V2}, + #{<<"key">> := #{<<"username">> := <<"user_foo">>, <<"dbname">> := <<"db1">>}, <<"value">> := V3}, + #{<<"key">> := #{<<"username">> := <<"user_foo">>, <<"dbname">> := <<"db2">>}, <<"value">> := V4} + ], + OrderedByKey ), - ok = config:set(?CSRT, "query_limit", integer_to_list(?TEST_QUERY_LIMIT)), - csrt_logger:reload_matchers(), ok. -%% we cannot use normal http request, because the `chttpd` calls -%% `csrt:destroy_context()` and we would remove entries from `ets`. -mock_all_docs_req(DbName) -> - Method = 'GET', - Path = "/" ++ DbName ++ "/_all_docs", - Nonce = couch_util:to_hex(crypto:strong_rand_bytes(5)), - Req = #httpd{method = Method, nonce = Nonce}, - {_, _} = PidRef = csrt:create_coordinator_context(Req, Path), - csrt:set_context_username(<<"user_foo">>), - csrt:set_context_dbname(?l2b(DbName)), - PidRef. - -load_rctx(PidRef) -> - %% Add slight delay to accumulate RPC response deltas - timer:sleep(50), - csrt:get_resource(PidRef). - -make_docs(Count) -> - lists:map( - fun(I) -> - #doc{ - id = ?l2b("foo_" ++ integer_to_list(I)), - body = {[{<<"value">>, I}]} - } - end, - lists:seq(1, Count) - ). - -view_cb({row, Row}, Acc) -> - {ok, [Row | Acc]}; -view_cb(_Msg, Acc) -> - {ok, Acc}. - -active_resources(Url, MatchName, Body) -> - req(post, Url ++ "/_active_resources/_match/" ++ MatchName, Body). - - -t_query_group_by(#{rctx := Rctx, dbname := DbName, url := Url}) -> - DbNameBin = ?l2b(DbName), - IoqCalls = Rctx#rctx.ioq_calls, - Req = fun(AggregationKeys, CounterKey) -> - #{ - <<"group_by">> => #{ - <<"aggregate_keys">> => AggregationKeys, - <<"counter_key">> => CounterKey - } - } - end, +t_query_group_by_single_key(#{rctxs := Rctxs, url := Url}) -> + Aggregated = aggregate([username], ioq_calls, Rctxs), + Grouped = group(Aggregated), + {RC, Result} = active_resources_group_by(Url, [<<"username">>], <<"ioq_calls">>), + ?assertEqual(200, RC, format("Should have '200' return code, got ~p~n ~p~n", [RC, Result])), + ?assert(is_list(Result), format("Expected list of entries, got ~p~n", [Result])), + ?assertEqual(2, length(Result), format("Expected two entries, got ~p~n ~p~n", [length(Result), Result])), + ?assertMatch([ + #{<<"key">> := #{<<"username">> := _}, <<"value">> := _}, + #{<<"key">> := #{<<"username">> := _}, <<"value">> := _} + ], Result, "Unexpected shape of the result"), + OrderedByKey = order_by_key([username], Result), + V1 = maps:get({<<"user_bar">>}, Grouped), + V2 = maps:get({<<"user_foo">>}, Grouped), ?assertMatch( - {200, [#{ - <<"key">> := #{<<"dbname">> := DbNameBin, <<"username">> := <<"user_foo">>}, - <<"value">> := IoqCalls}]}, - active_resources(Url, "rows_read", Req([<<"username">>, <<"dbname">>], <<"ioq_calls">>)), - "Should handle 'AggregationKeys :: [binary(), ...]'" - ), - ?assertMatch( - {200, [#{ - <<"key">> := #{<<"username">> := <<"user_foo">>}, - <<"value">> := IoqCalls}]}, - active_resources(Url, "rows_read", Req([<<"username">>], <<"ioq_calls">>)), - "Should handle 'AggregationKeys :: [binary()]'" + [ + #{<<"key">> := #{<<"username">> := <<"user_bar">>}, <<"value">> := V1}, + #{<<"key">> := #{<<"username">> := <<"user_foo">>}, <<"value">> := V2} + ], + OrderedByKey ), + ok. + +t_query_group_by_binary_key(#{rctxs := Rctxs, url := Url}) -> + Aggregated = aggregate([username], ioq_calls, Rctxs), + Grouped = group(Aggregated), + {RC, Result} = active_resources_group_by(Url, <<"username">>, <<"ioq_calls">>), + ?assertEqual(200, RC, format("Should have '200' return code, got ~p~n ~p~n", [RC, Result])), + ?assert(is_list(Result), format("Expected list of entries, got ~p~n", [Result])), + ?assertEqual(2, length(Result), format("Expected two entries, got ~p~n ~p~n", [length(Result), Result])), + ?assertMatch([ + #{<<"key">> := #{<<"username">> := _}, <<"value">> := _}, + #{<<"key">> := #{<<"username">> := _}, <<"value">> := _} + ], Result, format("Unexpected shape of the result~n ~p~n", [Result])), + OrderedByKey = order_by_key([username], Result), + V1 = maps:get({<<"user_bar">>}, Grouped), + V2 = maps:get({<<"user_foo">>}, Grouped), ?assertMatch( - {200, [#{ - <<"key">> := #{<<"username">> := <<"user_foo">>}, - <<"value">> := IoqCalls}]}, - active_resources(Url, "rows_read", Req(<<"username">>, <<"ioq_calls">>)), - "Should handle 'AggregationKeys :: binary()'" + [ + #{<<"key">> := #{<<"username">> := <<"user_bar">>}, <<"value">> := V1}, + #{<<"key">> := #{<<"username">> := <<"user_foo">>}, <<"value">> := V2} + ], + OrderedByKey ), + ok. + +t_query_group_by_bad_request(#{url := Url}) -> ?assertMatch( {400, #{<<"error">> := <<"bad_request">>, <<"reason">> := <<"Unknown matcher 'unknown_matcher'">>}}, - active_resources(Url, "unknown_matcher", Req([<<"username">>], <<"ioq_calls">>)), + active_resources_group_by("unknown_matcher", Url, <<"username">>, <<"ioq_calls">>), "Should return error if 'matcher' is unknown" ), ?assertMatch( {400, #{<<"error">> := <<"bad_request">>, <<"reason">> := <<"Unknown field name 'unknown_field'">>}}, - active_resources(Url, "rows_read", Req([<<"unknown_field">>], <<"ioq_calls">>)), + active_resources_group_by(Url, [<<"unknown_field">>], <<"ioq_calls">>), "Should return error if 'AggregationKeys' contain unknown field" ), ?assertMatch( {400, #{<<"error">> := <<"bad_request">>, <<"reason">> := <<"Unknown field name 'unknown_field'">>}}, - active_resources(Url, "rows_read", Req(<<"unknown_field">>, <<"ioq_calls">>)), + active_resources_group_by(Url, <<"unknown_field">>, <<"ioq_calls">>), "Should return error if 'AggregationKeys' is unknown field" ), ?assertMatch( {400, #{<<"error">> := <<"bad_request">>, <<"reason">> := <<"Unknown field name 'unknown_field'">>}}, - active_resources(Url, "rows_read", Req(<<"username">>, <<"unknown_field">>)), + active_resources_group_by(Url, <<"username">>, <<"unknown_field">>), "Should return error if 'ValueKey' contain unknown field" ), ok. -t_query_count_by(#{dbname := DbName, url := Url}) -> - DbNameBin = ?l2b(DbName), - IoqCount = 1, - Req = fun(AggregationKeys) -> - #{ - <<"count_by">> => #{ - <<"aggregate_keys">> => AggregationKeys - } +active_resources_count_by(Url, AggregationKeys) -> + active_resources_count_by("docs_read", Url, AggregationKeys). + +active_resources_count_by(MatcherName, Url, AggregationKeys) -> + Body = #{ + <<"count_by">> => #{ + <<"aggregate_keys">> => AggregationKeys } - end, + }, + active_resources(Url, MatcherName, Body). + +t_query_count_by_multiple_keys(#{rctxs := Rctxs, url := Url}) -> + Aggregated = aggregate([username, dbname], ioq_calls, Rctxs), + Grouped = count(Aggregated), + {RC, Result} = active_resources_count_by(Url, [<<"username">>, <<"dbname">>]), + ?assertEqual(200, RC, format("Should have '200' return code, got ~p~n ~p~n", [RC, Result])), + ?assert(is_list(Result), format("Expected list of entries, got ~p~n", [Result])), + ?assertEqual(4, length(Result), format("Expected four entries, got ~p~n ~p~n", [length(Result), Result])), + ?assertMatch([ + #{<<"key">> := #{<<"username">> := _, <<"dbname">> := _}, <<"value">> := _}, + #{<<"key">> := #{<<"username">> := _, <<"dbname">> := _}, <<"value">> := _}, + #{<<"key">> := #{<<"username">> := _, <<"dbname">> := _}, <<"value">> := _}, + #{<<"key">> := #{<<"username">> := _, <<"dbname">> := _}, <<"value">> := _} + ], Result, "Unexpected shape of the result"), + OrderedByKey = order_by_key([username, dbname], Result), + V1 = maps:get({<<"user_bar">>, <<"db1">>}, Grouped), + V2 = maps:get({<<"user_bar">>, <<"db2">>}, Grouped), + V3 = maps:get({<<"user_foo">>, <<"db1">>}, Grouped), + V4 = maps:get({<<"user_foo">>, <<"db2">>}, Grouped), ?assertMatch( - {200, [#{ - <<"key">> := #{<<"dbname">> := DbNameBin, <<"username">> := <<"user_foo">>}, - <<"value">> := IoqCount}]}, - active_resources(Url, "rows_read", Req([<<"username">>, <<"dbname">>])), - "Should handle 'AggregationKeys :: [binary(), ...]'" + [ + #{<<"key">> := #{<<"username">> := <<"user_bar">>, <<"dbname">> := <<"db1">>}, <<"value">> := V1}, + #{<<"key">> := #{<<"username">> := <<"user_bar">>, <<"dbname">> := <<"db2">>}, <<"value">> := V2}, + #{<<"key">> := #{<<"username">> := <<"user_foo">>, <<"dbname">> := <<"db1">>}, <<"value">> := V3}, + #{<<"key">> := #{<<"username">> := <<"user_foo">>, <<"dbname">> := <<"db2">>}, <<"value">> := V4} + ], + OrderedByKey ), + ok. + +t_query_count_by_single_key(#{rctxs := Rctxs, url := Url}) -> + Aggregated = aggregate([username], ioq_calls, Rctxs), + Grouped = count(Aggregated), + {RC, Result} = active_resources_count_by(Url, [<<"username">>]), + ?assertEqual(200, RC, format("Should have '200' return code, got ~p~n ~p~n", [RC, Result])), + ?assert(is_list(Result), format("Expected list of entries, got ~p~n", [Result])), + ?assertEqual(2, length(Result), format("Expected two entries, got ~p~n ~p~n", [length(Result), Result])), + ?assertMatch([ + #{<<"key">> := #{<<"username">> := _}, <<"value">> := _}, + #{<<"key">> := #{<<"username">> := _}, <<"value">> := _} + ], Result, "Unexpected shape of the result"), + OrderedByKey = order_by_key([username], Result), + V1 = maps:get({<<"user_bar">>}, Grouped), + V2 = maps:get({<<"user_foo">>}, Grouped), ?assertMatch( - {200, [#{ - <<"key">> := #{<<"username">> := <<"user_foo">>}, - <<"value">> := IoqCount}]}, - active_resources(Url, "rows_read", Req([<<"username">>])), - "Should handle 'AggregationKeys :: [binary()]'" + [ + #{<<"key">> := #{<<"username">> := <<"user_bar">>}, <<"value">> := V1}, + #{<<"key">> := #{<<"username">> := <<"user_foo">>}, <<"value">> := V2} + ], + OrderedByKey ), + ok. + +t_query_count_by_binary_key(#{rctxs := Rctxs, url := Url}) -> + Aggregated = aggregate([username], ioq_calls, Rctxs), + Grouped = count(Aggregated), + {RC, Result} = active_resources_count_by(Url, <<"username">>), + ?assertEqual(200, RC, format("Should have '200' return code, got ~p~n ~p~n", [RC, Result])), + ?assert(is_list(Result), format("Expected list of entries, got ~p~n", [Result])), + ?assertEqual(2, length(Result), format("Expected two entries, got ~p~n ~p~n", [length(Result), Result])), + ?assertMatch([ + #{<<"key">> := #{<<"username">> := _}, <<"value">> := _}, + #{<<"key">> := #{<<"username">> := _}, <<"value">> := _} + ], Result, "Unexpected shape of the result"), + OrderedByKey = order_by_key([username], Result), + V1 = maps:get({<<"user_bar">>}, Grouped), + V2 = maps:get({<<"user_foo">>}, Grouped), ?assertMatch( - {200, [#{ - <<"key">> := #{<<"username">> := <<"user_foo">>}, - <<"value">> := IoqCount}]}, - active_resources(Url, "rows_read", Req(<<"username">>)), - "Should handle 'AggregationKeys :: binary()'" + [ + #{<<"key">> := #{<<"username">> := <<"user_bar">>}, <<"value">> := V1}, + #{<<"key">> := #{<<"username">> := <<"user_foo">>}, <<"value">> := V2} + ], + OrderedByKey ), + ok. + +t_query_count_by_bad_request(#{url := Url}) -> ?assertMatch( {400, #{<<"error">> := <<"bad_request">>, <<"reason">> := <<"Unknown matcher 'unknown_matcher'">>}}, - active_resources(Url, "unknown_matcher", Req([<<"username">>])), + active_resources_count_by("unknown_matcher", Url, <<"username">>), "Should return error if 'matcher' is unknown" ), ?assertMatch( {400, #{<<"error">> := <<"bad_request">>, <<"reason">> := <<"Unknown field name 'unknown_field'">>}}, - active_resources(Url, "rows_read", Req([<<"unknown_field">>])), + active_resources_count_by(Url, [<<"unknown_field">>]), "Should return error if 'AggregationKeys' contain unknown field" ), ?assertMatch( {400, #{<<"error">> := <<"bad_request">>, <<"reason">> := <<"Unknown field name 'unknown_field'">>}}, - active_resources(Url, "rows_read", Req(<<"unknown_field">>)), + active_resources_count_by(Url, <<"unknown_field">>), "Should return error if 'AggregationKeys' is unknown field" ), ok. -t_query_sort_by(#{rctx := Rctx, dbname := DbName, url := Url}) -> - DbNameBin = ?l2b(DbName), - IoqCalls = Rctx#rctx.ioq_calls, - Req = fun(AggregationKeys, CounterKey) -> - #{ - <<"sort_by">> => #{ - <<"aggregate_keys">> => AggregationKeys, - <<"counter_key">> => CounterKey - } +active_resources_sort_by(Url, AggregationKeys, CounterKey) -> + active_resources_sort_by("docs_read", Url, AggregationKeys, CounterKey). + +active_resources_sort_by(MatcherName, Url, AggregationKeys, CounterKey) -> + Body = #{ + <<"sort_by">> => #{ + <<"aggregate_keys">> => AggregationKeys, + <<"counter_key">> => CounterKey } - end, + }, + active_resources(Url, MatcherName, Body). + +t_query_sort_by_multiple_keys(#{rctxs := Rctxs, url := Url}) -> + Aggregated = aggregate([username, dbname], ioq_calls, Rctxs), + Grouped = group(Aggregated), + Ordered = order_by_value(Grouped), + {RC, Result} = active_resources_sort_by(Url, [<<"username">>, <<"dbname">>], <<"ioq_calls">>), + ?assertEqual(200, RC, format("Should have '200' return code, got ~p~n ~p~n", [RC, Result])), + ?assert(is_list(Result), format("Expected list of entries, got ~p~n", [Result])), + ?assertEqual(4, length(Result), format("Expected four entries, got ~p~n ~p~n", [length(Result), Result])), + ?assertMatch([ + #{<<"key">> := #{<<"username">> := _, <<"dbname">> := _}, <<"value">> := _}, + #{<<"key">> := #{<<"username">> := _, <<"dbname">> := _}, <<"value">> := _}, + #{<<"key">> := #{<<"username">> := _, <<"dbname">> := _}, <<"value">> := _}, + #{<<"key">> := #{<<"username">> := _, <<"dbname">> := _}, <<"value">> := _} + ], Result, "Unexpected shape of the result"), + [ + {{<<"user_foo">>, <<"db2">>}, V1}, + {{<<"user_bar">>, <<"db2">>}, V2}, + {{<<"user_bar">>, <<"db1">>}, V3}, + {{<<"user_foo">>, <<"db1">>}, V4} + ] = Ordered, ?assertMatch( - {200, [#{ - <<"key">> := #{<<"dbname">> := DbNameBin, <<"username">> := <<"user_foo">>}, - <<"value">> := IoqCalls}]}, - active_resources(Url, "rows_read", Req([<<"username">>, <<"dbname">>], <<"ioq_calls">>)), - "Should handle 'AggregationKeys :: [binary(), ...]'" + [ + #{<<"key">> := #{<<"username">> := <<"user_foo">>, <<"dbname">> := <<"db2">>}, <<"value">> := V1}, + #{<<"key">> := #{<<"username">> := <<"user_bar">>, <<"dbname">> := <<"db2">>}, <<"value">> := V2}, + #{<<"key">> := #{<<"username">> := <<"user_bar">>, <<"dbname">> := <<"db1">>}, <<"value">> := V3}, + #{<<"key">> := #{<<"username">> := <<"user_foo">>, <<"dbname">> := <<"db1">>}, <<"value">> := V4} + ], + Result ), + ok. + +t_query_sort_by_single_key(#{rctxs := Rctxs, url := Url}) -> + Aggregated = aggregate([username], ioq_calls, Rctxs), + Grouped = group(Aggregated), + Ordered = order_by_value(Grouped), + {RC, Result} = active_resources_sort_by(Url, [<<"username">>], <<"ioq_calls">>), + ?assertEqual(200, RC, format("Should have '200' return code, got ~p~n ~p~n", [RC, Result])), + ?assert(is_list(Result), format("Expected list of entries, got ~p~n", [Result])), + ?assertEqual(2, length(Result), format("Expected two entries, got ~p~n ~p~n", [length(Result), Result])), + ?assertMatch([ + #{<<"key">> := #{<<"username">> := _}, <<"value">> := _}, + #{<<"key">> := #{<<"username">> := _}, <<"value">> := _} + ], Result, "Unexpected shape of the result"), + [ + {{<<"user_bar">>}, V1}, + {{<<"user_foo">>}, V2} + ] = Ordered, ?assertMatch( - {200, [#{ - <<"key">> := #{<<"username">> := <<"user_foo">>}, - <<"value">> := IoqCalls}]}, - active_resources(Url, "rows_read", Req([<<"username">>], <<"ioq_calls">>)), - "Should handle 'AggregationKeys :: [binary()]'" + [ + #{<<"key">> := #{<<"username">> := <<"user_bar">>}, <<"value">> := V1}, + #{<<"key">> := #{<<"username">> := <<"user_foo">>}, <<"value">> := V2} + ], + Result ), + ok. + +t_query_sort_by_binary_key(#{rctxs := Rctxs, url := Url}) -> + Aggregated = aggregate([username], ioq_calls, Rctxs), + Grouped = group(Aggregated), + Ordered = order_by_value(Grouped), + {RC, Result} = active_resources_sort_by(Url, <<"username">>, <<"ioq_calls">>), + ?assertEqual(200, RC, format("Should have '200' return code, got ~p~n ~p~n", [RC, Result])), + ?assert(is_list(Result), format("Expected list of entries, got ~p~n", [Result])), + ?assertEqual(2, length(Result), format("Expected two entries, got ~p~n ~p~n", [length(Result), Result])), + ?assertMatch([ + #{<<"key">> := #{<<"username">> := _}, <<"value">> := _}, + #{<<"key">> := #{<<"username">> := _}, <<"value">> := _} + ], Result, "Unexpected shape of the result"), + [ + {{<<"user_bar">>}, V1}, + {{<<"user_foo">>}, V2} + ] = Ordered, ?assertMatch( - {200, [#{ - <<"key">> := #{<<"username">> := <<"user_foo">>}, - <<"value">> := IoqCalls}]}, - active_resources(Url, "rows_read", Req(<<"username">>, <<"ioq_calls">>)), - "Should handle 'AggregationKeys :: binary()'" + [ + #{<<"key">> := #{<<"username">> := <<"user_bar">>}, <<"value">> := V1}, + #{<<"key">> := #{<<"username">> := <<"user_foo">>}, <<"value">> := V2} + ], + Result ), + ok. + +t_query_sort_by_bad_request(#{url := Url}) -> ?assertMatch( {400, #{<<"error">> := <<"bad_request">>, <<"reason">> := <<"Unknown matcher 'unknown_matcher'">>}}, - active_resources(Url, "unknown_matcher", Req([<<"username">>], <<"ioq_calls">>)), + active_resources_sort_by("unknown_matcher", Url, <<"username">>, <<"ioq_calls">>), "Should return error if 'matcher' is unknown" ), ?assertMatch( {400, #{<<"error">> := <<"bad_request">>, <<"reason">> := <<"Unknown field name 'unknown_field'">>}}, - active_resources(Url, "rows_read", Req([<<"unknown_field">>], <<"ioq_calls">>)), + active_resources_sort_by(Url, [<<"unknown_field">>], <<"ioq_calls">>), "Should return error if 'AggregationKeys' contain unknown field" ), ?assertMatch( {400, #{<<"error">> := <<"bad_request">>, <<"reason">> := <<"Unknown field name 'unknown_field'">>}}, - active_resources(Url, "rows_read", Req(<<"unknown_field">>, <<"ioq_calls">>)), + active_resources_sort_by(Url, <<"unknown_field">>, <<"ioq_calls">>), "Should return error if 'AggregationKeys' is unknown field" ), ?assertMatch( {400, #{<<"error">> := <<"bad_request">>, <<"reason">> := <<"Unknown field name 'unknown_field'">>}}, - active_resources(Url, "rows_read", Req(<<"username">>, <<"unknown_field">>)), + active_resources_sort_by(Url, <<"username">>, <<"unknown_field">>), "Should return error if 'ValueKey' contain unknown field" ), ok. + +format(Fmt, Args) -> + lists:flatten(io_lib:format(Fmt, Args)). + +aggregate(AggregationKeys, ValField, Records) -> + lists:foldl(fun(Rctx, Acc) -> + Key = list_to_tuple([csrt_entry:value(Field, Rctx) || Field <- AggregationKeys]), + CurrVal = maps:get(Key, Acc, []), + maps:put(Key, [csrt_entry:value(ValField, Rctx) | CurrVal], Acc) + end, #{}, Records). + +group(Aggregated) -> + maps:fold(fun(Key, Val, Acc) -> + maps:put(Key, lists:foldl(fun erlang:'+'/2, 0, Val), Acc) + end, #{}, Aggregated). + +count(Aggregated) -> + maps:fold(fun(Key, Val, Acc) -> + maps:put(Key, lists:foldl(fun(_, A) -> A + 1 end, 0, Val), Acc) + end, #{}, Aggregated). + +order_by_value(Grouped) -> + lists:reverse(lists:keysort(2, maps:to_list(Grouped))). + +% This function handles both representations of entries of the result +% #{<<"key">> => #{<<"dbname">> => <<"db2">>, <<"username">> => <<"user_foo">>}, <<"value">> => 1} +% and +% {{<<"db2">>, <<"user_foo">>}, 1} +order_by_key(AggregationKeys, Entries) when is_list(AggregationKeys) andalso is_list(Entries) -> + lists:sort(fun(A, B) -> + get_key(AggregationKeys, A) =< get_key(AggregationKeys, B) + end, Entries). + +% This function handles both representations of entries of the result +% #{<<"key">> => #{<<"dbname">> => <<"db2">>, <<"username">> => <<"user_foo">>}, <<"value">> => 1} +% and +% {{<<"db2">>, <<"user_foo">>}, 1} +get_key(AggregationKeys, #{<<"key">> := Key}) -> + list_to_tuple([maps:get(atom_to_binary(Field), Key) || Field <- AggregationKeys]); +get_key(_AggregationKeys, {Key, _}) -> + Key. + +active_resources(Url, MatchName, Body) -> + EndpointUrl = Url ++ "/_active_resources/_match/" ++ MatchName, + Headers = [?JSON_CT, ?AUTH, ?ACCEPT_JSON], + {ok, Code, _, Res} = test_request:request(post, EndpointUrl, Headers, jiffy:encode(Body)), + {Code, jiffy:decode(Res, [return_maps])}. + +rctx(Opts) -> + % Update `docs_read` to make standard `{docs_read, fun matcher_on_docs_read/1, 1000}` + % matcher match. + Threshold = config:get("csrt_logger.matchers_threshold", "rows_read", 1000), + BaseOpts = #{docs_read => Threshold + 1, username => <<"user_foo">>}, + csrt_test_helper:rctx_gen(maps:merge(BaseOpts, Opts)).
