This is an automated email from the ASF dual-hosted git repository.
rnewson pushed a commit to branch nouveau-ibrowse-improvements
in repository https://gitbox.apache.org/repos/asf/couchdb.git
The following commit(s) were added to refs/heads/nouveau-ibrowse-improvements
by this push:
new 75aa61647 reintroduce flow control for async index update
75aa61647 is described below
commit 75aa616473ae6c473f1af9dfbbe7d9b47803fbe2
Author: Robert Newson <[email protected]>
AuthorDate: Thu Sep 7 10:20:55 2023 +0100
reintroduce flow control for async index update
---
src/nouveau/src/nouveau_api.erl | 49 +++++++++++++------------------
src/nouveau/src/nouveau_index_manager.erl | 4 +--
src/nouveau/src/nouveau_index_updater.erl | 19 +++++++-----
src/nouveau/src/nouveau_util.erl | 10 ++++++-
4 files changed, 43 insertions(+), 39 deletions(-)
diff --git a/src/nouveau/src/nouveau_api.erl b/src/nouveau/src/nouveau_api.erl
index c2811792f..c133d951b 100644
--- a/src/nouveau/src/nouveau_api.erl
+++ b/src/nouveau/src/nouveau_api.erl
@@ -29,7 +29,7 @@
search/2,
set_purge_seq/3,
set_update_seq/3,
- drain_async_responses/1,
+ drain_async_responses/2,
jaxrs_error/2
]).
@@ -201,39 +201,32 @@ set_seq(#index{} = Index, ReqBody) ->
send_error(Reason)
end.
-drain_async_responses(List) when is_list(List) ->
- drain_async_responses_list(List);
-drain_async_responses(Timeout) when is_integer(Timeout); Timeout == infinity ->
- drain_async_responses_timeout(Timeout, []).
-
-drain_async_responses_list([]) ->
- ok;
-drain_async_responses_list([ReqId | Rest]) ->
- receive
- {ibrowse_async_headers, ReqId, Code, Headers} ->
- case drain_async_response(ReqId, Code, Headers, undefined) of
- {ok, "204", _Headers, _Body} ->
- drain_async_responses_list(Rest);
- {ok, StatusCode, _Headers, RespBody} ->
- exit({error, jaxrs_error(StatusCode, RespBody)})
- end
+%% wait for enough async responses to reduce the Queue to Min length.
+drain_async_responses(Queue0, Min) when Min >= 0 ->
+ case queue:len(Queue0) > Min of
+ true ->
+ {{value, ReqId}, Queue1} = queue:out(Queue0),
+ wait_for_response(ReqId),
+ drain_async_responses(Queue1, Min);
+ false ->
+ Queue0
end.
-drain_async_responses_timeout(Timeout, ReqIds) when is_integer(Timeout);
Timeout == infinity ->
- receive
- {ibrowse_async_headers, ReqId, Code0, Headers0} ->
- case drain_async_response(ReqId, Code0, Headers0, undefined) of
- {ok, "204", _Headers, _Body} ->
- drain_async_responses_timeout(Timeout, [ReqId | ReqIds]);
- {ok, StatusCode, _Headers, RespBody} ->
- exit({error, jaxrs_error(StatusCode, RespBody)})
- end
- after Timeout ->
- ReqIds
+wait_for_response(ReqId) ->
+ case drain_async_response(ReqId) of
+ {ok, "204", _Headers, _Body} ->
+ ok;
+ {ok, StatusCode, _Headers, RespBody} ->
+ exit({error, jaxrs_error(StatusCode, RespBody)})
end.
+drain_async_response(ReqId) ->
+ drain_async_response(ReqId, undefined, undefined, undefined).
+
drain_async_response(ReqId, Code0, Headers0, Body0) ->
receive
+ {ibrowse_async_headers, ReqId, Code1, Headers1} ->
+ drain_async_response(ReqId, Code1, Headers1, Body0);
{ibrowse_async_response, ReqId, Body1} ->
drain_async_response(ReqId, Code0, Headers0, Body1);
{ibrowse_async_response_end, ReqId} ->
diff --git a/src/nouveau/src/nouveau_index_manager.erl b/src/nouveau/src/nouveau_index_manager.erl
index bfbd74990..5afb43fc1 100644
--- a/src/nouveau/src/nouveau_index_manager.erl
+++ b/src/nouveau/src/nouveau_index_manager.erl
@@ -152,10 +152,10 @@ configure_ibrowse(URL) ->
ibrowse:set_max_sessions(
Host,
Port,
- config:get_integer("nouveau", "max_sessions", 100)
+ nouveau_util:max_sessions()
),
ibrowse:set_max_pipeline_size(
Host,
Port,
- config:get_integer("nouveau", "max_pipeline_size", 1000)
+ nouveau_util:max_pipeline_size()
).
diff --git a/src/nouveau/src/nouveau_index_updater.erl b/src/nouveau/src/nouveau_index_updater.erl
index 65533a90c..dfc6eb122 100644
--- a/src/nouveau/src/nouveau_index_updater.erl
+++ b/src/nouveau/src/nouveau_index_updater.erl
@@ -33,9 +33,10 @@
changes_done,
total_changes,
exclude_idrevs,
- reqids = [],
+ reqids,
conn_pid,
- update_seq
+ update_seq,
+ max_pipeline_size
}).
-record(purge_acc, {
@@ -97,13 +98,15 @@ update(#index{} = Index) ->
changes_done = 0,
total_changes = TotalChanges,
exclude_idrevs = PurgeAcc1#purge_acc.exclude_list,
+ reqids = queue:new(),
conn_pid = ConnPid,
- update_seq = PurgeAcc1#purge_acc.index_update_seq
+ update_seq = PurgeAcc1#purge_acc.index_update_seq,
+ max_pipeline_size = nouveau_util:max_pipeline_size()
},
{ok, Acc1} = couch_db:fold_changes(
Db, Acc0#acc.update_seq, fun load_docs/2, Acc0, []
),
-
nouveau_api:drain_async_responses(lists:reverse(Acc1#acc.reqids)),
+ nouveau_api:drain_async_responses(Acc1#acc.reqids, 0),
ibrowse:stop_worker_process(ConnPid),
ok = nouveau_api:set_update_seq(Index,
Acc1#acc.update_seq, NewCurSeq)
after
@@ -117,9 +120,9 @@ update(#index{} = Index) ->
load_docs(#full_doc_info{id = <<"_design/", _/binary>>}, #acc{} = Acc) ->
{ok, Acc};
load_docs(FDI, #acc{} = Acc0) ->
- %% collect completed requests without blocking
- ReqIds = nouveau_api:drain_async_responses(0),
- Acc1 = Acc0#acc{reqids = Acc0#acc.reqids -- ReqIds},
+ %% block for responses so we stay under the max pipeline size
+ ReqIds1 = nouveau_api:drain_async_responses(Acc0#acc.reqids,
Acc0#acc.max_pipeline_size),
+ Acc1 = Acc0#acc{reqids = ReqIds1},
couch_task_status:update([
{changes_done, Acc1#acc.changes_done},
@@ -146,7 +149,7 @@ load_docs(FDI, #acc{} = Acc0) ->
{ibrowse_req_id, ReqId} ->
Acc1#acc{
update_seq = DI#doc_info.high_seq,
- reqids = [ReqId | Acc1#acc.reqids]
+ reqids = queue:in(ReqId, Acc1#acc.reqids)
};
{error, Reason} ->
exit({error, Reason})
diff --git a/src/nouveau/src/nouveau_util.erl b/src/nouveau/src/nouveau_util.erl
index 86fc3a4d8..57aaf12aa 100644
--- a/src/nouveau/src/nouveau_util.erl
+++ b/src/nouveau/src/nouveau_util.erl
@@ -27,7 +27,9 @@
maybe_create_local_purge_doc/2,
get_local_purge_doc_id/1,
get_local_purge_doc_body/3,
- nouveau_url/0
+ nouveau_url/0,
+ max_sessions/0,
+ max_pipeline_size/0
]).
index_name(Path) when is_binary(Path) ->
@@ -196,3 +198,9 @@ get_local_purge_doc_body(LocalDocId, PurgeSeq, Index) ->
nouveau_url() ->
config:get("nouveau", "url", "http://127.0.0.1:8080").
+
+max_sessions() ->
+ config:get_integer("nouveau", "max_sessions", 100).
+
+max_pipeline_size() ->
+ config:get_integer("nouveau", "max_pipeline_size", 1000).