This is an automated email from the ASF dual-hosted git repository. chewbranca pushed a commit to branch couch-stats-resource-tracker-v2 in repository https://gitbox.apache.org/repos/asf/couchdb.git
commit 9ef7c4be04e15cbd108cbb39e315e6f69702d3e0 Author: Russell Branca <[email protected]> AuthorDate: Fri Jun 7 15:46:17 2024 -0700 Port in csrt changes --- src/chttpd/src/chttpd.erl | 5 ++ src/chttpd/src/chttpd_db.erl | 2 + src/chttpd/src/chttpd_httpd_handlers.erl | 1 + src/chttpd/src/chttpd_misc.erl | 105 +++++++++++++++++++++++ src/couch/include/couch_db.hrl | 2 + src/couch/priv/stats_descriptions.cfg | 33 +++++++ src/couch/src/couch_btree.erl | 3 + src/couch/src/couch_db.erl | 2 + src/couch/src/couch_query_servers.erl | 8 ++ src/couch/src/couch_server.erl | 2 + src/couch_stats/src/couch_stats.app.src | 8 +- src/couch_stats/src/couch_stats.erl | 30 +++++++ src/couch_stats/src/couch_stats_sup.erl | 1 + src/fabric/priv/stats_descriptions.cfg | 50 +++++++++++ src/fabric/src/fabric_rpc.erl | 15 ++++ src/fabric/src/fabric_util.erl | 4 + src/fabric/test/eunit/fabric_rpc_purge_tests.erl | 2 + src/fabric/test/eunit/fabric_rpc_tests.erl | 11 ++- src/mango/src/mango_cursor_view.erl | 2 + src/mango/src/mango_selector.erl | 1 + src/mem3/src/mem3_rpc.erl | 34 +++++--- src/rexi/include/rexi.hrl | 1 + src/rexi/src/rexi.erl | 18 +++- src/rexi/src/rexi_monitor.erl | 1 + src/rexi/src/rexi_server.erl | 26 ++++-- src/rexi/src/rexi_utils.erl | 34 +++++++- 26 files changed, 377 insertions(+), 24 deletions(-) diff --git a/src/chttpd/src/chttpd.erl b/src/chttpd/src/chttpd.erl index 0c1380862..9642b2089 100644 --- a/src/chttpd/src/chttpd.erl +++ b/src/chttpd/src/chttpd.erl @@ -323,6 +323,9 @@ handle_request_int(MochiReq) -> % Save client socket so that it can be monitored for disconnects chttpd_util:mochiweb_client_req_set(MochiReq), + %% This is probably better in before_request, but having Path is nice + couch_stats_resource_tracker:create_coordinator_context(HttpReq0, Path), + {HttpReq2, Response} = case before_request(HttpReq0) of {ok, HttpReq1} -> @@ -372,6 +375,7 @@ after_request(HttpReq, HttpResp0) -> HttpResp2 = update_stats(HttpReq, HttpResp1), chttpd_stats:report(HttpReq, 
HttpResp2), maybe_log(HttpReq, HttpResp2), + couch_stats_resource_tracker:destroy_context(), HttpResp2. process_request(#httpd{mochi_req = MochiReq} = HttpReq) -> @@ -409,6 +413,7 @@ handle_req_after_auth(HandlerKey, HttpReq) -> HandlerKey, fun chttpd_db:handle_request/1 ), + couch_stats_resource_tracker:set_context_handler_fun(HandlerFun), AuthorizedReq = chttpd_auth:authorize( possibly_hack(HttpReq), fun chttpd_auth_request:authorize_request/1 diff --git a/src/chttpd/src/chttpd_db.erl b/src/chttpd/src/chttpd_db.erl index 062e0bf24..8a816abde 100644 --- a/src/chttpd/src/chttpd_db.erl +++ b/src/chttpd/src/chttpd_db.erl @@ -83,6 +83,7 @@ % Database request handlers handle_request(#httpd{path_parts = [DbName | RestParts], method = Method} = Req) -> + couch_stats_resource_tracker:set_context_dbname(DbName), case {Method, RestParts} of {'PUT', []} -> create_db_req(Req, DbName); @@ -103,6 +104,7 @@ handle_request(#httpd{path_parts = [DbName | RestParts], method = Method} = Req) do_db_req(Req, fun db_req/2); {_, [SecondPart | _]} -> Handler = chttpd_handlers:db_handler(SecondPart, fun db_req/2), + couch_stats_resource_tracker:set_context_handler_fun(Handler), do_db_req(Req, Handler) end. 
diff --git a/src/chttpd/src/chttpd_httpd_handlers.erl b/src/chttpd/src/chttpd_httpd_handlers.erl index 932b52e5f..e1b260222 100644 --- a/src/chttpd/src/chttpd_httpd_handlers.erl +++ b/src/chttpd/src/chttpd_httpd_handlers.erl @@ -20,6 +20,7 @@ url_handler(<<"_utils">>) -> fun chttpd_misc:handle_utils_dir_req/1; url_handler(<<"_all_dbs">>) -> fun chttpd_misc:handle_all_dbs_req/1; url_handler(<<"_dbs_info">>) -> fun chttpd_misc:handle_dbs_info_req/1; url_handler(<<"_active_tasks">>) -> fun chttpd_misc:handle_task_status_req/1; +url_handler(<<"_active_resources">>) -> fun chttpd_misc:handle_resource_status_req/1; url_handler(<<"_scheduler">>) -> fun couch_replicator_httpd:handle_scheduler_req/1; url_handler(<<"_node">>) -> fun chttpd_node:handle_node_req/1; url_handler(<<"_reload_query_servers">>) -> fun chttpd_misc:handle_reload_query_servers_req/1; diff --git a/src/chttpd/src/chttpd_misc.erl b/src/chttpd/src/chttpd_misc.erl index 4b7c73b35..d35df68e5 100644 --- a/src/chttpd/src/chttpd_misc.erl +++ b/src/chttpd/src/chttpd_misc.erl @@ -20,6 +20,7 @@ handle_replicate_req/1, handle_reload_query_servers_req/1, handle_task_status_req/1, + handle_resource_status_req/1, handle_up_req/1, handle_utils_dir_req/1, handle_utils_dir_req/2, @@ -224,6 +225,110 @@ handle_task_status_req(#httpd{method = 'GET'} = Req) -> handle_task_status_req(Req) -> send_method_not_allowed(Req, "GET,HEAD"). 
+handle_resource_status_req(#httpd{method = 'POST'} = Req) -> + ok = chttpd:verify_is_server_admin(Req), + chttpd:validate_ctype(Req, "application/json"), + {Props} = chttpd:json_body_obj(Req), + Action = proplists:get_value(<<"action">>, Props), + Key = proplists:get_value(<<"key">>, Props), + Val = proplists:get_value(<<"val">>, Props), + + CountBy = fun couch_stats_resource_tracker:count_by/1, + GroupBy = fun couch_stats_resource_tracker:group_by/2, + SortedBy1 = fun couch_stats_resource_tracker:sorted_by/1, + SortedBy2 = fun couch_stats_resource_tracker:sorted_by/2, + ConvertEle = fun(K) -> list_to_existing_atom(binary_to_list(K)) end, + ConvertList = fun(L) -> [ConvertEle(E) || E <- L] end, + ToJson = fun couch_stats_resource_tracker:term_to_flat_json/1, + JsonKeys = fun(PL) -> [[ToJson(K), V] || {K, V} <- PL] end, + + Fun = case {Action, Key, Val} of + {<<"count_by">>, Keys, undefined} when is_list(Keys) -> + Keys1 = [ConvertEle(K) || K <- Keys], + fun() -> CountBy(Keys1) end; + {<<"count_by">>, Key, undefined} -> + Key1 = ConvertEle(Key), + fun() -> CountBy(Key1) end; + {<<"group_by">>, Keys, Vals} when is_list(Keys) andalso is_list(Vals) -> + Keys1 = ConvertList(Keys), + Vals1 = ConvertList(Vals), + fun() -> GroupBy(Keys1, Vals1) end; + {<<"group_by">>, Key, Vals} when is_list(Vals) -> + Key1 = ConvertEle(Key), + Vals1 = ConvertList(Vals), + fun() -> GroupBy(Key1, Vals1) end; + {<<"group_by">>, Keys, Val} when is_list(Keys) -> + Keys1 = ConvertList(Keys), + Val1 = ConvertEle(Val), + fun() -> GroupBy(Keys1, Val1) end; + {<<"group_by">>, Key, Val} -> + Key1 = ConvertEle(Key), + Val1 = ConvertEle(Val), %% scalar Val here; list Vals matched a clause above + fun() -> GroupBy(Key1, Val1) end; + + {<<"sorted_by">>, Keys, undefined} when is_list(Keys) -> %% must precede the Key clause below: Key is already bound, so that clause matches any term + Keys1 = [ConvertEle(K) || K <- Keys], + fun() -> JsonKeys(SortedBy1(Keys1)) end; + {<<"sorted_by">>, Key, undefined} -> + Key1 = ConvertEle(Key), + fun() -> JsonKeys(SortedBy1(Key1)) end; + {<<"sorted_by">>, Keys, Vals} when is_list(Keys) andalso is_list(Vals) ->
+ Keys1 = ConvertList(Keys), + Vals1 = ConvertList(Vals), + fun() -> JsonKeys(SortedBy2(Keys1, Vals1)) end; + {<<"sorted_by">>, Key, Vals} when is_list(Vals) -> + Key1 = ConvertEle(Key), + Vals1 = ConvertList(Vals), + fun() -> JsonKeys(SortedBy2(Key1, Vals1)) end; + {<<"sorted_by">>, Keys, Val} when is_list(Keys) -> + Keys1 = ConvertList(Keys), + Val1 = ConvertEle(Val), + fun() -> JsonKeys(SortedBy2(Keys1, Val1)) end; + {<<"sorted_by">>, Key, Val} -> + Key1 = ConvertEle(Key), + Val1 = ConvertEle(Val), %% scalar Val here; list Vals matched a clause above + fun() -> JsonKeys(SortedBy2(Key1, Val1)) end; + _ -> + throw({badrequest, invalid_resource_request}) + end, + + Fun1 = fun() -> + case Fun() of + Map when is_map(Map) -> + {maps:fold( + fun + (_K,0,A) -> A; %% TODO: Skip 0 value entries? + (K,V,A) -> [{ToJson(K), V} | A] + end, + [], Map)}; + List when is_list(List) -> + List + end + end, + + {Resp, _Bad} = rpc:multicall(erlang, apply, [ + fun() -> + {node(), Fun1()} + end, + [] + ]), + %%io:format("{CSRT}***** GOT RESP: ~p~n", [Resp]), + send_json(Req, {Resp}); +handle_resource_status_req(#httpd{method = 'GET'} = Req) -> + ok = chttpd:verify_is_server_admin(Req), + {Resp, _Bad} = rpc:multicall(erlang, apply, [ + fun() -> + {node(), couch_stats_resource_tracker:active()} + end, + [] + ]), + %% TODO: incorporate Bad responses + send_json(Req, {Resp}); +handle_resource_status_req(Req) -> + ok = chttpd:verify_is_server_admin(Req), + send_method_not_allowed(Req, "GET,HEAD,POST"). + + handle_replicate_req(#httpd{method = 'POST', user_ctx = Ctx, req_body = PostBody} = Req) -> chttpd:validate_ctype(Req, "application/json"), %% see HACK in chttpd.erl about replication diff --git a/src/couch/include/couch_db.hrl b/src/couch/include/couch_db.hrl index 9c1df21b6..ba6bd38ca 100644 --- a/src/couch/include/couch_db.hrl +++ b/src/couch/include/couch_db.hrl @@ -53,6 +53,8 @@ -define(INTERACTIVE_EDIT, interactive_edit). -define(REPLICATED_CHANGES, replicated_changes).
+-define(LOG_UNEXPECTED_MSG(Msg), couch_log:warning("[~p:~p:~p/~p]{~p[~p]} Unexpected message: ~w", [?MODULE, ?LINE, ?FUNCTION_NAME, ?FUNCTION_ARITY, self(), element(2, process_info(self(), message_queue_len)), Msg])). + -type branch() :: {Key::term(), Value::term(), Tree::term()}. -type path() :: {Start::pos_integer(), branch()}. -type update_type() :: replicated_changes | interactive_edit. diff --git a/src/couch/priv/stats_descriptions.cfg b/src/couch/priv/stats_descriptions.cfg index 6a7120f87..33c556521 100644 --- a/src/couch/priv/stats_descriptions.cfg +++ b/src/couch/priv/stats_descriptions.cfg @@ -306,6 +306,10 @@ {type, counter}, {desc, <<"number of couch_server LRU operations skipped">>} ]}. +{[couchdb, couch_server, open], [ + {type, counter}, + {desc, <<"number of couch_server open operations invoked">>} +]}. {[couchdb, query_server, vdu_rejects], [ {type, counter}, {desc, <<"number of rejections by validate_doc_update function">>} @@ -418,10 +422,39 @@ {type, counter}, {desc, <<"number of other requests">>} ]}. +{[couchdb, query_server, js_filter], [ + {type, counter}, + {desc, <<"number of JS filter invocations">>} +]}. +{[couchdb, query_server, js_filtered_docs], [ + {type, counter}, + {desc, <<"number of docs filtered through JS invocations">>} +]}. +{[couchdb, query_server, js_filter_error], [ + {type, counter}, + {desc, <<"number of JS filter invocation errors">>} +]}. {[couchdb, legacy_checksums], [ {type, counter}, {desc, <<"number of legacy checksums found in couch_file instances">>} ]}. +{[couchdb, btree, folds], [ + {type, counter}, + {desc, <<"number of couch btree kv fold callback invocations">>} +]}. +{[couchdb, btree, kp_node], [ + {type, counter}, + {desc, <<"number of couch btree kp_nodes read">>} +]}. +{[couchdb, btree, kv_node], [ + {type, counter}, + {desc, <<"number of couch btree kv_nodes read">>} +]}. 
+%% CSRT (couch_stats_resource_tracker) stats +{[couchdb, csrt, delta_missing_t0], [ + {type, counter}, + {desc, <<"number of csrt contexts without a proper startime">>} +]}. {[pread, exceed_eof], [ {type, counter}, {desc, <<"number of the attempts to read beyond end of db file">>} diff --git a/src/couch/src/couch_btree.erl b/src/couch/src/couch_btree.erl index b974a22ee..27b5bc18b 100644 --- a/src/couch/src/couch_btree.erl +++ b/src/couch/src/couch_btree.erl @@ -472,6 +472,8 @@ reduce_tree_size(kp_node, NodeSize, [{_K, {_P, _Red, Sz}} | NodeList]) -> get_node(#btree{fd = Fd}, NodePos) -> {ok, {NodeType, NodeList}} = couch_file:pread_term(Fd, NodePos), + %% TODO: wire in csrt tracking + couch_stats:increment_counter([couchdb, btree, NodeType]), {NodeType, NodeList}. write_node(#btree{fd = Fd, compression = Comp} = Bt, NodeType, NodeList) -> @@ -1163,6 +1165,7 @@ stream_kv_node2(Bt, Reds, PrevKVs, [{K, V} | RestKVs], InRange, Dir, Fun, Acc) - false -> {stop, {PrevKVs, Reds}, Acc}; true -> + couch_stats:increment_counter([couchdb, btree, folds]), AssembledKV = assemble(Bt, K, V), case Fun(visit, AssembledKV, {PrevKVs, Reds}, Acc) of {ok, Acc2} -> diff --git a/src/couch/src/couch_db.erl b/src/couch/src/couch_db.erl index 2ef89ced3..c7afaa4b3 100644 --- a/src/couch/src/couch_db.erl +++ b/src/couch/src/couch_db.erl @@ -297,6 +297,7 @@ open_doc(Db, IdOrDocInfo) -> open_doc(Db, IdOrDocInfo, []). 
open_doc(Db, Id, Options) -> + %% TODO: wire in csrt tracking increment_stat(Db, [couchdb, database_reads]), case open_doc_int(Db, Id, Options) of {ok, #doc{deleted = true} = Doc} -> @@ -1982,6 +1983,7 @@ increment_stat(#db{options = Options}, Stat, Count) when -> case lists:member(sys_db, Options) of true -> + %% TODO: we shouldn't leak resource usage just because it's a sys_db ok; false -> couch_stats:increment_counter(Stat, Count) diff --git a/src/couch/src/couch_query_servers.erl b/src/couch/src/couch_query_servers.erl index 6789bfaef..b5ddee312 100644 --- a/src/couch/src/couch_query_servers.erl +++ b/src/couch/src/couch_query_servers.erl @@ -542,6 +542,8 @@ filter_docs(Req, Db, DDoc, FName, Docs) -> {ok, filter_docs_int(Db, DDoc, FName, JsonReq, JsonDocs)} catch throw:{os_process_error, {exit_status, 1}} -> + %% TODO: wire in csrt tracking + couch_stats:increment_counter([couchdb, query_server, js_filter_error]), %% batch used too much memory, retry sequentially. Fun = fun(JsonDoc) -> filter_docs_int(Db, DDoc, FName, JsonReq, [JsonDoc]) @@ -550,6 +552,12 @@ filter_docs(Req, Db, DDoc, FName, Docs) -> end. filter_docs_int(Db, DDoc, FName, JsonReq, JsonDocs) -> + %% Count usage in _int version as this can be repeated for OS error + %% Pros & cons... might not have actually processed `length(JsonDocs)` docs + %% but it certainly undercounts if we count in `filter_docs/5` above + %% TODO: wire in csrt tracking + couch_stats:increment_counter([couchdb, query_server, js_filter]), + couch_stats:increment_counter([couchdb, query_server, js_filtered_docs], length(JsonDocs)), [true, Passes] = ddoc_prompt( Db, DDoc, diff --git a/src/couch/src/couch_server.erl b/src/couch/src/couch_server.erl index 379724953..576938177 100644 --- a/src/couch/src/couch_server.erl +++ b/src/couch/src/couch_server.erl @@ -108,6 +108,8 @@ sup_start_link(N) -> gen_server:start_link({local, couch_server(N)}, couch_server, [N], []). 
open(DbName, Options) -> + %% TODO: wire in csrt tracking + couch_stats:increment_counter([couchdb, couch_server, open]), try validate_open_or_create(DbName, Options), open_int(DbName, Options) diff --git a/src/couch_stats/src/couch_stats.app.src b/src/couch_stats/src/couch_stats.app.src index a54fac734..de9f00e4e 100644 --- a/src/couch_stats/src/couch_stats.app.src +++ b/src/couch_stats/src/couch_stats.app.src @@ -13,8 +13,12 @@ {application, couch_stats, [ {description, "Simple statistics collection"}, {vsn, git}, - {registered, [couch_stats_aggregator, couch_stats_process_tracker]}, - {applications, [kernel, stdlib]}, + {registered, [ + couch_stats_aggregator, + couch_stats_process_tracker, + couch_stats_resource_tracker + ]}, + {applications, [kernel, stdlib, couch_log]}, {mod, {couch_stats_app, []}}, {env, []} ]}. diff --git a/src/couch_stats/src/couch_stats.erl b/src/couch_stats/src/couch_stats.erl index 29a402449..29190e6b0 100644 --- a/src/couch_stats/src/couch_stats.erl +++ b/src/couch_stats/src/couch_stats.erl @@ -24,6 +24,12 @@ update_gauge/2 ]). +%% couch_stats_resource_tracker API +-export([ + create_context/3, + maybe_track_rexi_init_p/1 +]). + -type response() :: ok | {error, unknown_metric} | {error, invalid_metric}. -type stat() :: {any(), [{atom(), any()}]}. @@ -49,6 +55,11 @@ increment_counter(Name) -> -spec increment_counter(any(), pos_integer()) -> response(). increment_counter(Name, Value) -> + %% Should maybe_track_local happen before or after notify? + %% If after, only currently tracked metrics declared in the app's + %% stats_description.cfg will be trackable locally. Pros/cons. + %io:format("NOTIFY_EXISTING_METRIC: ~p || ~p || ~p~n", [Name, Op, Type]), + ok = maybe_track_local_counter(Name, Value), case couch_stats_util:get_counter(Name, stats()) of {ok, Ctx} -> couch_stats_counter:increment(Ctx, Value); {error, Error} -> {error, Error} @@ -100,6 +111,25 @@ stats() -> now_sec() -> erlang:monotonic_time(second). 
+%% Only potentially track positive increments to counters +-spec maybe_track_local_counter(any(), any()) -> ok. +maybe_track_local_counter(Name, Val) when is_integer(Val) andalso Val > 0 -> + %%io:format("maybe_track_local[~p]: ~p~n", [Val, Name]), + couch_stats_resource_tracker:maybe_inc(Name, Val), + ok; +maybe_track_local_counter(_, _) -> + ok. + +create_context(From, MFA, Nonce) -> + couch_stats_resource_tracker:create_context(From, MFA, Nonce). + +maybe_track_rexi_init_p({M, F, _A}) -> + Metric = [M, F, spawned], + case couch_stats_resource_tracker:should_track(Metric) of + true -> increment_counter(Metric); + false -> ok + end. + -ifdef(TEST). -include_lib("couch/include/couch_eunit.hrl"). diff --git a/src/couch_stats/src/couch_stats_sup.erl b/src/couch_stats/src/couch_stats_sup.erl index 325372c3e..4b4df17e2 100644 --- a/src/couch_stats/src/couch_stats_sup.erl +++ b/src/couch_stats/src/couch_stats_sup.erl @@ -29,6 +29,7 @@ init([]) -> { {one_for_one, 5, 10}, [ ?CHILD(couch_stats_server, worker), + ?CHILD(couch_stats_resource_tracker, worker), ?CHILD(couch_stats_process_tracker, worker) ] }}. diff --git a/src/fabric/priv/stats_descriptions.cfg b/src/fabric/priv/stats_descriptions.cfg index d12aa0c84..9ab054bf0 100644 --- a/src/fabric/priv/stats_descriptions.cfg +++ b/src/fabric/priv/stats_descriptions.cfg @@ -26,3 +26,53 @@ {type, counter}, {desc, <<"number of write quorum errors">>} ]}. + + +%% fabric_rpc worker stats +%% TODO: decide on which naming scheme: +%% {[fabric_rpc, get_all_security, spawned], [ +%% {[fabric_rpc, spawned, get_all_security], [ +{[fabric_rpc, get_all_security, spawned], [ + {type, counter}, + {desc, <<"number of fabric_rpc worker get_all_security spawns">>} +]}. +{[fabric_rpc, open_doc, spawned], [ + {type, counter}, + {desc, <<"number of fabric_rpc worker open_doc spawns">>} +]}. +{[fabric_rpc, all_docs, spawned], [ + {type, counter}, + {desc, <<"number of fabric_rpc worker all_docs spawns">>} +]}. 
+{[fabric_rpc, update_docs, spawned], [ + {type, counter}, + {desc, <<"number of fabric_rpc worker update_docs spawns">>} +]}. +{[fabric_rpc, map_view, spawned], [ + {type, counter}, + {desc, <<"number of fabric_rpc worker map_view spawns">>} +]}. +{[fabric_rpc, reduce_view, spawned], [ + {type, counter}, + {desc, <<"number of fabric_rpc worker reduce_view spawns">>} +]}. +{[fabric_rpc, open_shard, spawned], [ + {type, counter}, + {desc, <<"number of fabric_rpc worker open_shard spawns">>} +]}. +{[fabric_rpc, changes, spawned], [ + {type, counter}, + {desc, <<"number of fabric_rpc worker changes spawns">>} +]}. +{[fabric_rpc, changes, processed], [ + {type, counter}, + {desc, <<"number of fabric_rpc worker changes row invocations">>} +]}. +{[fabric_rpc, changes, returned], [ + {type, counter}, + {desc, <<"number of fabric_rpc worker changes rows returned">>} +]}. +{[fabric_rpc, view, rows_read], [ + {type, counter}, + {desc, <<"number of fabric_rpc view_cb row invocations">>} +]}. diff --git a/src/fabric/src/fabric_rpc.erl b/src/fabric/src/fabric_rpc.erl index 9b00d9501..53b34f41a 100644 --- a/src/fabric/src/fabric_rpc.erl +++ b/src/fabric/src/fabric_rpc.erl @@ -493,6 +493,11 @@ view_cb({meta, Meta}, Acc) -> ok = rexi:stream2({meta, Meta}), {ok, Acc}; view_cb({row, Props}, #mrargs{extra = Options} = Acc) -> + %% TODO: distinguish between rows and docs + %% TODO: wire in csrt tracking + %% TODO: distinguish between all_docs vs view call + couch_stats:increment_counter([fabric_rpc, view, rows_read]), + %%couch_stats_resource_tracker:inc(rows_read), % Adding another row ViewRow = fabric_view_row:from_props(Props, Options), ok = rexi:stream2(ViewRow), @@ -529,6 +534,7 @@ changes_enumerator(#full_doc_info{} = FDI, Acc) -> changes_enumerator(#doc_info{id = <<"_local/", _/binary>>, high_seq = Seq}, Acc) -> {ok, Acc#fabric_changes_acc{seq = Seq, pending = Acc#fabric_changes_acc.pending - 1}}; changes_enumerator(DocInfo, Acc) -> + couch_stats:increment_counter([fabric_rpc, 
changes, processed]), #fabric_changes_acc{ db = Db, args = #changes_args{ @@ -569,6 +575,7 @@ changes_enumerator(DocInfo, Acc) -> {ok, Acc#fabric_changes_acc{seq = Seq, pending = Pending - 1}}. changes_row(Changes, Docs, DocInfo, Acc) -> + couch_stats:increment_counter([fabric_rpc, changes, returned]), #fabric_changes_acc{db = Db, pending = Pending, epochs = Epochs} = Acc, #doc_info{id = Id, high_seq = Seq, revs = [#rev_info{deleted = Del} | _]} = DocInfo, {change, [ @@ -667,6 +674,14 @@ clean_stack(S) -> ). set_io_priority(DbName, Options) -> + couch_stats_resource_tracker:set_context_dbname(DbName), + %% TODO: better approach here than using proplists? + case proplists:get_value(user_ctx, Options) of + undefined -> + ok; + #user_ctx{name = UserName} -> + couch_stats_resource_tracker:set_context_username(UserName) + end, case lists:keyfind(io_priority, 1, Options) of {io_priority, Pri} -> erlang:put(io_priority, Pri); diff --git a/src/fabric/src/fabric_util.erl b/src/fabric/src/fabric_util.erl index 4acb65c73..63c70e270 100644 --- a/src/fabric/src/fabric_util.erl +++ b/src/fabric/src/fabric_util.erl @@ -139,6 +139,10 @@ get_shard([#shard{node = Node, name = Name} | Rest], Opts, Timeout, Factor) -> receive {Ref, {ok, Db}} -> {ok, Db}; + %% TODO: switch to using rexi_utils:extract_delta + {Ref, {ok, Db}, {delta, Delta}} -> + couch_stats_resource_tracker:accumulate_delta(Delta), + {ok, Db}; {Ref, {'rexi_EXIT', {{unauthorized, _} = Error, _}}} -> throw(Error); {Ref, {'rexi_EXIT', {{forbidden, _} = Error, _}}} -> diff --git a/src/fabric/test/eunit/fabric_rpc_purge_tests.erl b/src/fabric/test/eunit/fabric_rpc_purge_tests.erl index 07e6b1d42..c7a36fbe3 100644 --- a/src/fabric/test/eunit/fabric_rpc_purge_tests.erl +++ b/src/fabric/test/eunit/fabric_rpc_purge_tests.erl @@ -263,6 +263,8 @@ rpc_update_doc(DbName, Doc, Opts) -> Reply = test_util:wait(fun() -> receive {Ref, Reply} -> + Reply; + {Ref, Reply, {delta, _}} -> Reply after 0 -> wait diff --git 
a/src/fabric/test/eunit/fabric_rpc_tests.erl b/src/fabric/test/eunit/fabric_rpc_tests.erl index 16bb66bad..c402affba 100644 --- a/src/fabric/test/eunit/fabric_rpc_tests.erl +++ b/src/fabric/test/eunit/fabric_rpc_tests.erl @@ -101,7 +101,16 @@ t_no_config_db_create_fails_for_shard_rpc(DbName) -> receive Resp0 -> Resp0 end, - ?assertMatch({Ref, {'rexi_EXIT', {{error, missing_target}, _}}}, Resp). + case couch_stats_resource_tracker:is_enabled() of + true -> + ?assertMatch( %% allow for {Ref, {rexi_EXIT, error}, {delta, D}} + {Ref, {'rexi_EXIT', {{error, missing_target}, _}}, _}, + Resp); + false -> + ?assertMatch( + {Ref, {'rexi_EXIT', {{error, missing_target}, _}}}, + Resp) + end. t_db_create_with_config(DbName) -> MDbName = mem3:dbname(DbName), diff --git a/src/mango/src/mango_cursor_view.erl b/src/mango/src/mango_cursor_view.erl index 0928ae193..e11c69416 100644 --- a/src/mango/src/mango_cursor_view.erl +++ b/src/mango/src/mango_cursor_view.erl @@ -245,9 +245,11 @@ execute(#cursor{db = Db, index = Idx, execution_stats = Stats} = Cursor0, UserFu Result = case mango_idx:def(Idx) of all_docs -> + couch_stats:increment_counter([mango_cursor, view, all_docs]), CB = fun ?MODULE:handle_all_docs_message/2, fabric:all_docs(Db, DbOpts, CB, Cursor, Args); _ -> + couch_stats:increment_counter([mango_cursor, view, idx]), CB = fun ?MODULE:handle_message/2, % Normal view DDoc = ddocid(Idx), diff --git a/src/mango/src/mango_selector.erl b/src/mango/src/mango_selector.erl index 42031b756..d8d2c913c 100644 --- a/src/mango/src/mango_selector.erl +++ b/src/mango/src/mango_selector.erl @@ -50,6 +50,7 @@ normalize(Selector) -> % This assumes that the Selector has been normalized. % Returns true or false. match(Selector, D) -> + %% TODO: wire in csrt tracking couch_stats:increment_counter([mango, evaluate_selector]), match_int(Selector, D). 
diff --git a/src/mem3/src/mem3_rpc.erl b/src/mem3/src/mem3_rpc.erl index 70fc797da..81b4fec27 100644 --- a/src/mem3/src/mem3_rpc.erl +++ b/src/mem3/src/mem3_rpc.erl @@ -378,20 +378,34 @@ rexi_call(Node, MFA, Timeout) -> Mon = rexi_monitor:start([rexi_utils:server_pid(Node)]), Ref = rexi:cast(Node, self(), MFA, [sync]), try - receive - {Ref, {ok, Reply}} -> - Reply; - {Ref, Error} -> - erlang:error(Error); - {rexi_DOWN, Mon, _, Reason} -> - erlang:error({rexi_DOWN, {Node, Reason}}) - after Timeout -> - erlang:error(timeout) - end + wait_message(Node, Ref, Mon, Timeout) after rexi_monitor:stop(Mon) end. +wait_message(Node, Ref, Mon, Timeout) -> + receive + Msg when element(1, Msg) =:= Ref; element(1, Msg) =:= rexi_DOWN, element(2, Msg) =:= Mon -> %% selective receive: leave unrelated messages in the caller's mailbox + process_raw_message(Msg, Node, Ref, Mon, Timeout) + after Timeout -> + erlang:error(timeout) + end. + +process_raw_message(Msg0, Node, Ref, Mon, Timeout) -> + {Msg, Delta} = rexi_utils:extract_delta(Msg0), + couch_stats_resource_tracker:accumulate_delta(Delta), + case Msg of + {Ref, {ok, Reply}} -> + Reply; + {Ref, Error} -> + erlang:error(Error); + {rexi_DOWN, Mon, _, Reason} -> + erlang:error({rexi_DOWN, {Node, Reason}}); + Other -> + ?LOG_UNEXPECTED_MSG(Other), + wait_message(Node, Ref, Mon, Timeout) + end. + get_or_create_db(DbName, Options) -> mem3_util:get_or_create_db_int(DbName, Options). diff --git a/src/rexi/include/rexi.hrl b/src/rexi/include/rexi.hrl index a2d86b2ab..a962f3069 100644 --- a/src/rexi/include/rexi.hrl +++ b/src/rexi/include/rexi.hrl @@ -11,6 +11,7 @@ % the License. -record(error, { + delta, timestamp, reason, mfa, diff --git a/src/rexi/src/rexi.erl b/src/rexi/src/rexi.erl index 02d3a9e55..3f93758bc 100644 --- a/src/rexi/src/rexi.erl +++ b/src/rexi/src/rexi.erl @@ -104,7 +104,7 @@ kill_all(NodeRefs) when is_list(NodeRefs) -> -spec reply(any()) -> any(). reply(Reply) -> {Caller, Ref} = get(rexi_from), - erlang:send(Caller, {Ref, Reply}). + erlang:send(Caller, maybe_add_delta({Ref, Reply})). %% Private function used by stream2 to initialize the stream.
Message is of the %% form {OriginalRef, {self(),reference()}, Reply}, which enables the @@ -188,7 +188,7 @@ stream2(Msg, Limit, Timeout) -> {ok, Count} -> put(rexi_unacked, Count + 1), {Caller, Ref} = get(rexi_from), - erlang:send(Caller, {Ref, self(), Msg}), + erlang:send(Caller, maybe_add_delta({Ref, self(), Msg})), ok catch throw:timeout -> @@ -222,7 +222,11 @@ stream_ack(Client) -> %% ping() -> {Caller, _} = get(rexi_from), - erlang:send(Caller, {rexi, '$rexi_ping'}). + %% It is essential ping/0 includes deltas as otherwise long running + %% filtered queries will be silent on usage until they finally return + %% a row or no results. This delay is proportional to the database size, + %% so instead we make sure ping/0 keeps live stats flowing. + erlang:send(Caller, maybe_add_delta({rexi, '$rexi_ping'})). aggregate_server_queue_len() -> rexi_server_mon:aggregate_queue_len(rexi_server). @@ -282,3 +286,11 @@ drain_acks(Count) -> after 0 -> {ok, Count} end. + +maybe_add_delta(T) -> + case couch_stats_resource_tracker:is_enabled() of + false -> + T; + true -> + rexi_utils:add_delta(T, rexi_utils:get_delta()) + end. diff --git a/src/rexi/src/rexi_monitor.erl b/src/rexi/src/rexi_monitor.erl index 7fe66db71..72f0985df 100644 --- a/src/rexi/src/rexi_monitor.erl +++ b/src/rexi/src/rexi_monitor.erl @@ -35,6 +35,7 @@ start(Procs) -> %% messages from our mailbox. -spec stop(pid()) -> ok. stop(MonitoringPid) -> + unlink(MonitoringPid), MonitoringPid ! {self(), shutdown}, flush_down_messages(). 
diff --git a/src/rexi/src/rexi_server.erl b/src/rexi/src/rexi_server.erl index b2df65c71..729979c53 100644 --- a/src/rexi/src/rexi_server.erl +++ b/src/rexi/src/rexi_server.erl @@ -102,12 +102,12 @@ handle_info({'DOWN', Ref, process, Pid, Error}, #st{workers = Workers} = St) -> case find_worker(Ref, Workers) of #job{worker_pid = Pid, worker = Ref, client_pid = CPid, client = CRef} = Job -> case Error of - #error{reason = {_Class, Reason}, stack = Stack} -> - notify_caller({CPid, CRef}, {Reason, Stack}), + #error{reason = {_Class, Reason}, stack = Stack, delta = Delta} -> + notify_caller({CPid, CRef}, {Reason, Stack}, Delta), St1 = save_error(Error, St), {noreply, remove_job(Job, St1)}; _ -> - notify_caller({CPid, CRef}, Error), + notify_caller({CPid, CRef}, Error, undefined), {noreply, remove_job(Job, St)} end; false -> @@ -134,15 +134,20 @@ init_p(From, MFA) -> string() | undefined ) -> any(). init_p(From, {M, F, A}, Nonce) -> + MFA = {M, F, length(A)}, put(rexi_from, From), - put('$initial_call', {M, F, length(A)}), + put('$initial_call', MFA), put(nonce, Nonce), try + couch_stats_resource_tracker:create_context(From, MFA, Nonce), + couch_stats:maybe_track_rexi_init_p(MFA), apply(M, F, A) catch exit:normal -> + couch_stats_resource_tracker:destroy_context(), ok; Class:Reason:Stack0 -> + Delta = couch_stats_resource_tracker:make_delta(), Stack = clean_stack(Stack0), {ClientPid, _ClientRef} = From, couch_log:error( @@ -158,12 +163,15 @@ init_p(From, {M, F, A}, Nonce) -> ] ), exit(#error{ + delta = Delta, timestamp = os:timestamp(), reason = {Class, Reason}, mfa = {M, F, A}, nonce = Nonce, stack = Stack }) + after + couch_stats_resource_tracker:destroy_context() end. %% internal @@ -200,8 +208,14 @@ find_worker(Ref, Tab) -> [Worker] -> Worker end. -notify_caller({Caller, Ref}, Reason) -> - rexi_utils:send(Caller, {Ref, {rexi_EXIT, Reason}}).
+notify_caller({Caller, Ref}, Reason, Delta) -> + Msg = case couch_stats_resource_tracker:is_enabled() of + true -> + {Ref, {rexi_EXIT, Reason}, {delta, Delta}}; + false -> + {Ref, {rexi_EXIT, Reason}} + end, + rexi_utils:send(Caller, Msg). kill_worker(FromRef, #st{clients = Clients} = St) -> case find_worker(FromRef, Clients) of diff --git a/src/rexi/src/rexi_utils.erl b/src/rexi/src/rexi_utils.erl index 146d0238a..512932d27 100644 --- a/src/rexi/src/rexi_utils.erl +++ b/src/rexi/src/rexi_utils.erl @@ -13,6 +13,7 @@ -module(rexi_utils). -export([server_pid/1, send/2, recv/6]). +-export([add_delta/2, extract_delta/1, get_delta/0]). %% @doc Return a rexi_server id for the given node. server_id(Node) -> @@ -60,6 +61,16 @@ process_mailbox(RefList, Keypos, Fun, Acc0, TimeoutRef, PerMsgTO) -> process_message(RefList, Keypos, Fun, Acc0, TimeoutRef, PerMsgTO) -> receive + Msg -> + process_raw_message(Msg, RefList, Keypos, Fun, Acc0, TimeoutRef) + after PerMsgTO -> + {timeout, Acc0} + end. + +process_raw_message(Payload0, RefList, Keypos, Fun, Acc0, TimeoutRef) -> + {Payload, Delta} = extract_delta(Payload0), + couch_stats_resource_tracker:accumulate_delta(Delta), + case Payload of {timeout, TimeoutRef} -> {timeout, Acc0}; {rexi, Ref, Msg} -> @@ -95,6 +106,25 @@ process_message(RefList, Keypos, Fun, Acc0, TimeoutRef, PerMsgTO) -> end; {rexi_DOWN, _, _, _} = Msg -> Fun(Msg, nil, Acc0) - after PerMsgTO -> - {timeout, Acc0} end. + +add_delta({A}, Delta) -> {A, Delta}; +add_delta({A, B}, Delta) -> {A, B, Delta}; +add_delta({A, B, C}, Delta) -> {A, B, C, Delta}; +add_delta({A, B, C, D}, Delta) -> {A, B, C, D, Delta}; +add_delta({A, B, C, D, E}, Delta) -> {A, B, C, D, E, Delta}; +add_delta({A, B, C, D, E, F}, Delta) -> {A, B, C, D, E, F, Delta}; +add_delta({A, B, C, D, E, F, G}, Delta) -> {A, B, C, D, E, F, G, Delta}; +add_delta(T, _Delta) -> T. 
+ +extract_delta({A, {delta, Delta}}) -> {{A}, Delta}; +extract_delta({A, B, {delta, Delta}}) -> {{A, B}, Delta}; +extract_delta({A, B, C, {delta, Delta}}) -> {{A, B, C}, Delta}; +extract_delta({A, B, C, D, {delta, Delta}}) -> {{A, B, C, D}, Delta}; +extract_delta({A, B, C, D, E, {delta, Delta}}) -> {{A, B, C, D, E}, Delta}; +extract_delta({A, B, C, D, E, F, {delta, Delta}}) -> {{A, B, C, D, E, F}, Delta}; +extract_delta({A, B, C, D, E, F, G, {delta, Delta}}) -> {{A, B, C, D, E, F, G}, Delta}; +extract_delta(T) -> {T, undefined}. + +get_delta() -> + {delta, couch_stats_resource_tracker:make_delta()}.
