This is an automated email from the ASF dual-hosted git repository.

rnewson pushed a commit to branch nouveau-ibrowse-improvements
in repository https://gitbox.apache.org/repos/asf/couchdb.git


The following commit(s) were added to refs/heads/nouveau-ibrowse-improvements by this push:
     new 75aa61647 reintroduce flow control for async index update
75aa61647 is described below

commit 75aa616473ae6c473f1af9dfbbe7d9b47803fbe2
Author: Robert Newson <[email protected]>
AuthorDate: Thu Sep 7 10:20:55 2023 +0100

    reintroduce flow control for async index update
---
 src/nouveau/src/nouveau_api.erl           | 49 +++++++++++++------------------
 src/nouveau/src/nouveau_index_manager.erl |  4 +--
 src/nouveau/src/nouveau_index_updater.erl | 19 +++++++-----
 src/nouveau/src/nouveau_util.erl          | 10 ++++++-
 4 files changed, 43 insertions(+), 39 deletions(-)

diff --git a/src/nouveau/src/nouveau_api.erl b/src/nouveau/src/nouveau_api.erl
index c2811792f..c133d951b 100644
--- a/src/nouveau/src/nouveau_api.erl
+++ b/src/nouveau/src/nouveau_api.erl
@@ -29,7 +29,7 @@
     search/2,
     set_purge_seq/3,
     set_update_seq/3,
-    drain_async_responses/1,
+    drain_async_responses/2,
     jaxrs_error/2
 ]).
 
