This is an automated email from the ASF dual-hosted git repository.

chewbranca pushed a commit to branch couch-stats-resource-tracker-v2
in repository https://gitbox.apache.org/repos/asf/couchdb.git

commit 9ef7c4be04e15cbd108cbb39e315e6f69702d3e0
Author: Russell Branca <[email protected]>
AuthorDate: Fri Jun 7 15:46:17 2024 -0700

    Port in csrt changes
---
 src/chttpd/src/chttpd.erl                        |   5 ++
 src/chttpd/src/chttpd_db.erl                     |   2 +
 src/chttpd/src/chttpd_httpd_handlers.erl         |   1 +
 src/chttpd/src/chttpd_misc.erl                   | 105 +++++++++++++++++++++++
 src/couch/include/couch_db.hrl                   |   2 +
 src/couch/priv/stats_descriptions.cfg            |  33 +++++++
 src/couch/src/couch_btree.erl                    |   3 +
 src/couch/src/couch_db.erl                       |   2 +
 src/couch/src/couch_query_servers.erl            |   8 ++
 src/couch/src/couch_server.erl                   |   2 +
 src/couch_stats/src/couch_stats.app.src          |   8 +-
 src/couch_stats/src/couch_stats.erl              |  30 +++++++
 src/couch_stats/src/couch_stats_sup.erl          |   1 +
 src/fabric/priv/stats_descriptions.cfg           |  50 +++++++++++
 src/fabric/src/fabric_rpc.erl                    |  15 ++++
 src/fabric/src/fabric_util.erl                   |   4 +
 src/fabric/test/eunit/fabric_rpc_purge_tests.erl |   2 +
 src/fabric/test/eunit/fabric_rpc_tests.erl       |  11 ++-
 src/mango/src/mango_cursor_view.erl              |   2 +
 src/mango/src/mango_selector.erl                 |   1 +
 src/mem3/src/mem3_rpc.erl                        |  34 +++++---
 src/rexi/include/rexi.hrl                        |   1 +
 src/rexi/src/rexi.erl                            |  18 +++-
 src/rexi/src/rexi_monitor.erl                    |   1 +
 src/rexi/src/rexi_server.erl                     |  26 ++++--
 src/rexi/src/rexi_utils.erl                      |  34 +++++++-
 26 files changed, 377 insertions(+), 24 deletions(-)

diff --git a/src/chttpd/src/chttpd.erl b/src/chttpd/src/chttpd.erl
index 0c1380862..9642b2089 100644
--- a/src/chttpd/src/chttpd.erl
+++ b/src/chttpd/src/chttpd.erl
@@ -323,6 +323,9 @@ handle_request_int(MochiReq) ->
     % Save client socket so that it can be monitored for disconnects
     chttpd_util:mochiweb_client_req_set(MochiReq),
 
+    %% This is probably better in before_request, but having Path is nice
+    couch_stats_resource_tracker:create_coordinator_context(HttpReq0, Path),
+
     {HttpReq2, Response} =
         case before_request(HttpReq0) of
             {ok, HttpReq1} ->
@@ -372,6 +375,7 @@ after_request(HttpReq, HttpResp0) ->
     HttpResp2 = update_stats(HttpReq, HttpResp1),
     chttpd_stats:report(HttpReq, HttpResp2),
     maybe_log(HttpReq, HttpResp2),
+    couch_stats_resource_tracker:destroy_context(),
     HttpResp2.
 
 process_request(#httpd{mochi_req = MochiReq} = HttpReq) ->
@@ -409,6 +413,7 @@ handle_req_after_auth(HandlerKey, HttpReq) ->
             HandlerKey,
             fun chttpd_db:handle_request/1
         ),
+        couch_stats_resource_tracker:set_context_handler_fun(HandlerFun),
         AuthorizedReq = chttpd_auth:authorize(
             possibly_hack(HttpReq),
             fun chttpd_auth_request:authorize_request/1
diff --git a/src/chttpd/src/chttpd_db.erl b/src/chttpd/src/chttpd_db.erl
index 062e0bf24..8a816abde 100644
--- a/src/chttpd/src/chttpd_db.erl
+++ b/src/chttpd/src/chttpd_db.erl
@@ -83,6 +83,7 @@
 
 % Database request handlers
 handle_request(#httpd{path_parts = [DbName | RestParts], method = Method} = 
Req) ->
+    couch_stats_resource_tracker:set_context_dbname(DbName),
     case {Method, RestParts} of
         {'PUT', []} ->
             create_db_req(Req, DbName);
@@ -103,6 +104,7 @@ handle_request(#httpd{path_parts = [DbName | RestParts], 
method = Method} = Req)
             do_db_req(Req, fun db_req/2);
         {_, [SecondPart | _]} ->
             Handler = chttpd_handlers:db_handler(SecondPart, fun db_req/2),
+            couch_stats_resource_tracker:set_context_handler_fun(Handler),
             do_db_req(Req, Handler)
     end.
 
diff --git a/src/chttpd/src/chttpd_httpd_handlers.erl 
b/src/chttpd/src/chttpd_httpd_handlers.erl
index 932b52e5f..e1b260222 100644
--- a/src/chttpd/src/chttpd_httpd_handlers.erl
+++ b/src/chttpd/src/chttpd_httpd_handlers.erl
@@ -20,6 +20,7 @@ url_handler(<<"_utils">>) -> fun 
chttpd_misc:handle_utils_dir_req/1;
 url_handler(<<"_all_dbs">>) -> fun chttpd_misc:handle_all_dbs_req/1;
 url_handler(<<"_dbs_info">>) -> fun chttpd_misc:handle_dbs_info_req/1;
 url_handler(<<"_active_tasks">>) -> fun chttpd_misc:handle_task_status_req/1;
