This is an automated email from the ASF dual-hosted git repository.

rnewson pushed a commit to branch nouveau-ibrowse-improvements in repository https://gitbox.apache.org/repos/asf/couchdb.git
commit 6898d6e482ef219f7ec21d802571ddefb99641f1 Author: Robert Newson <[email protected]> AuthorDate: Thu Sep 7 10:20:55 2023 +0100 reintroduce flow control for async index update --- src/nouveau/src/nouveau_api.erl | 49 +++++++++++++------------------ src/nouveau/src/nouveau_index_manager.erl | 4 +-- src/nouveau/src/nouveau_index_updater.erl | 19 +++++++----- src/nouveau/src/nouveau_util.erl | 10 ++++++- 4 files changed, 43 insertions(+), 39 deletions(-) diff --git a/src/nouveau/src/nouveau_api.erl b/src/nouveau/src/nouveau_api.erl index c2811792f..c133d951b 100644 --- a/src/nouveau/src/nouveau_api.erl +++ b/src/nouveau/src/nouveau_api.erl @@ -29,7 +29,7 @@ search/2, set_purge_seq/3, set_update_seq/3, - drain_async_responses/1, + drain_async_responses/2, jaxrs_error/2 ]). @@ -201,39 +201,32 @@ set_seq(#index{} = Index, ReqBody) -> send_error(Reason) end. -drain_async_responses(List) when is_list(List) -> - drain_async_responses_list(List); -drain_async_responses(Timeout) when is_integer(Timeout); Timeout == infinity -> - drain_async_responses_timeout(Timeout, []). - -drain_async_responses_list([]) -> - ok; -drain_async_responses_list([ReqId | Rest]) -> - receive - {ibrowse_async_headers, ReqId, Code, Headers} -> - case drain_async_response(ReqId, Code, Headers, undefined) of - {ok, "204", _Headers, _Body} -> - drain_async_responses_list(Rest); - {ok, StatusCode, _Headers, RespBody} -> - exit({error, jaxrs_error(StatusCode, RespBody)}) - end +%% wait for enough async responses to reduce the Queue to Min length. +drain_async_responses(Queue0, Min) when Min >= 0 -> + case queue:len(Queue0) > Min of + true -> + {{value, ReqId}, Queue1} = queue:out(Queue0), + wait_for_response(ReqId), + drain_async_responses(Queue1, Min); + false -> + Queue0 end. 
-drain_async_responses_timeout(Timeout, ReqIds) when is_integer(Timeout); Timeout == infinity -> - receive - {ibrowse_async_headers, ReqId, Code0, Headers0} -> - case drain_async_response(ReqId, Code0, Headers0, undefined) of - {ok, "204", _Headers, _Body} -> - drain_async_responses_timeout(Timeout, [ReqId | ReqIds]); - {ok, StatusCode, _Headers, RespBody} -> - exit({error, jaxrs_error(StatusCode, RespBody)}) - end - after Timeout -> - ReqIds +wait_for_response(ReqId) -> + case drain_async_response(ReqId) of + {ok, "204", _Headers, _Body} -> + ok; + {ok, StatusCode, _Headers, RespBody} -> + exit({error, jaxrs_error(StatusCode, RespBody)}) end. +drain_async_response(ReqId) -> + drain_async_response(ReqId, undefined, undefined, undefined). + drain_async_response(ReqId, Code0, Headers0, Body0) -> receive + {ibrowse_async_headers, ReqId, Code1, Headers1} -> + drain_async_response(ReqId, Code1, Headers1, Body0); {ibrowse_async_response, ReqId, Body1} -> drain_async_response(ReqId, Code0, Headers0, Body1); {ibrowse_async_response_end, ReqId} -> diff --git a/src/nouveau/src/nouveau_index_manager.erl b/src/nouveau/src/nouveau_index_manager.erl index bfbd74990..5afb43fc1 100644 --- a/src/nouveau/src/nouveau_index_manager.erl +++ b/src/nouveau/src/nouveau_index_manager.erl @@ -152,10 +152,10 @@ configure_ibrowse(URL) -> ibrowse:set_max_sessions( Host, Port, - config:get_integer("nouveau", "max_sessions", 100) + nouveau_util:max_sessions() ), ibrowse:set_max_pipeline_size( Host, Port, - config:get_integer("nouveau", "max_pipeline_size", 1000) + nouveau_util:max_pipeline_size() ). diff --git a/src/nouveau/src/nouveau_index_updater.erl b/src/nouveau/src/nouveau_index_updater.erl index 65533a90c..dfc6eb122 100644 --- a/src/nouveau/src/nouveau_index_updater.erl +++ b/src/nouveau/src/nouveau_index_updater.erl @@ -33,9 +33,10 @@ changes_done, total_changes, exclude_idrevs, - reqids = [], + reqids, conn_pid, - update_seq + update_seq, + max_pipeline_size }). 
-record(purge_acc, { @@ -97,13 +98,15 @@ update(#index{} = Index) -> changes_done = 0, total_changes = TotalChanges, exclude_idrevs = PurgeAcc1#purge_acc.exclude_list, + reqids = queue:new(), conn_pid = ConnPid, - update_seq = PurgeAcc1#purge_acc.index_update_seq + update_seq = PurgeAcc1#purge_acc.index_update_seq, + max_pipeline_size = nouveau_util:max_pipeline_size() }, {ok, Acc1} = couch_db:fold_changes( Db, Acc0#acc.update_seq, fun load_docs/2, Acc0, [] ), - nouveau_api:drain_async_responses(lists:reverse(Acc1#acc.reqids)), + nouveau_api:drain_async_responses(Acc1#acc.reqids, 0), ibrowse:stop_worker_process(ConnPid), ok = nouveau_api:set_update_seq(Index, Acc1#acc.update_seq, NewCurSeq) after @@ -117,9 +120,9 @@ update(#index{} = Index) -> load_docs(#full_doc_info{id = <<"_design/", _/binary>>}, #acc{} = Acc) -> {ok, Acc}; load_docs(FDI, #acc{} = Acc0) -> - %% collect completed requests without blocking - ReqIds = nouveau_api:drain_async_responses(0), - Acc1 = Acc0#acc{reqids = Acc0#acc.reqids -- ReqIds}, + %% block for responses so we stay under the max pipeline size + ReqIds1 = nouveau_api:drain_async_responses(Acc0#acc.reqids, Acc0#acc.max_pipeline_size), + Acc1 = Acc0#acc{reqids = ReqIds1}, couch_task_status:update([ {changes_done, Acc1#acc.changes_done}, @@ -146,7 +149,7 @@ load_docs(FDI, #acc{} = Acc0) -> {ibrowse_req_id, ReqId} -> Acc1#acc{ update_seq = DI#doc_info.high_seq, - reqids = [ReqId | Acc1#acc.reqids] + reqids = queue:in(ReqId, Acc1#acc.reqids) }; {error, Reason} -> exit({error, Reason}) diff --git a/src/nouveau/src/nouveau_util.erl b/src/nouveau/src/nouveau_util.erl index 86fc3a4d8..57aaf12aa 100644 --- a/src/nouveau/src/nouveau_util.erl +++ b/src/nouveau/src/nouveau_util.erl @@ -27,7 +27,9 @@ maybe_create_local_purge_doc/2, get_local_purge_doc_id/1, get_local_purge_doc_body/3, - nouveau_url/0 + nouveau_url/0, + max_sessions/0, + max_pipeline_size/0 ]). 
index_name(Path) when is_binary(Path) -> @@ -196,3 +198,9 @@ get_local_purge_doc_body(LocalDocId, PurgeSeq, Index) -> nouveau_url() -> config:get("nouveau", "url", "http://127.0.0.1:8080"). + +max_sessions() -> + config:get_integer("nouveau", "max_sessions", 100). + +max_pipeline_size() -> + config:get_integer("nouveau", "max_pipeline_size", 1000).