@@ -201,39 +201,32 @@ set_seq(#index{} = Index, ReqBody) ->
             send_error(Reason)
     end.
 
-drain_async_responses(List) when is_list(List) ->
-    drain_async_responses_list(List);
-drain_async_responses(Timeout) when is_integer(Timeout); Timeout == infinity ->
-    drain_async_responses_timeout(Timeout, []).
-
-drain_async_responses_list([]) ->
-    ok;
-drain_async_responses_list([ReqId | Rest]) ->
-    receive
-        {ibrowse_async_headers, ReqId, Code, Headers} ->
-            case drain_async_response(ReqId, Code, Headers, undefined) of
-                {ok, "204", _Headers, _Body} ->
-                    drain_async_responses_list(Rest);
-                {ok, StatusCode, _Headers, RespBody} ->
-                    exit({error, jaxrs_error(StatusCode, RespBody)})
-            end
+%% wait for enough async responses to reduce the Queue to Min length.
+drain_async_responses(Queue0, Min) when Min >= 0 ->
+    case queue:len(Queue0) > Min of
+        true ->
+            {{value, ReqId}, Queue1} = queue:out(Queue0),
+            wait_for_response(ReqId),
+            drain_async_responses(Queue1, Min);
+        false ->
+            Queue0
     end.
 
-drain_async_responses_timeout(Timeout, ReqIds) when is_integer(Timeout); Timeout == infinity ->
-    receive
-        {ibrowse_async_headers, ReqId, Code0, Headers0} ->
-            case drain_async_response(ReqId, Code0, Headers0, undefined) of
-                {ok, "204", _Headers, _Body} ->
-                    drain_async_responses_timeout(Timeout, [ReqId | ReqIds]);
-                {ok, StatusCode, _Headers, RespBody} ->
-                    exit({error, jaxrs_error(StatusCode, RespBody)})
-            end
-    after Timeout ->
-        ReqIds
+wait_for_response(ReqId) ->
+    case drain_async_response(ReqId) of
+        {ok, "204", _Headers, _Body} ->
+            ok;
+        {ok, StatusCode, _Headers, RespBody} ->
+            exit({error, jaxrs_error(StatusCode, RespBody)})
     end.
 
+drain_async_response(ReqId) ->
+    drain_async_response(ReqId, undefined, undefined, undefined).
+
 drain_async_response(ReqId, Code0, Headers0, Body0) ->
     receive
+        {ibrowse_async_headers, ReqId, Code1, Headers1} ->
+            drain_async_response(ReqId, Code1, Headers1, Body0);
         {ibrowse_async_response, ReqId, Body1} ->
             drain_async_response(ReqId, Code0, Headers0, Body1);
         {ibrowse_async_response_end, ReqId} ->
diff --git a/src/nouveau/src/nouveau_index_manager.erl b/src/nouveau/src/nouveau_index_manager.erl
index bfbd74990..5afb43fc1 100644
--- a/src/nouveau/src/nouveau_index_manager.erl
+++ b/src/nouveau/src/nouveau_index_manager.erl
@@ -152,10 +152,10 @@ configure_ibrowse(URL) ->
     ibrowse:set_max_sessions(
         Host,
         Port,
-        config:get_integer("nouveau", "max_sessions", 100)
+        nouveau_util:max_sessions()
     ),
     ibrowse:set_max_pipeline_size(
         Host,
         Port,
-        config:get_integer("nouveau", "max_pipeline_size", 1000)
+        nouveau_util:max_pipeline_size()
     ).
diff --git a/src/nouveau/src/nouveau_index_updater.erl b/src/nouveau/src/nouveau_index_updater.erl
index 65533a90c..dfc6eb122 100644
--- a/src/nouveau/src/nouveau_index_updater.erl
+++ b/src/nouveau/src/nouveau_index_updater.erl
@@ -33,9 +33,10 @@
     changes_done,
     total_changes,
     exclude_idrevs,
-    reqids = [],
+    reqids,
     conn_pid,
-    update_seq
+    update_seq,
+    max_pipeline_size
 }).
 
 -record(purge_acc, {
@@ -97,13 +98,15 @@ update(#index{} = Index) ->
                         changes_done = 0,
                         total_changes = TotalChanges,
                         exclude_idrevs = PurgeAcc1#purge_acc.exclude_list,
+                        reqids = queue:new(),
                         conn_pid = ConnPid,
-                        update_seq = PurgeAcc1#purge_acc.index_update_seq
+                        update_seq = PurgeAcc1#purge_acc.index_update_seq,
+                        max_pipeline_size = nouveau_util:max_pipeline_size()
                     },
                     {ok, Acc1} = couch_db:fold_changes(
                         Db, Acc0#acc.update_seq, fun load_docs/2, Acc0, []
                     ),
-                    nouveau_api:drain_async_responses(lists:reverse(Acc1#acc.reqids)),
+                    nouveau_api:drain_async_responses(Acc1#acc.reqids, 0),
                     ibrowse:stop_worker_process(ConnPid),
                     ok = nouveau_api:set_update_seq(Index, Acc1#acc.update_seq, NewCurSeq)
                 after
@@ -117,9 +120,9 @@ update(#index{} = Index) ->
 load_docs(#full_doc_info{id = <<"_design/", _/binary>>}, #acc{} = Acc) ->
     {ok, Acc};
 load_docs(FDI, #acc{} = Acc0) ->
-    %% collect completed requests without blocking
-    ReqIds = nouveau_api:drain_async_responses(0),
-    Acc1 = Acc0#acc{reqids = Acc0#acc.reqids -- ReqIds},
+    %% block for responses so we stay under the max pipeline size
+    ReqIds1 = nouveau_api:drain_async_responses(Acc0#acc.reqids, Acc0#acc.max_pipeline_size),
+    Acc1 = Acc0#acc{reqids = ReqIds1},
 
     couch_task_status:update([
         {changes_done, Acc1#acc.changes_done},
@@ -146,7 +149,7 @@ load_docs(FDI, #acc{} = Acc0) ->
                     {ibrowse_req_id, ReqId} ->
                         Acc1#acc{
                             update_seq = DI#doc_info.high_seq,
-                            reqids = [ReqId | Acc1#acc.reqids]
+                            reqids = queue:in(ReqId, Acc1#acc.reqids)
                         };
                     {error, Reason} ->
                         exit({error, Reason})
diff --git a/src/nouveau/src/nouveau_util.erl b/src/nouveau/src/nouveau_util.erl
index 86fc3a4d8..57aaf12aa 100644
--- a/src/nouveau/src/nouveau_util.erl
+++ b/src/nouveau/src/nouveau_util.erl
@@ -27,7 +27,9 @@
     maybe_create_local_purge_doc/2,
     get_local_purge_doc_id/1,
     get_local_purge_doc_body/3,
-    nouveau_url/0
+    nouveau_url/0,
+    max_sessions/0,
+    max_pipeline_size/0
 ]).
 
 index_name(Path) when is_binary(Path) ->
@@ -196,3 +198,9 @@ get_local_purge_doc_body(LocalDocId, PurgeSeq, Index) ->
 
 nouveau_url() ->
     config:get("nouveau", "url", "http://127.0.0.1:8080").
+
+max_sessions() ->
+    config:get_integer("nouveau", "max_sessions", 100).
+
+max_pipeline_size() ->
+    config:get_integer("nouveau", "max_pipeline_size", 1000).

Reply via email to