This is an automated email from the ASF dual-hosted git repository.

vatamane pushed a commit to branch add-writes-rate-limiting-to-scanner
in repository https://gitbox.apache.org/repos/asf/couchdb.git

commit 8ecd647afbfd81bc9d26dce4b3f8079aeab1ec0a
Author: Nick Vatamaniuc <vatam...@gmail.com>
AuthorDate: Tue Sep 9 12:54:23 2025 -0400

    Add write limiting to the scanner
    
    Previously, the scanner applied db, shard and doc open rate limits. However,
    plugins may want to also perform updates and still ensure they always stay
    in the background and only consume a limited amount of resources in a
    cluster. For that, add a `doc_write` rate limit option.
    
    Plugins which perform writes can use the ``couch_scanner_rate_limiter``
    explicitly: initialize, then consume tokens from it during every
    update (possibly indicating they used more than one token in a single
    operation) and then sleep the recommended amount of time provided by the
    rate limiter.
    
    Added a simple example of how it could work in the
    couch_scanner_rate_limiter module in the comments at the top.
---
 rel/overlay/etc/default.ini                        |  6 +++
 .../src/couch_scanner_rate_limiter.erl             | 50 ++++++++++++++++++----
 src/docs/src/config/scanner.rst                    | 10 +++++
 3 files changed, 58 insertions(+), 8 deletions(-)

diff --git a/rel/overlay/etc/default.ini b/rel/overlay/etc/default.ini
index 7b89bc3ff..63db3f854 100644
--- a/rel/overlay/etc/default.ini
+++ b/rel/overlay/etc/default.ini
@@ -1047,6 +1047,12 @@ url = {{nouveau_url}}
 ; is shared across all running plugins.
 ;doc_rate_limit = 1000
 
+; Limit the rate per second at which plugins write/update documents. The rate
+; is shared across all running plugins. Unlike other rate limits, which are
+; applied automatically by the plugin backend, this rate assumes that plugins
+; will explicitly use the couch_scanner_rate_limiter API when performing writes.
+;doc_write_rate_limit = 500
+
 ; Batch size to use when fetching design documents. For lots of small design
 ; documents this value could be increased to 500 or 1000. If design documents
 ; are large (100KB+) it could make sense to decrease it a bit to 25 or 10.
diff --git a/src/couch_scanner/src/couch_scanner_rate_limiter.erl 
b/src/couch_scanner/src/couch_scanner_rate_limiter.erl
index 2717be69e..2eb662e45 100644
--- a/src/couch_scanner/src/couch_scanner_rate_limiter.erl
+++ b/src/couch_scanner/src/couch_scanner_rate_limiter.erl
@@ -21,6 +21,18 @@
 %
 % [1] https://en.wikipedia.org/wiki/Additive_increase/multiplicative_decrease
 %
