This is an automated email from the ASF dual-hosted git repository. chewbranca pushed a commit to branch couch-stats-resource-tracker-v3-rebase-http in repository https://gitbox.apache.org/repos/asf/couchdb.git
commit 74bc6e4ac725f449bf954b61e5294fa43b5fd2df Author: Russell Branca <[email protected]> AuthorDate: Mon Jun 23 20:46:02 2025 -0700 Hook in http updates and simple matcher querying --- src/chttpd/src/chttpd_node.erl | 11 +++ .../src/couch_stats_resource_tracker.hrl | 5 ++ src/couch_stats/src/csrt.erl | 26 ++++++- src/couch_stats/src/csrt_httpd.erl | 6 +- src/couch_stats/src/csrt_logger.erl | 5 -- src/couch_stats/src/csrt_query.erl | 85 ++++++++++++++++++++-- src/couch_stats/src/csrt_util.erl | 20 +++-- src/couch_stats/test/eunit/csrt_server_tests.erl | 10 ++- 8 files changed, 142 insertions(+), 26 deletions(-) diff --git a/src/chttpd/src/chttpd_node.erl b/src/chttpd/src/chttpd_node.erl index 2a67da9e3..f08530ce1 100644 --- a/src/chttpd/src/chttpd_node.erl +++ b/src/chttpd/src/chttpd_node.erl @@ -178,6 +178,17 @@ handle_node_req(#httpd{method = 'POST', path_parts = [_, Node, <<"_restart">>]} send_json(Req, 200, {[{ok, true}]}); handle_node_req(#httpd{path_parts = [_, _Node, <<"_restart">>]} = Req) -> send_method_not_allowed(Req, "POST"); +handle_node_req(#httpd{method = 'GET', path_parts = [_, Node, <<"_active_resources">>, <<"_match">>, MatcherName]} = Req) -> + case call_node(Node, csrt, query_matcher, [binary_to_list(MatcherName)]) of + {ok, Rctxs} -> + send_json(Req, 200, Rctxs); + {error, {unknown_matcher, _}} -> + throw({bad_request, <<"unknown matcher '", MatcherName/binary, "'">>}); + {error, Reason} -> + throw({bad_request, Reason}) + end; +handle_node_req(#httpd{method = _, path_parts = [_, _Node, <<"_active_resources">>, <<"_match">> | _]} = Req) -> + send_method_not_allowed(Req, "GET"); handle_node_req(#httpd{ path_parts = [_, Node | PathParts], mochi_req = MochiReq0 diff --git a/src/couch_stats/src/couch_stats_resource_tracker.hrl b/src/couch_stats/src/couch_stats_resource_tracker.hrl index 350438135..819e97375 100644 --- a/src/couch_stats/src/couch_stats_resource_tracker.hrl +++ b/src/couch_stats/src/couch_stats_resource_tracker.hrl @@ -42,6 +42,7 @@ 
-define(CONF_MATCHERS_THRESHOLD, "csrt_logger.matchers_threshold"). -define(CONF_MATCHERS_DBNAMES, "csrt_logger.dbnames_io"). -define(QUERY_CARDINALITY_LIMIT, 10_000). +-define(QUERY_LIMIT, 100). %% Mapping of couch_stat metric names to #rctx{} field names. %% These are used for fields that we inc a counter on. @@ -186,3 +187,7 @@ %% least one. Ideally, we'd specify the `Pos` type sufficiently to be one of the %% valid #rctx record field names, however, a clean solution is not obvious. -type counter_updates_list() :: [{non_neg_integer(), pos_integer()}] | []. + +-type query_options() :: #{aggregation => group_by | sort_by | count_by, limit => pos_integer()}. +-type aggregation_key() :: tuple(). +-type query_result() :: #{aggregation_key() => non_neg_integer()}. diff --git a/src/couch_stats/src/csrt.erl b/src/couch_stats/src/csrt.erl index 123a3c0de..e288a1ad4 100644 --- a/src/couch_stats/src/csrt.erl +++ b/src/couch_stats/src/csrt.erl @@ -92,6 +92,10 @@ find_workers_by_pidref/1, group_by/2, group_by/3, + query/1, + query/2, + query_matcher/1, + query_matcher/2, sorted/1, sorted_by/1, sorted_by/2, @@ -146,8 +150,7 @@ call({sorted_by, [Key, Val, Agg]}) -> {node(), sorted_by(Key, Val, Agg)}; call({FunName, Args}) -> FunNameBin = atom_to_binary(FunName), ArityBin = integer_to_binary(length(Args)), - {error, <<"No such function '"/binary, FunNameBin/binary, "/"/binary, ArityBin/binary>>}. - + {error, <<"No such function '", FunNameBin/binary, "/", ArityBin/binary, "'">>}. %% %% PidRef operations @@ -482,6 +485,25 @@ pid_ref_attrs(AttrName) -> proc_window(AttrName, Num, Time) -> csrt_logger:proc_window(AttrName, Num, Time). +-spec query_matcher(MatcherName :: matcher_name()) -> {ok, [map()]} + | {error, any()}. +query_matcher(MatcherName) -> + csrt_query:query_matcher(MatcherName). + +-spec query_matcher(MatcherName :: matcher_name(), Limit :: pos_integer()) -> {ok, [map()]} + | {error, any()}.
+query_matcher(MatcherName, Limit) -> + csrt_query:query_matcher(MatcherName, Limit). + +-spec query(Keys :: string() | [string()]) -> {ok, query_result()} + | {error, any()}. +query(Keys) -> + csrt_query:query(Keys). + +-spec query(Keys :: string() | [string()], Options :: query_options()) -> {ok, query_result()} | {error, any()}. +query(Keys, Options) -> + csrt_query:query(Keys, Options). + sorted(Map) -> csrt_query:sorted(Map). diff --git a/src/couch_stats/src/csrt_httpd.erl b/src/couch_stats/src/csrt_httpd.erl index c02e6295c..e4eb13fd2 100644 --- a/src/couch_stats/src/csrt_httpd.erl +++ b/src/couch_stats/src/csrt_httpd.erl @@ -110,7 +110,7 @@ key_to_string(Key) when is_atom(Key) -> matching(MatcherName) -> case csrt_logger:get_matcher(binary_to_list(MatcherName)) of undefined -> - throw({bad_request, <<"unknown matcher '"/binary, MatcherName/binary, "'"/binary>>}); + throw({bad_request, <<"unknown matcher '", MatcherName/binary, "'">>}); Matcher -> Matcher end. @@ -124,7 +124,7 @@ matching(MatcherName) -> query_matcher(MatcherName, AggregationKey, CounterKey) -> case csrt_logger:get_matcher(binary_to_list(MatcherName)) of undefined -> - {error, <<"unknown matcher '"/binary, MatcherName/binary, "'"/binary>>}; + {error, <<"unknown matcher '", MatcherName/binary, "'">>}; Matcher -> csrt_query:query_matcher(Matcher, AggregationKey, CounterKey) end. @@ -146,7 +146,7 @@ to_key(<<"js_filter">>) -> js_filter; to_key(<<"js_filtered_docs">>) -> js_filtered_docs; to_key(<<"get_kv_node">>) -> get_kv_node; to_key(<<"get_kp_node">>) -> get_kp_node; -to_key(Other) when is_binary(Other) -> throw({bad_request, <<"Invalid key '"/binary, Other/binary, "'"/binary>>}). +to_key(Other) when is_binary(Other) -> throw({bad_request, <<"Invalid key '", Other/binary, "'">>}). -spec parse_key(Keys :: binary() | [binary()]) -> [rctx_field()] | throw({bad_request, Reason :: binary()}).
diff --git a/src/couch_stats/src/csrt_logger.erl b/src/couch_stats/src/csrt_logger.erl index 659524df7..b7d69356f 100644 --- a/src/couch_stats/src/csrt_logger.erl +++ b/src/couch_stats/src/csrt_logger.erl @@ -562,11 +562,6 @@ matcher_enabled(Name) when is_list(Name) -> -spec matcher_threshold(Name, Threshold) -> string() | integer() when Name :: string(), Threshold :: pos_integer() | string(). -matcher_threshold("dbname", DbName) when is_binary(DbName) -> - %% TODO: toggle Default to undefined to disallow for particular dbname - %% TODO: sort out list vs binary - %%config:get_integer(?CONF_MATCHERS_THRESHOLD, binary_to_list(DbName), Default); - DbName; matcher_threshold(Name, Default) when is_list(Name) andalso is_integer(Default) andalso Default > 0 -> diff --git a/src/couch_stats/src/csrt_query.erl b/src/couch_stats/src/csrt_query.erl index 4edf5365c..717b44de7 100644 --- a/src/couch_stats/src/csrt_query.erl +++ b/src/couch_stats/src/csrt_query.erl @@ -24,6 +24,7 @@ active_workers/0, active_workers/1, all/0, + count_by/1, count_by/2, find_by_nonce/1, find_by_pid/1, @@ -31,9 +32,15 @@ find_workers_by_pidref/1, group_by/3, group_by/4, - sorted_by/2, - sorted_by/3, - sorted_by/4 + query/1, + query/2, + query_matcher/1, + query_matcher/2, + query_matcher_rows/1, + query_matcher_rows/2, + sort_by/1, + sort_by/2, + sort_by/3 ]). %% @@ -112,9 +119,15 @@ field(#rctx{updated_at = Val}, updated_at) -> Val. curry_field(Field) -> fun(Ele) -> field(Ele, Field) end. +count_by(KeyFun) -> + count_by(all(), KeyFun). + count_by(Matcher, KeyFun) -> group_by(Matcher, KeyFun, fun(_) -> 1 end). +group_by(KeyFun, ValFun) -> + group_by(all(), KeyFun, ValFun). + group_by(Matcher, KeyFun, ValFun) -> group_by(Matcher, KeyFun, ValFun, fun erlang:'+'/2). @@ -219,15 +232,71 @@ topK(Results, K) -> TopK = maps:fold(fun update_topK/3, new_topK(K), Results), get_topK(TopK).
-%% eg: sorted_by([username, dbname, mfa], ioq_calls) -%% eg: sorted_by([dbname, mfa], doc_reads) -sorted_by(KeyFun) -> topK(count_by(KeyFun), 10). -sorted_by(KeyFun, ValFun) -> +%% eg: sort_by([username, dbname, type], ioq_calls) +%% eg: sort_by([dbname, type], doc_reads) +sort_by(KeyFun) -> + topK(count_by(KeyFun), 10). +sort_by(KeyFun, ValFun) -> {Result, Acc} = group_by(KeyFun, ValFun), {Result, topK(Acc, 10)}. -sorted_by(KeyFun, ValFun, AggFun) -> +sort_by(KeyFun, ValFun, AggFun) -> {Result, Acc} = group_by(KeyFun, ValFun, AggFun), {Result, topK(Acc, 10)}. to_json_list(List) when is_list(List) -> lists:map(fun csrt_util:to_json/1, List). + +-spec query_matcher(MatcherName :: string()) -> {ok, [map()]} + | {error, any()}. +query_matcher(MatcherName) when is_list(MatcherName) -> + query_matcher(MatcherName, query_limit()). + +-spec query_matcher(MatcherName :: matcher_name(), Limit :: pos_integer()) -> {ok, [map()]} + | {error, any()}. +query_matcher(MatcherName, Limit) when is_list(MatcherName) andalso is_integer(Limit) -> + case csrt_logger:get_matcher(MatcherName) of + undefined -> + {error, {unknown_matcher, MatcherName}}; + Matcher -> + query_matcher_rows(Matcher, Limit) + end. + +-spec query_matcher_rows(Matcher :: matcher()) -> {ok, [map()]} + | {error, any()}. +query_matcher_rows(Matcher) -> + query_matcher_rows(Matcher, query_limit()). + +-spec query_matcher_rows(Matcher :: matcher(), Limit :: pos_integer()) -> {ok, [map()]} + | {error, any()}. +query_matcher_rows({MSpec, _CompMSpec}, Limit) when is_list(MSpec) andalso is_integer(Limit) andalso Limit >= 1 -> + try + %% ets:select/* takes match_spec(), not comp_match_spec() + %% use ets:select/3 to constrain to Limit rows, but we need to handle + %% the continuation() style return type compared with ets:select/2.
+ Rctxs = case ets:select(?CSRT_ETS, MSpec, Limit) of + {Rctxs0, _Continuation} -> + Rctxs0; + %% Handle '$end_of_table' + '$end_of_table' -> + [] + end, + {ok, to_json_list(Rctxs)} + catch + _:_ = Error -> + {error, Error} + end. + +-spec query(Keys :: string() | [string()]) -> {ok, query_result()} + | {error, any()}. +%% #{{<<"adm">>,<<"bench-yktbb3as46rzffea">>} => 2} +query(Keys) -> + query(Keys, #{}). + +-spec query(Keys :: string() | [string()], Options :: query_options()) -> {ok, query_result()} + | {error, any()}. +%% #{{<<"adm">>,<<"bench-yktbb3as46rzffea">>} => 2} +query(_Keys, _Options) -> + {error, todo}. + +query_limit() -> + config:get_integer(?CSRT, "query_limit", ?QUERY_LIMIT). diff --git a/src/couch_stats/src/csrt_util.erl b/src/couch_stats/src/csrt_util.erl index 63782d4e7..c19c10d6f 100644 --- a/src/couch_stats/src/csrt_util.erl +++ b/src/couch_stats/src/csrt_util.erl @@ -32,6 +32,7 @@ convert_pidref/1, convert_pid/1, convert_ref/1, + convert_string/1, to_json/1 ]). @@ -168,6 +169,7 @@ convert_type(#coordinator{method = Verb0, path = Path, mod = M0, func = F0}) -> convert_type(#rpc_worker{mod = M0, func = F0, from = From0}) -> M = atom_to_binary(M0), F = atom_to_binary(F0), + %% Technically From is a PidRef data type from Pid, but different Ref for fabric From = convert_pidref(From0), <<"rpc_worker-{", From/binary, "}:", M/binary, ":", F/binary>>; convert_type(undefined) -> @@ -192,15 +194,23 @@ convert_pid(Pid) when is_pid(Pid) -> convert_ref(Ref) when is_reference(Ref) -> list_to_binary(ref_to_list(Ref)). +-spec convert_string(Str :: string() | binary() | undefined) -> binary() | null. +convert_string(undefined) -> + null; +convert_string(Str) when is_list(Str) -> + list_to_binary(Str); +convert_string(Bin) when is_binary(Bin) -> + Bin. + -spec to_json(Rctx :: rctx()) -> map().
to_json(#rctx{} = Rctx) -> #{ - updated_at => tutc(Rctx#rctx.updated_at), - started_at => tutc(Rctx#rctx.started_at), + updated_at => convert_string(tutc(Rctx#rctx.updated_at)), + started_at => convert_string(tutc(Rctx#rctx.started_at)), pid_ref => convert_pidref(Rctx#rctx.pid_ref), - nonce => Rctx#rctx.nonce, - dbname => Rctx#rctx.dbname, - username => Rctx#rctx.username, + nonce => convert_string(Rctx#rctx.nonce), + dbname => convert_string(Rctx#rctx.dbname), + username => convert_string(Rctx#rctx.username), db_open => Rctx#rctx.db_open, docs_read => Rctx#rctx.docs_read, docs_written => Rctx#rctx.docs_written, diff --git a/src/couch_stats/test/eunit/csrt_server_tests.erl b/src/couch_stats/test/eunit/csrt_server_tests.erl index c2241a5e6..04203b6aa 100644 --- a/src/couch_stats/test/eunit/csrt_server_tests.erl +++ b/src/couch_stats/test/eunit/csrt_server_tests.erl @@ -508,16 +508,20 @@ rctx_assert(Rctx, Asserts0) -> changes_returned => 0, js_filter => 0, js_filtered_docs => 0, - nonce => undefined, + nonce => null, db_open => 0, rows_read => 0, docs_read => 0, docs_written => 0, - pid_ref => undefined + pid_ref => null + }, + Updates = #{ + pid_ref => fun convert_pidref/1, + nonce => fun csrt_util:convert_string/1 }, Asserts = maps:merge( DefaultAsserts, - maps:update_with(pid_ref, fun convert_pidref/1, Asserts0) + maps:fold(fun(Key, Convert, Acc) when is_map_key(Key, Acc) -> maps:update_with(Key, Convert, Acc); (_Key, _Convert, Acc) -> Acc end, Asserts0, Updates) ), ok = maps:foreach( fun