+url_handler(<<"_active_resources">>) -> fun 
chttpd_misc:handle_resource_status_req/1;
 url_handler(<<"_scheduler">>) -> fun 
couch_replicator_httpd:handle_scheduler_req/1;
 url_handler(<<"_node">>) -> fun chttpd_node:handle_node_req/1;
 url_handler(<<"_reload_query_servers">>) -> fun 
chttpd_misc:handle_reload_query_servers_req/1;
diff --git a/src/chttpd/src/chttpd_misc.erl b/src/chttpd/src/chttpd_misc.erl
index 4b7c73b35..d35df68e5 100644
--- a/src/chttpd/src/chttpd_misc.erl
+++ b/src/chttpd/src/chttpd_misc.erl
@@ -20,6 +20,7 @@
     handle_replicate_req/1,
     handle_reload_query_servers_req/1,
     handle_task_status_req/1,
+    handle_resource_status_req/1,
     handle_up_req/1,
     handle_utils_dir_req/1,
     handle_utils_dir_req/2,
@@ -224,6 +225,110 @@ handle_task_status_req(#httpd{method = 'GET'} = Req) ->
 handle_task_status_req(Req) ->
     send_method_not_allowed(Req, "GET,HEAD").
 
+handle_resource_status_req(#httpd{method = 'POST'} = Req) ->
+    ok = chttpd:verify_is_server_admin(Req),
+    chttpd:validate_ctype(Req, "application/json"),
+    {Props} = chttpd:json_body_obj(Req),
+    Action = proplists:get_value(<<"action">>, Props),
+    Key = proplists:get_value(<<"key">>, Props),
+    Val = proplists:get_value(<<"val">>, Props),
+
+    CountBy = fun couch_stats_resource_tracker:count_by/1,
+    GroupBy = fun couch_stats_resource_tracker:group_by/2,
+    SortedBy1 = fun couch_stats_resource_tracker:sorted_by/1,
+    SortedBy2 = fun couch_stats_resource_tracker:sorted_by/2,
+    ConvertEle = fun(K) -> list_to_existing_atom(binary_to_list(K)) end,
+    ConvertList = fun(L) -> [ConvertEle(E) || E <- L] end,
+    ToJson = fun couch_stats_resource_tracker:term_to_flat_json/1,
+    JsonKeys = fun(PL) -> [[ToJson(K), V] || {K, V} <- PL] end,
+
+    Fun = case {Action, Key, Val} of
+        {<<"count_by">>, Keys, undefined} when is_list(Keys) ->
+            Keys1 = [ConvertEle(K) || K <- Keys],
+            fun() -> CountBy(Keys1) end;
+        {<<"count_by">>, Key, undefined} ->
+            Key1 = ConvertEle(Key),
+            fun() -> CountBy(Key1) end;
+        {<<"group_by">>, Keys, Vals} when is_list(Keys) andalso is_list(Vals) 
->
+            Keys1 = ConvertList(Keys),
+            Vals1 = ConvertList(Vals),
+            fun() -> GroupBy(Keys1, Vals1) end;
+        {<<"group_by">>, Key, Vals} when is_list(Vals) ->
+            Key1 = ConvertEle(Key),
+            Vals1 = ConvertList(Vals),
+            fun() -> GroupBy(Key1, Vals1) end;
+        {<<"group_by">>, Keys, Val} when is_list(Keys) ->
+            Keys1 = ConvertList(Keys),
+            Val1 = ConvertEle(Val),
+            fun() -> GroupBy(Keys1, Val1) end;
+        {<<"group_by">>, Key, Val} ->
+            Key1 = ConvertEle(Key),
+            Val1 = ConvertList(Val),
+            fun() -> GroupBy(Key1, Val1) end;
+
+        {<<"sorted_by">>, Key, undefined} ->
+            Key1 = ConvertEle(Key),
+            fun() -> JsonKeys(SortedBy1(Key1)) end;
+        {<<"sorted_by">>, Keys, undefined} when is_list(Keys) ->
+            Keys1 = [ConvertEle(K) || K <- Keys],
+            fun() -> JsonKeys(SortedBy1(Keys1)) end;
+        {<<"sorted_by">>, Keys, Vals} when is_list(Keys) andalso is_list(Vals) 
->
+            Keys1 = ConvertList(Keys),
+            Vals1 = ConvertList(Vals),
+            fun() -> JsonKeys(SortedBy2(Keys1, Vals1)) end;
+        {<<"sorted_by">>, Key, Vals} when is_list(Vals) ->
+            Key1 = ConvertEle(Key),
+            Vals1 = ConvertList(Vals),
+            fun() -> JsonKeys(SortedBy2(Key1, Vals1)) end;
+        {<<"sorted_by">>, Keys, Val} when is_list(Keys) ->
+            Keys1 = ConvertList(Keys),
+            Val1 = ConvertEle(Val),
+            fun() -> JsonKeys(SortedBy2(Keys1, Val1)) end;
+        {<<"sorted_by">>, Key, Val} ->
+            Key1 = ConvertEle(Key),
+            Val1 = ConvertList(Val),
+            fun() -> JsonKeys(SortedBy2(Key1, Val1)) end;
+        _ ->
+            throw({badrequest, invalid_resource_request})
+    end,
+
+    Fun1 = fun() ->
+        case Fun() of
+            Map when is_map(Map) ->
+                {maps:fold(
+                    fun
+                        (_K,0,A) -> A; %% TODO: Skip 0 value entries?
+                        (K,V,A) -> [{ToJson(K), V} | A]
+                    end,
+                    [], Map)};
+            List when is_list(List) ->
+                List
+        end
+    end,
+
+    {Resp, _Bad} = rpc:multicall(erlang, apply, [
+        fun() ->
+            {node(), Fun1()}
+        end,
+        []
+    ]),
+    %%io:format("{CSRT}***** GOT RESP: ~p~n", [Resp]),
+    send_json(Req, {Resp});
+handle_resource_status_req(#httpd{method = 'GET'} = Req) ->
+    ok = chttpd:verify_is_server_admin(Req),
+    {Resp, Bad} = rpc:multicall(erlang, apply, [
+        fun() ->
+            {node(), couch_stats_resource_tracker:active()}
+        end,
+        []
+    ]),
+    %% TODO: incorporate Bad responses
+    send_json(Req, {Resp});
+handle_resource_status_req(Req) ->
+    ok = chttpd:verify_is_server_admin(Req),
+    send_method_not_allowed(Req, "GET,HEAD,POST").
+
+
 handle_replicate_req(#httpd{method = 'POST', user_ctx = Ctx, req_body = 
PostBody} = Req) ->
     chttpd:validate_ctype(Req, "application/json"),
     %% see HACK in chttpd.erl about replication
diff --git a/src/couch/include/couch_db.hrl b/src/couch/include/couch_db.hrl
index 9c1df21b6..ba6bd38ca 100644
--- a/src/couch/include/couch_db.hrl
+++ b/src/couch/include/couch_db.hrl
@@ -53,6 +53,8 @@
 -define(INTERACTIVE_EDIT, interactive_edit).
 -define(REPLICATED_CHANGES, replicated_changes).
 
+-define(LOG_UNEXPECTED_MSG(Msg), couch_log:warning("[~p:~p:~p/~p]{~p[~p]} 
Unexpected message: ~w", [?MODULE, ?LINE, ?FUNCTION_NAME, ?FUNCTION_ARITY, 
self(), element(2, process_info(self(), message_queue_len)), Msg])).
+
 -type branch() :: {Key::term(), Value::term(), Tree::term()}.
 -type path() :: {Start::pos_integer(), branch()}.
 -type update_type() :: replicated_changes | interactive_edit.
diff --git a/src/couch/priv/stats_descriptions.cfg 
b/src/couch/priv/stats_descriptions.cfg
index 6a7120f87..33c556521 100644
--- a/src/couch/priv/stats_descriptions.cfg
+++ b/src/couch/priv/stats_descriptions.cfg
@@ -306,6 +306,10 @@
     {type, counter},
     {desc, <<"number of couch_server LRU operations skipped">>}
 ]}.