+% Example of usage:
+%
+%  initialize:
+%    Limiter = couch_scanner_rate_limiter:get(),
+%
+%  use:
+%    bulk_docs(#{docs => [doc1, doc2, doc3]}),
+%    {Wait, Limiter1} = couch_scanner_rate_limiter:update(Limiter, doc_write, 3),
+%    timer:sleep(Wait)
+%       or
+%    receive .... after Wait -> ... end
+%
 
 -module(couch_scanner_rate_limiter).
 
@@ -29,7 +41,8 @@
 -export([
     start_link/0,
     get/0,
-    update/2
+    update/2,
+    update/3
 ]).
 
 % gen_server callbacks
@@ -62,16 +75,17 @@
 -define(DB_RATE_DEFAULT, 25).
 -define(SHARD_RATE_DEFAULT, 50).
 -define(DOC_RATE_DEFAULT, 1000).
+-define(DOC_WRITE_RATE_DEFAULT, 500).
 
 % Atomic ref indices. They start at 1.
--define(INDICES, #{db => 1, shard => 2, doc => 3}).
+-define(INDICES, #{db => 1, shard => 2, doc => 3, doc_write => 4}).
 
 % Record maintained by the clients. Each client will have one of these handles.
 % With each update/2 call they will update their own backoff values.
 %
 -record(client_st, {
     ref,
-    % db|shard|doc => {Backoff, UpdateTStamp}
+    % db|shard|doc|doc_write => {Backoff, UpdateTStamp}
     backoffs = #{}
 }).
 
@@ -83,13 +97,17 @@
 get() ->
     Ref = gen_server:call(?MODULE, get, infinity),
     NowMSec = erlang:monotonic_time(millisecond),
-    Backoffs = maps:from_keys([db, shard, doc], {?INIT_BACKOFF, NowMSec}),
+    Backoffs = maps:from_keys([db, shard, doc, doc_write], {?INIT_BACKOFF, 
NowMSec}),
     #client_st{ref = Ref, backoffs = Backoffs}.
 
-update(#client_st{ref = Ref, backoffs = Backoffs} = St, Type) when
-    Type =:= db orelse Type =:= shard orelse Type =:= doc
+update(St, Type) ->
+    update(St, Type, 1).
+
+update(#client_st{ref = Ref, backoffs = Backoffs} = St, Type, Count) when
+    (is_integer(Count) andalso Count >= 0) andalso
+        (Type =:= db orelse Type =:= shard orelse Type =:= doc orelse Type =:= 
doc_write)
 ->
-    AtLimit = atomics:sub_get(Ref, map_get(Type, ?INDICES), 1) =< 0,
+    AtLimit = atomics:sub_get(Ref, map_get(Type, ?INDICES), Count) =< 0,
     {Backoff, TStamp} = map_get(Type, Backoffs),
     NowMSec = erlang:monotonic_time(millisecond),
     case NowMSec - TStamp > ?SENSITIVITY_MSEC of
@@ -142,6 +160,7 @@ refill(#st{ref = Ref} = St) ->
     ok = atomics:put(Ref, map_get(db, ?INDICES), db_limit()),
     ok = atomics:put(Ref, map_get(shard, ?INDICES), shard_limit()),
     ok = atomics:put(Ref, map_get(doc, ?INDICES), doc_limit()),
+    ok = atomics:put(Ref, map_get(doc_write, ?INDICES), doc_write_limit()),
     schedule_refill(St).
 
 update_backoff(true, 0) ->
@@ -160,6 +179,9 @@ shard_limit() ->
 doc_limit() ->
     cfg_int("doc_rate_limit", ?DOC_RATE_DEFAULT).
 
+doc_write_limit() ->
+    cfg_int("doc_write_rate_limit", ?DOC_WRITE_RATE_DEFAULT).
+
 cfg_int(Key, Default) when is_list(Key), is_integer(Default) ->
     config:get_integer("couch_scanner", Key, Default).
 
@@ -175,6 +197,7 @@ couch_scanner_rate_limiter_test_() ->
         [
             ?TDEF_FE(t_init),
             ?TDEF_FE(t_update),
+            ?TDEF_FE(t_update_multiple),
             ?TDEF_FE(t_refill)
         ]
     }.
@@ -184,7 +207,8 @@ t_init(_) ->
     ?assertEqual(ok, refill()),
     ?assertMatch({Val, #client_st{}} when is_number(Val), update(ClientSt, 
db)),
     ?assertMatch({Val, #client_st{}} when is_number(Val), update(ClientSt, 
shard)),
-    ?assertMatch({Val, #client_st{}} when is_number(Val), update(ClientSt, 
doc)).
+    ?assertMatch({Val, #client_st{}} when is_number(Val), update(ClientSt, 
doc)),
+    ?assertMatch({Val, #client_st{}} when is_number(Val), update(ClientSt, 
doc_write)).
 
 t_update(_) ->
     ClientSt = ?MODULE:get(),
@@ -196,6 +220,16 @@ t_update(_) ->
     {Backoff, _} = update(ClientSt1, db),
     ?assertEqual(?MAX_BACKOFF, Backoff).
 
+t_update_multiple(_) ->
+    ClientSt = ?MODULE:get(),
+    Fun = fun(_, Acc) ->
+        {_, Acc1} = update(Acc, doc_write, 100),
+        reset_time(Acc1, doc_write)
+    end,
+    ClientSt1 = lists:foldl(Fun, ClientSt, lists:seq(1, 50)),
+    {Backoff, _} = update(ClientSt1, doc_write, 100),
+    ?assertEqual(?MAX_BACKOFF, Backoff).
+
 t_refill(_) ->
     ClientSt = ?MODULE:get(),
     Fun = fun(_, Acc) ->
diff --git a/src/docs/src/config/scanner.rst b/src/docs/src/config/scanner.rst
index d3644c23f..178fc1439 100644
--- a/src/docs/src/config/scanner.rst
+++ b/src/docs/src/config/scanner.rst
@@ -85,6 +85,16 @@ Scanner Options
             [couch_scanner]
             doc_rate_limit = 1000
 
+    .. config:option:: doc_write_rate_limit
+
+        Limit the rate at which plugins update documents. This rate limit
+        applies to plugins which explicitly use the
+        ``couch_scanner_rate_limiter`` module for rate limiting ::
+
+            [couch_scanner]
+            doc_write_rate_limit = 500
+
+
     .. config:option:: ddoc_batch_size
 
         Batch size to use when fetching design documents. For lots of small

Reply via email to