This is an automated email from the ASF dual-hosted git repository. vatamane pushed a commit to branch add-writes-rate-limiting-to-scanner in repository https://gitbox.apache.org/repos/asf/couchdb.git
commit 8ecd647afbfd81bc9d26dce4b3f8079aeab1ec0a Author: Nick Vatamaniuc <vatam...@gmail.com> AuthorDate: Tue Sep 9 12:54:23 2025 -0400 Add write limiting to the scanner Previously, the scanner applied db, shard and doc open rate limits. However, plugins may want to also perform updates and still ensure they always stay in the background and only consume a limited amount of resources in a cluster. For that add a `doc_write` rate limit option. Plugins which perform writes can use the ``couch_scanner_rate_limiter`` explicitly: initialize, then consume tokens from it during every update (possibly indicating they used more than one token in a single operation) and then sleep the recommended amount of time provided by the rate limiter. Added a simple example of how it could work in the couch_scanner_rate_limiter module in the comments at the top. --- rel/overlay/etc/default.ini | 6 +++ .../src/couch_scanner_rate_limiter.erl | 50 ++++++++++++++++++---- src/docs/src/config/scanner.rst | 10 +++++ 3 files changed, 58 insertions(+), 8 deletions(-) diff --git a/rel/overlay/etc/default.ini b/rel/overlay/etc/default.ini index 7b89bc3ff..63db3f854 100644 --- a/rel/overlay/etc/default.ini +++ b/rel/overlay/etc/default.ini @@ -1047,6 +1047,12 @@ url = {{nouveau_url}} ; is shared across all running plugins. ;doc_rate_limit = 1000 +; Limit the rate per second at which plugins write/update documents. The rate +; is shared across all running plugins. Unlike other rate limits which are +; applied automatically by the plugin backend this rate assumes the plugins will +; explicitly use the couch_scanner_rate_limiter API when performing writes. +;doc_write_rate_limit = 500 + ; Batch size to use when fetching design documents. For lots of small design ; documents this value could be increased to 500 or 1000. If design documents ; are large (100KB+) it could make sense to decrease it a bit to 25 or 10. 
diff --git a/src/couch_scanner/src/couch_scanner_rate_limiter.erl b/src/couch_scanner/src/couch_scanner_rate_limiter.erl index 2717be69e..2eb662e45 100644 --- a/src/couch_scanner/src/couch_scanner_rate_limiter.erl +++ b/src/couch_scanner/src/couch_scanner_rate_limiter.erl @@ -21,6 +21,18 @@ % % [1] https://en.wikipedia.org/wiki/Additive_increase/multiplicative_decrease % +% Example of usage: +% +% initialize: +% Limiter = couch_scanner_rate_limiter:get(), +% +% use: +% bulk_docs(#{docs => [doc1, doc2, doc3]}), +% {Wait, Limiter1} = couch_scanner_rate_limiter:update(Limiter, doc_write, 3), +% timer:sleep(Wait) +% or +% receive .... after Wait -> ... end +% -module(couch_scanner_rate_limiter). @@ -29,7 +41,8 @@ -export([ start_link/0, get/0, - update/2 + update/2, + update/3 ]). % gen_server callbacks @@ -62,16 +75,17 @@ -define(DB_RATE_DEFAULT, 25). -define(SHARD_RATE_DEFAULT, 50). -define(DOC_RATE_DEFAULT, 1000). +-define(DOC_WRITE_RATE_DEFAULT, 500). % Atomic ref indices. They start at 1. --define(INDICES, #{db => 1, shard => 2, doc => 3}). +-define(INDICES, #{db => 1, shard => 2, doc => 3, doc_write => 4}). % Record maintained by the clients. Each client will have one of these handles. % With each update/2 call they will update their own backoff values. % -record(client_st, { ref, - % db|shard|doc => {Backoff, UpdateTStamp} + % db|shard|doc|doc_write => {Backoff, UpdateTStamp} backoffs = #{} }). @@ -83,13 +97,17 @@ get() -> Ref = gen_server:call(?MODULE, get, infinity), NowMSec = erlang:monotonic_time(millisecond), - Backoffs = maps:from_keys([db, shard, doc], {?INIT_BACKOFF, NowMSec}), + Backoffs = maps:from_keys([db, shard, doc, doc_write], {?INIT_BACKOFF, NowMSec}), #client_st{ref = Ref, backoffs = Backoffs}. -update(#client_st{ref = Ref, backoffs = Backoffs} = St, Type) when - Type =:= db orelse Type =:= shard orelse Type =:= doc +update(St, Type) -> + update(St, Type, 1). 
+ +update(#client_st{ref = Ref, backoffs = Backoffs} = St, Type, Count) when + (is_integer(Count) andalso Count >= 0) andalso + (Type =:= db orelse Type =:= shard orelse Type =:= doc orelse Type =:= doc_write) -> - AtLimit = atomics:sub_get(Ref, map_get(Type, ?INDICES), 1) =< 0, + AtLimit = atomics:sub_get(Ref, map_get(Type, ?INDICES), Count) =< 0, {Backoff, TStamp} = map_get(Type, Backoffs), NowMSec = erlang:monotonic_time(millisecond), case NowMSec - TStamp > ?SENSITIVITY_MSEC of @@ -142,6 +160,7 @@ refill(#st{ref = Ref} = St) -> ok = atomics:put(Ref, map_get(db, ?INDICES), db_limit()), ok = atomics:put(Ref, map_get(shard, ?INDICES), shard_limit()), ok = atomics:put(Ref, map_get(doc, ?INDICES), doc_limit()), + ok = atomics:put(Ref, map_get(doc_write, ?INDICES), doc_write_limit()), schedule_refill(St). update_backoff(true, 0) -> @@ -160,6 +179,9 @@ shard_limit() -> doc_limit() -> cfg_int("doc_rate_limit", ?DOC_RATE_DEFAULT). +doc_write_limit() -> + cfg_int("doc_write_rate_limit", ?DOC_WRITE_RATE_DEFAULT). + cfg_int(Key, Default) when is_list(Key), is_integer(Default) -> config:get_integer("couch_scanner", Key, Default). @@ -175,6 +197,7 @@ couch_scanner_rate_limiter_test_() -> [ ?TDEF_FE(t_init), ?TDEF_FE(t_update), + ?TDEF_FE(t_update_multiple), ?TDEF_FE(t_refill) ] }. @@ -184,7 +207,8 @@ t_init(_) -> ?assertEqual(ok, refill()), ?assertMatch({Val, #client_st{}} when is_number(Val), update(ClientSt, db)), ?assertMatch({Val, #client_st{}} when is_number(Val), update(ClientSt, shard)), - ?assertMatch({Val, #client_st{}} when is_number(Val), update(ClientSt, doc)). + ?assertMatch({Val, #client_st{}} when is_number(Val), update(ClientSt, doc)), + ?assertMatch({Val, #client_st{}} when is_number(Val), update(ClientSt, doc_write)). t_update(_) -> ClientSt = ?MODULE:get(), @@ -196,6 +220,16 @@ t_update(_) -> {Backoff, _} = update(ClientSt1, db), ?assertEqual(?MAX_BACKOFF, Backoff). 
+t_update_multiple(_) -> + ClientSt = ?MODULE:get(), + Fun = fun(_, Acc) -> + {_, Acc1} = update(Acc, doc_write, 100), + reset_time(Acc1, doc_write) + end, + ClientSt1 = lists:foldl(Fun, ClientSt, lists:seq(1, 50)), + {Backoff, _} = update(ClientSt1, doc_write, 100), + ?assertEqual(?MAX_BACKOFF, Backoff). + t_refill(_) -> ClientSt = ?MODULE:get(), Fun = fun(_, Acc) -> diff --git a/src/docs/src/config/scanner.rst b/src/docs/src/config/scanner.rst index d3644c23f..178fc1439 100644 --- a/src/docs/src/config/scanner.rst +++ b/src/docs/src/config/scanner.rst @@ -85,6 +85,16 @@ Scanner Options [couch_scanner] doc_rate_limit = 1000 + .. config:option:: doc_write_rate_limit + + Limit the rate at which plugins update documents. This rate limit + applies to plugins which explicitly use the + ``couch_scanner_rate_limiter`` module for rate limiting :: + + [couch_scanner] + doc_write_rate_limit = 500 + + .. config:option:: ddoc_batch_size Batch size to use when fetching design documents. For lots of small