+{[couchdb, couch_server, open], [
+    {type, counter},
+    {desc, <<"number of couch_server open operations invoked">>}
+]}.
 {[couchdb, query_server, vdu_rejects], [
     {type, counter},
     {desc, <<"number of rejections by validate_doc_update function">>}
@@ -418,10 +422,39 @@
     {type, counter},
     {desc, <<"number of other requests">>}
 ]}.
+{[couchdb, query_server, js_filter], [
+    {type, counter},
+    {desc, <<"number of JS filter invocations">>}
+]}.
+{[couchdb, query_server, js_filtered_docs], [
+    {type, counter},
+    {desc, <<"number of docs filtered through JS invocations">>}
+]}.
+{[couchdb, query_server, js_filter_error], [
+    {type, counter},
+    {desc, <<"number of JS filter invocation errors">>}
+]}.
 {[couchdb, legacy_checksums], [
     {type, counter},
     {desc, <<"number of legacy checksums found in couch_file instances">>}
 ]}.
+{[couchdb, btree, folds], [
+    {type, counter},
+    {desc, <<"number of couch btree kv fold callback invocations">>}
+]}.
+{[couchdb, btree, kp_node], [
+    {type, counter},
+    {desc, <<"number of couch btree kp_nodes read">>}
+]}.
+{[couchdb, btree, kv_node], [
+    {type, counter},
+    {desc, <<"number of couch btree kv_nodes read">>}
+]}.
+%% CSRT (couch_stats_resource_tracker) stats
+{[couchdb, csrt, delta_missing_t0], [
+    {type, counter},
+    {desc, <<"number of csrt contexts without a proper start time">>}
+]}.
 {[pread, exceed_eof], [
     {type, counter},
     {desc, <<"number of the attempts to read beyond end of db file">>}
diff --git a/src/couch/src/couch_btree.erl b/src/couch/src/couch_btree.erl
index b974a22ee..27b5bc18b 100644
--- a/src/couch/src/couch_btree.erl
+++ b/src/couch/src/couch_btree.erl
@@ -472,6 +472,8 @@ reduce_tree_size(kp_node, NodeSize, [{_K, {_P, _Red, Sz}} | 
NodeList]) ->
 
 get_node(#btree{fd = Fd}, NodePos) ->
     {ok, {NodeType, NodeList}} = couch_file:pread_term(Fd, NodePos),
+    %% TODO: wire in csrt tracking
+    couch_stats:increment_counter([couchdb, btree, NodeType]),
     {NodeType, NodeList}.
 
 write_node(#btree{fd = Fd, compression = Comp} = Bt, NodeType, NodeList) ->
@@ -1163,6 +1165,7 @@ stream_kv_node2(Bt, Reds, PrevKVs, [{K, V} | RestKVs], 
InRange, Dir, Fun, Acc) -
         false ->
             {stop, {PrevKVs, Reds}, Acc};
         true ->
+            couch_stats:increment_counter([couchdb, btree, folds]),
             AssembledKV = assemble(Bt, K, V),
             case Fun(visit, AssembledKV, {PrevKVs, Reds}, Acc) of
                 {ok, Acc2} ->
diff --git a/src/couch/src/couch_db.erl b/src/couch/src/couch_db.erl
index 2ef89ced3..c7afaa4b3 100644
--- a/src/couch/src/couch_db.erl
+++ b/src/couch/src/couch_db.erl
@@ -297,6 +297,7 @@ open_doc(Db, IdOrDocInfo) ->
     open_doc(Db, IdOrDocInfo, []).
 
 open_doc(Db, Id, Options) ->
+    %% TODO: wire in csrt tracking
     increment_stat(Db, [couchdb, database_reads]),
     case open_doc_int(Db, Id, Options) of
         {ok, #doc{deleted = true} = Doc} ->
@@ -1982,6 +1983,7 @@ increment_stat(#db{options = Options}, Stat, Count) when
 ->
     case lists:member(sys_db, Options) of
         true ->
+            %% TODO: we shouldn't leak resource usage just because it's a 
sys_db
             ok;
         false ->
             couch_stats:increment_counter(Stat, Count)
diff --git a/src/couch/src/couch_query_servers.erl 
b/src/couch/src/couch_query_servers.erl
index 6789bfaef..b5ddee312 100644
--- a/src/couch/src/couch_query_servers.erl
+++ b/src/couch/src/couch_query_servers.erl
@@ -542,6 +542,8 @@ filter_docs(Req, Db, DDoc, FName, Docs) ->
         {ok, filter_docs_int(Db, DDoc, FName, JsonReq, JsonDocs)}
     catch
         throw:{os_process_error, {exit_status, 1}} ->
+            %% TODO: wire in csrt tracking
+            couch_stats:increment_counter([couchdb, query_server, 
js_filter_error]),
             %% batch used too much memory, retry sequentially.
             Fun = fun(JsonDoc) ->
                 filter_docs_int(Db, DDoc, FName, JsonReq, [JsonDoc])
@@ -550,6 +552,12 @@ filter_docs(Req, Db, DDoc, FName, Docs) ->
     end.
 
 filter_docs_int(Db, DDoc, FName, JsonReq, JsonDocs) ->
+    %% Count usage in _int version as this can be repeated for OS error
+    %% Pros & cons... might not have actually processed `length(JsonDocs)` docs
+    %% but it certainly undercounts if we count in `filter_docs/5` above
+    %% TODO: wire in csrt tracking
+    couch_stats:increment_counter([couchdb, query_server, js_filter]),
+    couch_stats:increment_counter([couchdb, query_server, js_filtered_docs], 
length(JsonDocs)),
     [true, Passes] = ddoc_prompt(
         Db,
         DDoc,
diff --git a/src/couch/src/couch_server.erl b/src/couch/src/couch_server.erl
index 379724953..576938177 100644
--- a/src/couch/src/couch_server.erl
+++ b/src/couch/src/couch_server.erl
@@ -108,6 +108,8 @@ sup_start_link(N) ->
     gen_server:start_link({local, couch_server(N)}, couch_server, [N], []).
 
 open(DbName, Options) ->
+    %% TODO: wire in csrt tracking
+    couch_stats:increment_counter([couchdb, couch_server, open]),
     try
         validate_open_or_create(DbName, Options),
         open_int(DbName, Options)
diff --git a/src/couch_stats/src/couch_stats.app.src 
b/src/couch_stats/src/couch_stats.app.src
index a54fac734..de9f00e4e 100644
--- a/src/couch_stats/src/couch_stats.app.src
+++ b/src/couch_stats/src/couch_stats.app.src
@@ -13,8 +13,12 @@
 {application, couch_stats, [
     {description, "Simple statistics collection"},
     {vsn, git},
-    {registered, [couch_stats_aggregator, couch_stats_process_tracker]},
-    {applications, [kernel, stdlib]},
+    {registered, [
+        couch_stats_aggregator,
+        couch_stats_process_tracker,
+        couch_stats_resource_tracker
+    ]},
+    {applications, [kernel, stdlib, couch_log]},
     {mod, {couch_stats_app, []}},
     {env, []}
 ]}.
diff --git a/src/couch_stats/src/couch_stats.erl 
b/src/couch_stats/src/couch_stats.erl
index 29a402449..29190e6b0 100644
--- a/src/couch_stats/src/couch_stats.erl
+++ b/src/couch_stats/src/couch_stats.erl
@@ -24,6 +24,12 @@
     update_gauge/2
 ]).
 
+%% couch_stats_resource_tracker API
+-export([
+    create_context/3,
+    maybe_track_rexi_init_p/1
+]).
+
 -type response() :: ok | {error, unknown_metric} | {error, invalid_metric}.
 -type stat() :: {any(), [{atom(), any()}]}.
 
@@ -49,6 +55,11 @@ increment_counter(Name) ->
 
 -spec increment_counter(any(), pos_integer()) -> response().
 increment_counter(Name, Value) ->
+    %% Should maybe_track_local happen before or after notify?
+    %% If after, only currently tracked metrics declared in the app's
+    %% stats_description.cfg will be trackable locally. Pros/cons.
+    %io:format("NOTIFY_EXISTING_METRIC: ~p || ~p || ~p~n", [Name, Op, Type]),
+    ok = maybe_track_local_counter(Name, Value),
     case couch_stats_util:get_counter(Name, stats()) of
         {ok, Ctx} -> couch_stats_counter:increment(Ctx, Value);
         {error, Error} -> {error, Error}
@@ -100,6 +111,25 @@ stats() ->
 now_sec() ->
     erlang:monotonic_time(second).
 
+%% Only potentially track positive increments to counters
+-spec maybe_track_local_counter(any(), any()) -> ok.
+maybe_track_local_counter(Name, Val) when is_integer(Val) andalso Val > 0 ->
+    %%io:format("maybe_track_local[~p]: ~p~n", [Val, Name]),
+    couch_stats_resource_tracker:maybe_inc(Name, Val),
+    ok;
+maybe_track_local_counter(_, _) ->
+    ok.
+
+create_context(From, MFA, Nonce) ->
+    couch_stats_resource_tracker:create_context(From, MFA, Nonce).
+
+maybe_track_rexi_init_p({M, F, _A}) ->
+    Metric = [M, F, spawned],
+    case couch_stats_resource_tracker:should_track(Metric) of
+        true -> increment_counter(Metric);
+        false -> ok
+    end.
+
 -ifdef(TEST).
 
 -include_lib("couch/include/couch_eunit.hrl").
diff --git a/src/couch_stats/src/couch_stats_sup.erl 
b/src/couch_stats/src/couch_stats_sup.erl
index 325372c3e..4b4df17e2 100644
--- a/src/couch_stats/src/couch_stats_sup.erl
+++ b/src/couch_stats/src/couch_stats_sup.erl
@@ -29,6 +29,7 @@ init([]) ->
         {
             {one_for_one, 5, 10}, [
                 ?CHILD(couch_stats_server, worker),
+                ?CHILD(couch_stats_resource_tracker, worker),
                 ?CHILD(couch_stats_process_tracker, worker)
             ]
         }}.
diff --git a/src/fabric/priv/stats_descriptions.cfg 
b/src/fabric/priv/stats_descriptions.cfg
index d12aa0c84..9ab054bf0 100644
--- a/src/fabric/priv/stats_descriptions.cfg
+++ b/src/fabric/priv/stats_descriptions.cfg
@@ -26,3 +26,53 @@
     {type, counter},
     {desc, <<"number of write quorum errors">>}
 ]}.
+
+
+%% fabric_rpc worker stats
+%% TODO: decide on which naming scheme:
+%% {[fabric_rpc, get_all_security, spawned], [
+%% {[fabric_rpc, spawned, get_all_security], [
+{[fabric_rpc, get_all_security, spawned], [
+    {type, counter},
+    {desc, <<"number of fabric_rpc worker get_all_security spawns">>}
+]}.
+{[fabric_rpc, open_doc, spawned], [
+    {type, counter},
+    {desc, <<"number of fabric_rpc worker open_doc spawns">>}
+]}.
+{[fabric_rpc, all_docs, spawned], [
+    {type, counter},
+    {desc, <<"number of fabric_rpc worker all_docs spawns">>}
+]}.
+{[fabric_rpc, update_docs, spawned], [
+    {type, counter},
+    {desc, <<"number of fabric_rpc worker update_docs spawns">>}
+]}.
+{[fabric_rpc, map_view, spawned], [
+    {type, counter},
+    {desc, <<"number of fabric_rpc worker map_view spawns">>}
+]}.
+{[fabric_rpc, reduce_view, spawned], [
+    {type, counter},
+    {desc, <<"number of fabric_rpc worker reduce_view spawns">>}
+]}.
+{[fabric_rpc, open_shard, spawned], [
+    {type, counter},
+    {desc, <<"number of fabric_rpc worker open_shard spawns">>}
+]}.
+{[fabric_rpc, changes, spawned], [
+    {type, counter},
+    {desc, <<"number of fabric_rpc worker changes spawns">>}
+]}.
+{[fabric_rpc, changes, processed], [
+    {type, counter},
+    {desc, <<"number of fabric_rpc worker changes row invocations">>}
+]}.
+{[fabric_rpc, changes, returned], [
+    {type, counter},
+    {desc, <<"number of fabric_rpc worker changes rows returned">>}
+]}.
+{[fabric_rpc, view, rows_read], [
+    {type, counter},
+    {desc, <<"number of fabric_rpc view_cb row invocations">>}
+]}.
diff --git a/src/fabric/src/fabric_rpc.erl b/src/fabric/src/fabric_rpc.erl
index 9b00d9501..53b34f41a 100644
--- a/src/fabric/src/fabric_rpc.erl
+++ b/src/fabric/src/fabric_rpc.erl
@@ -493,6 +493,11 @@ view_cb({meta, Meta}, Acc) ->
     ok = rexi:stream2({meta, Meta}),
     {ok, Acc};
 view_cb({row, Props}, #mrargs{extra = Options} = Acc) ->
+    %% TODO: distinguish between rows and docs
+    %% TODO: wire in csrt tracking
+    %% TODO: distinguish between all_docs vs view call
+    couch_stats:increment_counter([fabric_rpc, view, rows_read]),
+    %%couch_stats_resource_tracker:inc(rows_read),
     % Adding another row
     ViewRow = fabric_view_row:from_props(Props, Options),
     ok = rexi:stream2(ViewRow),
@@ -529,6 +534,7 @@ changes_enumerator(#full_doc_info{} = FDI, Acc) ->
 changes_enumerator(#doc_info{id = <<"_local/", _/binary>>, high_seq = Seq}, 
Acc) ->
     {ok, Acc#fabric_changes_acc{seq = Seq, pending = 
Acc#fabric_changes_acc.pending - 1}};
 changes_enumerator(DocInfo, Acc) ->
+    couch_stats:increment_counter([fabric_rpc, changes, processed]),
     #fabric_changes_acc{
         db = Db,
         args = #changes_args{
@@ -569,6 +575,7 @@ changes_enumerator(DocInfo, Acc) ->
     {ok, Acc#fabric_changes_acc{seq = Seq, pending = Pending - 1}}.
 
 changes_row(Changes, Docs, DocInfo, Acc) ->
+    couch_stats:increment_counter([fabric_rpc, changes, returned]),
     #fabric_changes_acc{db = Db, pending = Pending, epochs = Epochs} = Acc,
     #doc_info{id = Id, high_seq = Seq, revs = [#rev_info{deleted = Del} | _]} 
= DocInfo,
     {change, [
@@ -667,6 +674,14 @@ clean_stack(S) ->
     ).
 
 set_io_priority(DbName, Options) ->
+    couch_stats_resource_tracker:set_context_dbname(DbName),
+    %% TODO: better approach here than using proplists?
+    case proplists:get_value(user_ctx, Options) of
+        undefined ->
+            ok;
+        #user_ctx{name = UserName} ->
+            couch_stats_resource_tracker:set_context_username(UserName)
+    end,
     case lists:keyfind(io_priority, 1, Options) of
         {io_priority, Pri} ->
             erlang:put(io_priority, Pri);
diff --git a/src/fabric/src/fabric_util.erl b/src/fabric/src/fabric_util.erl
index 4acb65c73..63c70e270 100644
--- a/src/fabric/src/fabric_util.erl
+++ b/src/fabric/src/fabric_util.erl
@@ -139,6 +139,10 @@ get_shard([#shard{node = Node, name = Name} | Rest], Opts, 
Timeout, Factor) ->
         receive
             {Ref, {ok, Db}} ->
                 {ok, Db};
+            %% TODO: switch to using rexi_utils:extract_delta
+            {Ref, {ok, Db}, {delta, Delta}} ->
+                couch_stats_resource_tracker:accumulate_delta(Delta),
+                {ok, Db};
             {Ref, {'rexi_EXIT', {{unauthorized, _} = Error, _}}} ->
                 throw(Error);
             {Ref, {'rexi_EXIT', {{forbidden, _} = Error, _}}} ->
diff --git a/src/fabric/test/eunit/fabric_rpc_purge_tests.erl 
b/src/fabric/test/eunit/fabric_rpc_purge_tests.erl
index 07e6b1d42..c7a36fbe3 100644
--- a/src/fabric/test/eunit/fabric_rpc_purge_tests.erl
+++ b/src/fabric/test/eunit/fabric_rpc_purge_tests.erl
@@ -263,6 +263,8 @@ rpc_update_doc(DbName, Doc, Opts) ->
     Reply = test_util:wait(fun() ->
         receive
             {Ref, Reply} ->
+                Reply;
+            {Ref, Reply, {delta, _}} ->
                 Reply
         after 0 ->
             wait
diff --git a/src/fabric/test/eunit/fabric_rpc_tests.erl 
b/src/fabric/test/eunit/fabric_rpc_tests.erl
index 16bb66bad..c402affba 100644
--- a/src/fabric/test/eunit/fabric_rpc_tests.erl
+++ b/src/fabric/test/eunit/fabric_rpc_tests.erl
@@ -101,7 +101,16 @@ t_no_config_db_create_fails_for_shard_rpc(DbName) ->
         receive
             Resp0 -> Resp0
         end,
-    ?assertMatch({Ref, {'rexi_EXIT', {{error, missing_target}, _}}}, Resp).
+    case couch_stats_resource_tracker:is_enabled() of
+        true ->
+            ?assertMatch( %% allow for {Ref, {rexi_EXIT, error}, {delta, D}}
+                {Ref, {'rexi_EXIT', {{error, missing_target}, _}}, _},
+                Resp);
+        false ->
+            ?assertMatch(
+                {Ref, {'rexi_EXIT', {{error, missing_target}, _}}},
+                Resp)
+    end.
 
 t_db_create_with_config(DbName) ->
     MDbName = mem3:dbname(DbName),
diff --git a/src/mango/src/mango_cursor_view.erl 
b/src/mango/src/mango_cursor_view.erl
index 0928ae193..e11c69416 100644
--- a/src/mango/src/mango_cursor_view.erl
+++ b/src/mango/src/mango_cursor_view.erl
@@ -245,9 +245,11 @@ execute(#cursor{db = Db, index = Idx, execution_stats = 
Stats} = Cursor0, UserFu
             Result =
                 case mango_idx:def(Idx) of
                     all_docs ->
+                        couch_stats:increment_counter([mango_cursor, view, 
all_docs]),
                         CB = fun ?MODULE:handle_all_docs_message/2,
                         fabric:all_docs(Db, DbOpts, CB, Cursor, Args);
                     _ ->
+                        couch_stats:increment_counter([mango_cursor, view, 
idx]),
                         CB = fun ?MODULE:handle_message/2,
                         % Normal view
                         DDoc = ddocid(Idx),
diff --git a/src/mango/src/mango_selector.erl b/src/mango/src/mango_selector.erl
index 42031b756..d8d2c913c 100644
--- a/src/mango/src/mango_selector.erl
+++ b/src/mango/src/mango_selector.erl
@@ -50,6 +50,7 @@ normalize(Selector) ->
 % This assumes that the Selector has been normalized.
 % Returns true or false.
 match(Selector, D) ->
+    %% TODO: wire in csrt tracking
     couch_stats:increment_counter([mango, evaluate_selector]),
     match_int(Selector, D).
 
diff --git a/src/mem3/src/mem3_rpc.erl b/src/mem3/src/mem3_rpc.erl
index 70fc797da..81b4fec27 100644
--- a/src/mem3/src/mem3_rpc.erl
+++ b/src/mem3/src/mem3_rpc.erl
@@ -378,20 +378,34 @@ rexi_call(Node, MFA, Timeout) ->
     Mon = rexi_monitor:start([rexi_utils:server_pid(Node)]),
     Ref = rexi:cast(Node, self(), MFA, [sync]),
     try
-        receive
-            {Ref, {ok, Reply}} ->
-                Reply;
-            {Ref, Error} ->
-                erlang:error(Error);
-            {rexi_DOWN, Mon, _, Reason} ->
-                erlang:error({rexi_DOWN, {Node, Reason}})
-        after Timeout ->
-            erlang:error(timeout)
-        end
+        wait_message(Node, Ref, Mon, Timeout)
     after
         rexi_monitor:stop(Mon)
     end.
 
+wait_message(Node, Ref, Mon, Timeout) ->
+    receive
+        Msg ->
+            process_raw_message(Msg, Node, Ref, Mon, Timeout)
+    after Timeout ->
+        erlang:error(timeout)
+    end.
+
+process_raw_message(Msg0, Node, Ref, Mon, Timeout) ->
+    {Msg, Delta} = rexi_utils:extract_delta(Msg0),
+    couch_stats_resource_tracker:accumulate_delta(Delta),
+    case Msg of
+        {Ref, {ok, Reply}} ->
+            Reply;
+        {Ref, Error} ->
+            erlang:error(Error);
+        {rexi_DOWN, Mon, _, Reason} ->
+            erlang:error({rexi_DOWN, {Node, Reason}});
+        Other ->
+            ?LOG_UNEXPECTED_MSG(Other),
+            wait_message(Node, Ref, Mon, Timeout)
+    end.
+
 get_or_create_db(DbName, Options) ->
     mem3_util:get_or_create_db_int(DbName, Options).
 
diff --git a/src/rexi/include/rexi.hrl b/src/rexi/include/rexi.hrl
index a2d86b2ab..a962f3069 100644
--- a/src/rexi/include/rexi.hrl
+++ b/src/rexi/include/rexi.hrl
@@ -11,6 +11,7 @@
 % the License.
 
 -record(error, {
+    delta,
     timestamp,
     reason,
     mfa,
diff --git a/src/rexi/src/rexi.erl b/src/rexi/src/rexi.erl
index 02d3a9e55..3f93758bc 100644
--- a/src/rexi/src/rexi.erl
+++ b/src/rexi/src/rexi.erl
@@ -104,7 +104,7 @@ kill_all(NodeRefs) when is_list(NodeRefs) ->
 -spec reply(any()) -> any().
 reply(Reply) ->
     {Caller, Ref} = get(rexi_from),
-    erlang:send(Caller, {Ref, Reply}).
+    erlang:send(Caller, maybe_add_delta({Ref, Reply})).
 
 %% Private function used by stream2 to initialize the stream. Message is of the
 %% form {OriginalRef, {self(),reference()}, Reply}, which enables the
@@ -188,7 +188,7 @@ stream2(Msg, Limit, Timeout) ->
         {ok, Count} ->
             put(rexi_unacked, Count + 1),
             {Caller, Ref} = get(rexi_from),
-            erlang:send(Caller, {Ref, self(), Msg}),
+            erlang:send(Caller, maybe_add_delta({Ref, self(), Msg})),
             ok
     catch
         throw:timeout ->
@@ -222,7 +222,11 @@ stream_ack(Client) ->
 %%
 ping() ->
     {Caller, _} = get(rexi_from),
-    erlang:send(Caller, {rexi, '$rexi_ping'}).
+    %% It is essential ping/0 includes deltas as otherwise long running
+    %% filtered queries will be silent on usage until they finally return
+    %% a row or no results. This delay is proportional to the database size,
+    %% so instead we make sure ping/0 keeps live stats flowing.
+    erlang:send(Caller, maybe_add_delta({rexi, '$rexi_ping'})).
 
 aggregate_server_queue_len() ->
     rexi_server_mon:aggregate_queue_len(rexi_server).
@@ -282,3 +286,11 @@ drain_acks(Count) ->
     after 0 ->
         {ok, Count}
     end.
+
+maybe_add_delta(T) ->
+    case couch_stats_resource_tracker:is_enabled() of
+        false ->
+            T;
+        true ->
+            rexi_utils:add_delta(T, rexi_utils:get_delta())
+    end.
diff --git a/src/rexi/src/rexi_monitor.erl b/src/rexi/src/rexi_monitor.erl
index 7fe66db71..72f0985df 100644
--- a/src/rexi/src/rexi_monitor.erl
+++ b/src/rexi/src/rexi_monitor.erl
@@ -35,6 +35,7 @@ start(Procs) ->
 %% messages from our mailbox.
 -spec stop(pid()) -> ok.
 stop(MonitoringPid) ->
+    unlink(MonitoringPid),
     MonitoringPid ! {self(), shutdown},
     flush_down_messages().
 
diff --git a/src/rexi/src/rexi_server.erl b/src/rexi/src/rexi_server.erl
index b2df65c71..729979c53 100644
--- a/src/rexi/src/rexi_server.erl
+++ b/src/rexi/src/rexi_server.erl
@@ -102,12 +102,12 @@ handle_info({'DOWN', Ref, process, Pid, Error}, 
#st{workers = Workers} = St) ->
     case find_worker(Ref, Workers) of
         #job{worker_pid = Pid, worker = Ref, client_pid = CPid, client = CRef} 
= Job ->
             case Error of
-                #error{reason = {_Class, Reason}, stack = Stack} ->
-                    notify_caller({CPid, CRef}, {Reason, Stack}),
+                #error{reason = {_Class, Reason}, stack = Stack, delta = 
Delta} ->
+                    notify_caller({CPid, CRef}, {Reason, Stack}, Delta),
                     St1 = save_error(Error, St),
                     {noreply, remove_job(Job, St1)};
                 _ ->
-                    notify_caller({CPid, CRef}, Error),
+                    notify_caller({CPid, CRef}, Error, undefined),
                     {noreply, remove_job(Job, St)}
             end;
         false ->
@@ -134,15 +134,20 @@ init_p(From, MFA) ->
     string() | undefined
 ) -> any().
 init_p(From, {M, F, A}, Nonce) ->
+    MFA = {M, F, length(A)},
     put(rexi_from, From),
-    put('$initial_call', {M, F, length(A)}),
+    put('$initial_call', MFA),
     put(nonce, Nonce),
     try
+        couch_stats_resource_tracker:create_context(From, MFA, Nonce),
+        couch_stats:maybe_track_rexi_init_p(MFA),
         apply(M, F, A)
     catch
         exit:normal ->
+            couch_stats_resource_tracker:destroy_context(),
             ok;
         Class:Reason:Stack0 ->
+            couch_stats_resource_tracker:destroy_context(),
             Stack = clean_stack(Stack0),
             {ClientPid, _ClientRef} = From,
             couch_log:error(
@@ -158,12 +163,15 @@ init_p(From, {M, F, A}, Nonce) ->
                 ]
             ),
             exit(#error{
+                delta = couch_stats_resource_tracker:make_delta(),
                 timestamp = os:timestamp(),
                 reason = {Class, Reason},
                 mfa = {M, F, A},
                 nonce = Nonce,
                 stack = Stack
             })
+    after
+        couch_stats_resource_tracker:destroy_context()
     end.
 
 %% internal
@@ -200,8 +208,14 @@ find_worker(Ref, Tab) ->
         [Worker] -> Worker
     end.
 
-notify_caller({Caller, Ref}, Reason) ->
-    rexi_utils:send(Caller, {Ref, {rexi_EXIT, Reason}}).
+notify_caller({Caller, Ref}, Reason, Delta) ->
+    Msg = case couch_stats_resource_tracker:is_enabled() of
+        true ->
+            {Ref, {rexi_EXIT, Reason}, {delta, Delta}};
+        false ->
+            {Ref, {rexi_EXIT, Reason}}
+    end,
+    rexi_utils:send(Caller, Msg).
 
 kill_worker(FromRef, #st{clients = Clients} = St) ->
     case find_worker(FromRef, Clients) of
diff --git a/src/rexi/src/rexi_utils.erl b/src/rexi/src/rexi_utils.erl
index 146d0238a..512932d27 100644
--- a/src/rexi/src/rexi_utils.erl
+++ b/src/rexi/src/rexi_utils.erl
@@ -13,6 +13,7 @@
 -module(rexi_utils).
 
 -export([server_pid/1, send/2, recv/6]).
+-export([add_delta/2, extract_delta/1, get_delta/0]).
 
 %% @doc Return a rexi_server id for the given node.
 server_id(Node) ->
@@ -60,6 +61,16 @@ process_mailbox(RefList, Keypos, Fun, Acc0, TimeoutRef, PerMsgTO) ->
 
 process_message(RefList, Keypos, Fun, Acc0, TimeoutRef, PerMsgTO) ->
     receive
+        Msg ->
+            process_raw_message(Msg, RefList, Keypos, Fun, Acc0, TimeoutRef)
+    after PerMsgTO ->
+        {timeout, Acc0}
+    end.
+
+process_raw_message(Payload0, RefList, Keypos, Fun, Acc0, TimeoutRef) ->
+    {Payload, Delta} = extract_delta(Payload0),
+    couch_stats_resource_tracker:accumulate_delta(Delta),
+    case Payload of
         {timeout, TimeoutRef} ->
             {timeout, Acc0};
         {rexi, Ref, Msg} ->
@@ -95,6 +106,25 @@ process_message(RefList, Keypos, Fun, Acc0, TimeoutRef, PerMsgTO) ->
             end;
         {rexi_DOWN, _, _, _} = Msg ->
             Fun(Msg, nil, Acc0)
-    after PerMsgTO ->
-        {timeout, Acc0}
     end.
+
+add_delta({A}, Delta) -> {A, Delta};
+add_delta({A, B}, Delta) -> {A, B, Delta};
+add_delta({A, B, C}, Delta) -> {A, B, C, Delta};
+add_delta({A, B, C, D}, Delta) -> {A, B, C, D, Delta};
+add_delta({A, B, C, D, E}, Delta) -> {A, B, C, D, E, Delta};
+add_delta({A, B, C, D, E, F}, Delta) -> {A, B, C, D, E, F, Delta};
+add_delta({A, B, C, D, E, F, G}, Delta) -> {A, B, C, D, E, F, G, Delta};
+add_delta(T, _Delta) -> T.
+
+extract_delta({A, {delta, Delta}}) -> {{A}, Delta};
+extract_delta({A, B, {delta, Delta}}) -> {{A, B}, Delta};
+extract_delta({A, B, C, {delta, Delta}}) -> {{A, B, C}, Delta};
+extract_delta({A, B, C, D, {delta, Delta}}) -> {{A, B, C, D}, Delta};
+extract_delta({A, B, C, D, E, {delta, Delta}}) -> {{A, B, C, D, E}, Delta};
+extract_delta({A, B, C, D, E, F, {delta, Delta}}) -> {{A, B, C, D, E, F}, Delta};
+extract_delta({A, B, C, D, E, F, G, {delta, Delta}}) -> {{A, B, C, D, E, F, G}, Delta};
+extract_delta(T) -> {T, undefined}.
+
+get_delta() ->
+    {delta, couch_stats_resource_tracker:make_delta()}.


Reply via email to