This is an automated email from the ASF dual-hosted git repository.
chewbranca pushed a commit to branch
couch-stats-resource-tracker-v3-rebaseb-otp26
in repository https://gitbox.apache.org/repos/asf/couchdb.git
The following commit(s) were added to
refs/heads/couch-stats-resource-tracker-v3-rebaseb-otp26 by this push:
new 91e942c92 Update HTTP API
91e942c92 is described below
commit 91e942c926c85386c9c771466144ca1f1f1928dd
Author: ILYA Khlopotov <[email protected]>
AuthorDate: Tue Jun 10 12:05:16 2025 -0700
Update HTTP API
Replace comparison with pattern match.
Add QUERY_CARDINALITY_LIMIT to group_by
Use sliding topK
Pass matcher to group_by and friends
Add RPC
Add HTTP
---
src/chttpd/src/chttpd_httpd_handlers.erl | 2 +-
.../src/couch_stats_resource_tracker.hrl | 1 +
src/couch_stats/src/csrt.erl | 50 +++++++
src/couch_stats/src/csrt_httpd.erl | 165 +++++++++++++++++++++
src/couch_stats/src/csrt_query.erl | 145 +++++++++++++-----
5 files changed, 328 insertions(+), 35 deletions(-)
diff --git a/src/chttpd/src/chttpd_httpd_handlers.erl b/src/chttpd/src/chttpd_httpd_handlers.erl
index e1b260222..9256f08b1 100644
--- a/src/chttpd/src/chttpd_httpd_handlers.erl
+++ b/src/chttpd/src/chttpd_httpd_handlers.erl
@@ -20,7 +20,7 @@ url_handler(<<"_utils">>) -> fun chttpd_misc:handle_utils_dir_req/1;
url_handler(<<"_all_dbs">>) -> fun chttpd_misc:handle_all_dbs_req/1;
url_handler(<<"_dbs_info">>) -> fun chttpd_misc:handle_dbs_info_req/1;
url_handler(<<"_active_tasks">>) -> fun chttpd_misc:handle_task_status_req/1;
-url_handler(<<"_active_resources">>) -> fun chttpd_misc:handle_resource_status_req/1;
+url_handler(<<"_active_resources">>) -> fun csrt_httpd:handle_resource_status_req/1;
url_handler(<<"_scheduler">>) -> fun couch_replicator_httpd:handle_scheduler_req/1;
url_handler(<<"_node">>) -> fun chttpd_node:handle_node_req/1;
url_handler(<<"_reload_query_servers">>) -> fun chttpd_misc:handle_reload_query_servers_req/1;
diff --git a/src/couch_stats/src/couch_stats_resource_tracker.hrl b/src/couch_stats/src/couch_stats_resource_tracker.hrl
index aec21115e..350438135 100644
--- a/src/couch_stats/src/couch_stats_resource_tracker.hrl
+++ b/src/couch_stats/src/couch_stats_resource_tracker.hrl
@@ -41,6 +41,7 @@
-define(CONF_MATCHERS_ENABLED, "csrt_logger.matchers_enabled").
-define(CONF_MATCHERS_THRESHOLD, "csrt_logger.matchers_threshold").
-define(CONF_MATCHERS_DBNAMES, "csrt_logger.dbnames_io").
+-define(QUERY_CARDINALITY_LIMIT, 10_000).
%% Mapping of couch_stat metric names to #rctx{} field names.
%% These are used for fields that we inc a counter on.
diff --git a/src/couch_stats/src/csrt.erl b/src/couch_stats/src/csrt.erl
index 603e444e9..123a3c0de 100644
--- a/src/couch_stats/src/csrt.erl
+++ b/src/couch_stats/src/csrt.erl
@@ -71,6 +71,12 @@
should_track_init_p/1
]).
+%% RPC api
+-export([
+ rpc/2,
+ call/1
+]).
+
%% aggregate query api
-export([
active/0,
@@ -99,6 +105,50 @@
proc_window/3
]).
+%%
+%% RPC operations
+%%
+
+-spec rpc(FName :: atom(), Args :: [any()]) ->
+    {[{node(), Result :: any()}], Errors :: [{badrpc, Reason :: any()}], BadNodes :: [node()]}.
+rpc(FName, Args) when is_atom(FName) andalso is_list(Args) ->
+ {Resp, BadNodes} = rpc:multicall(?MODULE, call, [{FName, Args}]),
+ {Results, Errors} = split_response(Resp),
+ {Results, lists:usort(Errors), BadNodes}.
+
+split_response(Resp) ->
+ lists:foldl(fun(Message, {Results, Errors}) ->
+ case Message of
+ {badrpc, _} = E ->
+ {Results, [E | Errors]};
+ Result ->
+ {[Result | Results], Errors}
+ end
+ end, {[], []}, Resp).
+
+call({active, []}) -> {node(), active()};
+call({active, [json]}) -> {node(), active(json)};
+call({active_coordinators, []}) -> {node(), active_coordinators()};
+call({active_coordinators, [json]}) -> {node(), active_coordinators(json)};
+call({active_workers, []}) -> {node(), active_workers()};
+call({active_workers, [json]}) -> {node(), active_workers(json)};
+call({count_by, [Key]}) -> {node(), count_by(Key)};
+call({find_by_nonce, [Nonce]}) -> {node(), find_by_nonce(Nonce)};
+call({find_by_pid, [Pid]}) -> {node(), find_by_pid(Pid)};
+call({find_by_pidref, [PidRef]}) -> {node(), find_by_pidref(PidRef)};
+call({find_workers_by_pidref, [PidRef]}) -> {node(), find_workers_by_pidref(PidRef)};
+call({group_by, [Key, Val]}) -> {node(), group_by(Key, Val)};
+call({group_by, [Key, Val, Agg]}) -> {node(), group_by(Key, Val, Agg)};
+call({sorted, [Map]}) -> {node(), sorted(Map)};
+call({sorted_by, [Key]}) -> {node(), sorted_by(Key)};
+call({sorted_by, [Key, Val]}) -> {node(), sorted_by(Key, Val)};
+call({sorted_by, [Key, Val, Agg]}) -> {node(), sorted_by(Key, Val, Agg)};
+call({FunName, Args}) ->
+ FunNameBin = atom_to_binary(FunName),
+ ArityBin = integer_to_binary(length(Args)),
+    {error, <<"No such function '"/binary, FunNameBin/binary, "/"/binary, ArityBin/binary>>}.
+
+
%%
%% PidRef operations
%%
diff --git a/src/couch_stats/src/csrt_httpd.erl b/src/couch_stats/src/csrt_httpd.erl
new file mode 100644
index 000000000..c02e6295c
--- /dev/null
+++ b/src/couch_stats/src/csrt_httpd.erl
@@ -0,0 +1,165 @@
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+% http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+-module(csrt_httpd).
+-include_lib("couch/include/couch_db.hrl").
+-include_lib("couch_stats_resource_tracker.hrl").
+
+-export([handle_resource_status_req/1]).
+
+-import(
+ chttpd,
+ [
+ send_json/2, send_json/3,
+ send_method_not_allowed/2
+ ]
+).
+
+rpc_to_json({Resp, _Errors, _Nodes}) ->
+    #{<<"results">> => resp_to_json(Resp, #{}), <<"errors">> => [], <<"bad_nodes">> => []}.
+
+resp_to_json([{N, R} | Rest], Acc) ->
+ resp_to_json(Rest, maps:put(atom_to_binary(N), R, Acc));
+resp_to_json([], Acc) ->
+ Acc.
+
+% handle_resource_status_req(#httpd{method = 'GET', path_parts = [<<"_active_resources">>]} = Req) ->
+% ok = chttpd:verify_is_server_admin(Req),
+% %% TODO: incorporate Bad responses
+% Resp = rpc_to_json(csrt:rpc(active, [json])),
+% send_json(Req, Resp);
+handle_resource_status_req(#httpd{method = 'POST', path_parts = [<<"_active_resources">>, <<"_match">>, MatcherName]} = Req) ->
+ chttpd:validate_ctype(Req, "application/json"),
+ {JsonProps} = chttpd:json_body_obj(Req),
+ GroupBy = couch_util:get_value(<<"group_by">>, JsonProps),
+ SortBy = couch_util:get_value(<<"sort_by">>, JsonProps),
+ CountBy = couch_util:get_value(<<"count_by">>, JsonProps),
+
+ case {GroupBy, SortBy, CountBy} of
+ {undefined, undefined, {Query}} ->
+ handle_count_by(Req, MatcherName, Query);
+ {undefined, {Query}, undefined} ->
+ handle_sort_by(Req, MatcherName, Query);
+ {{Query}, undefined, undefined} ->
+ handle_group_by(Req, MatcherName, Query);
+ {_, _, _} ->
+ throw({bad_request, <<"Multiple aggregations are not supported">>})
+ end;
+handle_resource_status_req(#httpd{path_parts = [<<"_active_resources">>]} = Req) ->
+ ok = chttpd:verify_is_server_admin(Req),
+ send_method_not_allowed(Req, "GET,HEAD");
+
+handle_resource_status_req(Req) ->
+ ok = chttpd:verify_is_server_admin(Req),
+ send_method_not_allowed(Req, "GET,HEAD,POST").
+
+handle_count_by(Req, MatcherName, CountBy) ->
+ AggregationKeys = couch_util:get_value(<<"aggregate_keys">>, CountBy),
+ AggregationKey = parse_key(AggregationKeys),
+ case csrt_query:count_by(matching(MatcherName), AggregationKey) of
+ {ok, Map} ->
+ send_json(Req, {aggregation_result_to_json(Map)});
+ Else ->
+ %% TODO handle error
+ throw({bad_request, Else})
+ end.
+
+handle_sort_by(Req, MatcherName, SortBy) ->
+ AggregationKeys = couch_util:get_value(<<"aggregate_keys">>, SortBy),
+ CounterKey = couch_util:get_value(<<"counter_key">>, SortBy),
+ AggregationKey = parse_key(AggregationKeys),
+ ValueKey = parse_key(CounterKey),
+ case csrt_query:sort_by(matching(MatcherName), AggregationKey, ValueKey) of
+ {ok, Map} ->
+ send_json(Req, {aggregation_result_to_json(Map)});
+ Else ->
+ %% TODO handle error
+ throw({bad_request, Else})
+ end.
+
+handle_group_by(Req, MatcherName, GroupBy) ->
+ AggregationKeys = couch_util:get_value(<<"aggregate_keys">>, GroupBy),
+ CounterKey = couch_util:get_value(<<"counter_key">>, GroupBy),
+ AggregationKey = parse_key(AggregationKeys),
+ ValueKey = parse_key(CounterKey),
+    case csrt_query:group_by(matching(MatcherName), AggregationKey, ValueKey) of
+ {ok, Map} ->
+ send_json(Req, {aggregation_result_to_json(Map)});
+ Else ->
+ %% TODO handle error
+ throw({bad_request, Else})
+ end.
+
+aggregation_result_to_json(Map) when is_map(Map) ->
+ maps:fold(fun(K, V, Acc) -> [{key_to_string(K), V} | Acc] end, [], Map).
+
+key_to_string(Key) when is_tuple(Key) ->
+    list_to_binary(string:join([atom_to_list(K) || K <- tuple_to_list(Key)], ","));
+key_to_string(Key) when is_atom(Key) ->
+ atom_to_binary(Key).
+
+matching(MatcherName) ->
+ case csrt_logger:get_matcher(binary_to_list(MatcherName)) of
+ undefined ->
+            throw({bad_request, <<"unknown matcher '"/binary, MatcherName/binary, "'"/binary>>});
+ Matcher ->
+ Matcher
+ end.
+
+% extract one of the predefined matchers
+% - docs_read
+% - rows_read
+% - docs_written
+% - worker_changes_processed
+% - ioq_calls
+query_matcher(MatcherName, AggregationKey, CounterKey) ->
+ case csrt_logger:get_matcher(binary_to_list(MatcherName)) of
+ undefined ->
+            {error, <<"unknown matcher '"/binary, MatcherName/binary, "'"/binary>>};
+ Matcher ->
+ csrt_query:query_matcher(Matcher, AggregationKey, CounterKey)
+ end.
+
+-spec to_key(BinKey :: binary() | string()) -> Key :: rctx_field()
+ | throw({bad_request, Reason :: binary()}).
+
+to_key(<<"pid_ref">>) -> pid_ref;
+to_key(<<"nonce">>) -> nonce;
+to_key(<<"type">>) -> type;
+to_key(<<"dbname">>) -> dbname;
+to_key(<<"username">>) -> username;
+to_key(<<"db_open">>) -> db_open;
+to_key(<<"docs_read">>) -> docs_read;
+to_key(<<"rows_read">>) -> rows_read;
+to_key(<<"changes_returned">>) -> changes_returned;
+to_key(<<"ioq_calls">>) -> ioq_calls;
+to_key(<<"js_filter">>) -> js_filter;
+to_key(<<"js_filtered_docs">>) -> js_filtered_docs;
+to_key(<<"get_kv_node">>) -> get_kv_node;
+to_key(<<"get_kp_node">>) -> get_kp_node;
+to_key(Other) when is_binary(Other) -> throw({bad_request, <<"Invalid key '"/binary, Other/binary, "'"/binary>>}).
+
+-spec parse_key(Keys :: binary() | [binary()]) -> [rctx_field()]
+ | throw({bad_request, Reason :: binary()}).
+
+parse_key(Keys) when is_list(Keys) ->
+ parse_key(Keys, []);
+parse_key(BinKey) when is_binary(BinKey) ->
+ to_key(BinKey);
+parse_key(undefined) ->
+ undefined.
+
+parse_key([BinKey | Rest], Keys) ->
+ parse_key(Rest, [to_key(BinKey) | Keys]);
+parse_key([], Keys) ->
+ lists:reverse(Keys).
+
diff --git a/src/couch_stats/src/csrt_query.erl b/src/couch_stats/src/csrt_query.erl
index 5c1f7cd7a..4edf5365c 100644
--- a/src/couch_stats/src/csrt_query.erl
+++ b/src/couch_stats/src/csrt_query.erl
@@ -23,17 +23,17 @@
active_coordinators/1,
active_workers/0,
active_workers/1,
- count_by/1,
+ all/0,
+ count_by/2,
find_by_nonce/1,
find_by_pid/1,
find_by_pidref/1,
find_workers_by_pidref/1,
- group_by/2,
group_by/3,
- sorted/1,
- sorted_by/1,
+ group_by/4,
sorted_by/2,
- sorted_by/3
+ sorted_by/3,
+ sorted_by/4
]).
%%
@@ -112,45 +112,122 @@ field(#rctx{updated_at = Val}, updated_at) -> Val.
curry_field(Field) ->
fun(Ele) -> field(Ele, Field) end.
-count_by(KeyFun) ->
- group_by(KeyFun, fun(_) -> 1 end).
+count_by(Matcher, KeyFun) ->
+ group_by(Matcher, KeyFun, fun(_) -> 1 end).
-group_by(KeyFun, ValFun) ->
- group_by(KeyFun, ValFun, fun erlang:'+'/2).
+group_by(Matcher, KeyFun, ValFun) ->
+ group_by(Matcher, KeyFun, ValFun, fun erlang:'+'/2).
-group_by(KeyL, ValFun, AggFun) when is_list(KeyL) ->
+group_by(Matcher, KeyFun, ValFun, AggFun) ->
+ group_by(Matcher, KeyFun, ValFun, AggFun, ?QUERY_CARDINALITY_LIMIT).
+
+-spec all() -> matcher().
+
+all() ->
+ Spec = ets:fun2ms(fun(#rctx{} = R) -> R end),
+ {Spec, ets:match_spec_compile(Spec)}.
+
+%% eg: group_by(all(), mfa, docs_read).
+%% eg: group_by(all(), fun(#rctx{mfa=MFA,docs_read=DR}) -> {MFA, DR} end, ioq_calls).
+%% eg: ^^ or: group_by(all(), [mfa, docs_read], ioq_calls).
+%% eg: group_by(all(), [username, dbname, mfa], docs_read).
+%% eg: group_by(all(), [username, dbname, mfa], ioq_calls).
+%% eg: group_by(all(), [username, dbname, mfa], js_filters).
+group_by(Matcher, KeyL, ValFun, AggFun, Limit) when is_list(KeyL) ->
KeyFun = fun(Ele) -> list_to_tuple([field(Ele, Key) || Key <- KeyL]) end,
- group_by(KeyFun, ValFun, AggFun);
-group_by(Key, ValFun, AggFun) when is_atom(Key) ->
- group_by(curry_field(Key), ValFun, AggFun);
-group_by(KeyFun, Val, AggFun) when is_atom(Val) ->
- group_by(KeyFun, curry_field(Val), AggFun);
-group_by(KeyFun, ValFun, AggFun) ->
+ group_by(Matcher, KeyFun, ValFun, AggFun, Limit);
+group_by(Matcher, Key, ValFun, AggFun, Limit) when is_atom(Key) ->
+ group_by(Matcher, curry_field(Key), ValFun, AggFun, Limit);
+group_by(Matcher, KeyFun, Val, AggFun, Limit) when is_atom(Val) ->
+ group_by(Matcher, KeyFun, curry_field(Val), AggFun, Limit);
+group_by(Matcher, KeyFun, ValFun, AggFun, Limit) ->
FoldFun = fun(Ele, Acc) ->
- Key = KeyFun(Ele),
- Val = ValFun(Ele),
- CurrVal = maps:get(Key, Acc, 0),
- NewVal = AggFun(CurrVal, Val),
- %% TODO: should we skip here? how to make this optional?
- case NewVal > 0 of
+ case maps:size(Acc) =< Limit of
true ->
- maps:put(Key, NewVal, Acc);
+ case maybe_match(Ele, Matcher) of
+ true ->
+ Key = KeyFun(Ele),
+ Val = ValFun(Ele),
+ CurrVal = maps:get(Key, Acc, 0),
+ case AggFun(CurrVal, Val) of
+ 0 ->
+ Acc;
+ NewVal ->
+ maps:put(Key, NewVal, Acc)
+ end;
+ false ->
+ Acc
+ end;
false ->
- Acc
- end
+ throw({limit, Acc})
+ end
end,
- ets:foldl(FoldFun, #{}, ?CSRT_ETS).
+ try
+ {ok, ets:foldl(FoldFun, #{}, ?CSRT_ETS)}
+ catch throw:{limit, Acc} ->
+ {limit, Acc}
+ end.
-%% Sorts largest first
-sorted(Map) when is_map(Map) ->
- lists:sort(fun({_K1, A}, {_K2, B}) -> B < A end, maps:to_list(Map)).
+maybe_match(_Ele, undefined) ->
+ true;
+maybe_match(Ele, {_, MS}) ->
+ ets:match_spec_run([Ele], MS) =/= [].
-shortened(L) ->
- lists:sublist(L, 10).
+%%
+%% Auxiliary functions to calculate topK
+%%
-sorted_by(KeyFun) -> shortened(sorted(count_by(KeyFun))).
-sorted_by(KeyFun, ValFun) -> shortened(sorted(group_by(KeyFun, ValFun))).
-sorted_by(KeyFun, ValFun, AggFun) -> shortened(sorted(group_by(KeyFun, ValFun, AggFun))).
+-record(topK, {
+ % we store ordered elements in ascending order
+ seq = [] :: list(pos_integer()),
+ % we rely on erlang sorting order where `number < atom`
+ min = infinite :: infinite | pos_integer(),
+ max = 0 :: pos_integer(),
+ size = 0 :: non_neg_integer(),
+ % capacity cannot be less than 1
+ capacity = 1 :: pos_integer()
+}).
+
+new_topK(K) when K >= 1 ->
+ #topK{capacity = K}.
+
+% when we are at capacity
+% don't bother adding the value since it is less than what we already saw
+update_topK(_Key, Value, #topK{size = S, capacity = S, min = Min} = Top) when Value < Min ->
+    Top#topK{min = Value};
+% when we are at capacity evict smallest value
+update_topK(Key, Value, #topK{size = S, capacity = S, max = Max, seq = Seq} = Top) when Value > Max ->
+    % capacity cannot be less than 1, so we can avoid handling the case when Seq is empty
+    [_ | Truncated] = Seq,
+    Top#topK{max = Value, seq = lists:keysort(2, [{Key, Value} | Truncated])};
+% when we are at capacity and value is in between min and max evict smallest value
+update_topK(Key, Value, #topK{size = S, capacity = S, seq = Seq} = Top) ->
+    % capacity cannot be less than 1, so we can avoid handling the case when Seq is empty
+    [_ | Truncated] = Seq,
+    Top#topK{seq = lists:keysort(2, [{Key, Value} | Truncated])};
+update_topK(Key, Value, #topK{size = S, min = Min, seq = Seq} = Top) when Value < Min ->
+    Top#topK{size = S + 1, min = Value, seq = lists:keysort(2, [{Key, Value} | Seq])};
+update_topK(Key, Value, #topK{size = S, max = Max, seq = Seq} = Top) when Value > Max ->
+    Top#topK{size = S + 1, max = Value, seq = lists:keysort(2, [{Key, Value} | Seq])};
+update_topK(Key, Value, #topK{size = S, seq = Seq} = Top) ->
+    Top#topK{size = S + 1, seq = lists:keysort(2, [{Key, Value} | Seq])}.
+update_topK(Key, Value, #topK{size = S, seq = Seq} = Top) ->
+ Top#topK{size = S + 1, seq = lists:keysort(2, [{Key, Value} | Seq])}.
+
+get_topK(#topK{seq = S}) ->
+ lists:reverse(S).
+
+topK(Results, K) ->
+ TopK = maps:fold(fun update_topK/3, new_topK(K), Results),
+ get_topK(TopK).
+
+%% eg: sorted_by([username, dbname, mfa], ioq_calls)
+%% eg: sorted_by([dbname, mfa], doc_reads)
+sorted_by(KeyFun) -> topK(count_by(KeyFun), 10).
+sorted_by(KeyFun, ValFun) ->
+ {Result, Acc} = group_by(KeyFun, ValFun),
+ {Result, topK(Acc, 10)}.
+sorted_by(KeyFun, ValFun, AggFun) ->
+ {Result, Acc} = group_by(KeyFun, ValFun, AggFun),
+ {Result, topK(Acc, 10)}.
to_json_list(List) when is_list(List) ->
lists:map(fun csrt_util:to_json/1, List).