This is an automated email from the ASF dual-hosted git repository. rnewson pushed a commit to branch nouveau-ibrowse-improvements in repository https://gitbox.apache.org/repos/asf/couchdb.git
commit 35b32dbbb1f73dcfe8efd7bb033ed268f0aea064 Author: Robert Newson <[email protected]> AuthorDate: Mon Sep 4 18:53:43 2023 +0100 Async dispatch to dedicated ibrowse process for performance --- src/nouveau/src/nouveau_api.erl | 104 +++++++++++++++++++++++------- src/nouveau/src/nouveau_index_updater.erl | 73 ++++++++++++++------- 2 files changed, 130 insertions(+), 47 deletions(-) diff --git a/src/nouveau/src/nouveau_api.erl b/src/nouveau/src/nouveau_api.erl index 39cc2b32f..c5d147fb2 100644 --- a/src/nouveau/src/nouveau_api.erl +++ b/src/nouveau/src/nouveau_api.erl @@ -23,12 +23,14 @@ create_index/2, delete_path/1, delete_path/2, - delete_doc/3, + delete_doc_async/4, purge_doc/3, - update_doc/5, + update_doc_async/6, search/2, set_purge_seq/2, - set_update_seq/2 + set_update_seq/2, + drain_async_responses/1, + jaxrs_error/2 ]). -define(JSON_CONTENT_TYPE, {"Content-Type", "application/json"}). @@ -97,20 +99,25 @@ delete_path(Path, Exclusions) when send_error(Reason) end. -delete_doc(#index{} = Index, DocId, UpdateSeq) when - is_binary(DocId), is_integer(UpdateSeq) +delete_doc_async(ConnPid, #index{} = Index, DocId, UpdateSeq) when + is_pid(ConnPid), is_binary(DocId), is_integer(UpdateSeq) -> - delete_doc(Index, DocId, UpdateSeq, false). + ReqBody = #{seq => UpdateSeq, purge => false}, + send_direct_if_enabled( + ConnPid, + doc_url(Index, DocId), + [?JSON_CONTENT_TYPE], + delete, + jiffy:encode(ReqBody), + [ + {stream_to, self()} + ] + ). purge_doc(#index{} = Index, DocId, PurgeSeq) when is_binary(DocId), is_integer(PurgeSeq) -> - delete_doc(Index, DocId, PurgeSeq, true). - -delete_doc(#index{} = Index, DocId, Seq, IsPurge) when - is_binary(DocId), is_integer(Seq), is_boolean(IsPurge) --> - ReqBody = #{seq => Seq, purge => IsPurge}, + ReqBody = #{seq => PurgeSeq, purge => true}, Resp = send_if_enabled( doc_url(Index, DocId), [?JSON_CONTENT_TYPE], delete, jiffy:encode(ReqBody) ), @@ -123,7 +130,8 @@ delete_doc(#index{} = Index, DocId, Seq, IsPurge) when send_error(Reason) end. -update_doc(#index{} = Index, DocId, UpdateSeq, Partition, Fields) when +update_doc_async(ConnPid, #index{} = Index, DocId, UpdateSeq, Partition, Fields) when + is_pid(ConnPid), is_binary(DocId), is_integer(UpdateSeq), (is_binary(Partition) orelse Partition == null), @@ -134,17 +142,16 @@ update_doc(#index{} = Index, DocId, UpdateSeq, Partition, Fields) when partition => Partition, fields => Fields }, - Resp = send_if_enabled( - doc_url(Index, DocId), [?JSON_CONTENT_TYPE], put, jiffy:encode(ReqBody) - ), - case Resp of - {ok, "204", _, _} -> - ok; - {ok, StatusCode, _, RespBody} -> - {error, jaxrs_error(StatusCode, RespBody)}; - {error, Reason} -> - send_error(Reason) - end. + send_direct_if_enabled( + ConnPid, + doc_url(Index, DocId), + [?JSON_CONTENT_TYPE], + put, + jiffy:encode(ReqBody), + [ + {stream_to, self()} + ] + ). search(#index{} = Index, QueryArgs) -> Resp = send_if_enabled( @@ -180,6 +187,42 @@ set_seq(#index{} = Index, Key, Value) when is_atom(Key), is_integer(Value) -> send_error(Reason) end. +drain_async_responses(List) when is_list(List) -> + drain_async_responses_list(List); +drain_async_responses(Timeout) when is_integer(Timeout); Timeout == infinity -> + drain_async_responses_timeout(Timeout, []). + +drain_async_responses_list([]) -> + ok; +drain_async_responses_list([ReqId | Rest]) -> + receive + {ibrowse_async_headers, ReqId, Code, Headers} -> + drain_async_response(ReqId, Code, Headers, undefined) + end, + drain_async_responses_list(Rest). + + +drain_async_responses_timeout(Timeout, ReqIds) when is_integer(Timeout); Timeout == infinity -> + receive + {ibrowse_async_headers, ReqId, Code0, Headers0} -> + case drain_async_response(ReqId, Code0, Headers0, undefined) of + {ok, "204", _Headers, _Body} -> + drain_async_responses_timeout(Timeout, [ReqId | ReqIds]); + {ok, StatusCode, _Headers, RespBody} -> + exit({error, jaxrs_error(StatusCode, RespBody)}) + end + after Timeout -> + ReqIds + end. + +drain_async_response(ReqId, Code0, Headers0, Body0) -> + receive + {ibrowse_async_response, ReqId, Body1} -> + drain_async_response(ReqId, Code0, Headers0, Body1); + {ibrowse_async_response_end, ReqId} -> + {ok, Code0, Headers0, Body0} + end. + %% private functions index_path(Path) -> @@ -249,9 +292,20 @@ send_if_enabled(Url, Header, Method) -> send_if_enabled(Url, Header, Method, []). send_if_enabled(Url, Header, Method, Body) -> + send_if_enabled(Url, Header, Method, Body, []). + +send_if_enabled(Url, Header, Method, Body, Options) -> + case nouveau:enabled() of + true -> + ibrowse:send_req(Url, Header, Method, Body, Options); + false -> + {error, nouveau_not_enabled} + end. + +send_direct_if_enabled(ConnPid, Url, Header, Method, Body, Options) -> case nouveau:enabled() of true -> - ibrowse:send_req(Url, Header, Method, Body); + ibrowse:send_req_direct(ConnPid, Url, Header, Method, Body, Options); false -> {error, nouveau_not_enabled} end. diff --git a/src/nouveau/src/nouveau_index_updater.erl b/src/nouveau/src/nouveau_index_updater.erl index d6ca12635..d8d75364c 100644 --- a/src/nouveau/src/nouveau_index_updater.erl +++ b/src/nouveau/src/nouveau_index_updater.erl @@ -32,7 +32,9 @@ proc, changes_done, total_changes, - exclude_idrevs + exclude_idrevs, + reqids = [], + conn_pid }). outdated(#index{} = Index) -> @@ -69,14 +71,28 @@ update(#index{} = Index) -> %% update status every half second couch_task_status:set_update_frequency(500), - {ok, ExcludeIdRevs} = purge_index(Db, Index, IndexPurgeSeq), + {ok, ConnPid} = ibrowse:spawn_link_worker_process(nouveau_util:nouveau_url()), + {ok, ExcludeIdRevs} = purge_index(ConnPid, Db, Index, IndexPurgeSeq), NewCurSeq = couch_db:get_update_seq(Db), Proc = get_os_process(Index#index.def_lang), try true = proc_prompt(Proc, [<<"add_fun">>, Index#index.def, <<"nouveau">>]), - Acc0 = #acc{db = Db, index=Index, proc=Proc, changes_done=0, total_changes=TotalChanges, exclude_idrevs=ExcludeIdRevs}, - {ok, _} = couch_db:fold_changes(Db, IndexUpdateSeq, fun load_docs/2, Acc0, []), + + Acc0 = #acc{ + db = Db, + index = Index, + proc = Proc, + changes_done = 0, + total_changes = TotalChanges, + exclude_idrevs = ExcludeIdRevs, + conn_pid = ConnPid + }, + {ok, Acc1} = couch_db:fold_changes( + Db, IndexUpdateSeq, fun load_docs/2, Acc0, [] + ), + nouveau_api:drain_async_responses(lists:reverse(Acc1#acc.reqids)), + ibrowse:stop_worker_process(ConnPid), ok = nouveau_api:set_update_seq(Index, NewCurSeq) after ret_os_process(Proc) @@ -88,23 +104,41 @@ update(#index{} = Index) -> load_docs(#full_doc_info{id = <<"_design/", _/binary>>}, #acc{} = Acc) -> {ok, Acc}; -load_docs(FDI, #acc{} = Acc) -> +load_docs(FDI, #acc{} = Acc0) -> + %% collect completed requests without blocking + ReqIds = nouveau_api:drain_async_responses(0), + Acc1 = Acc0#acc{reqids = Acc0#acc.reqids -- ReqIds}, + couch_task_status:update([ - {changes_done, Acc#acc.changes_done}, {progress, (Acc#acc.changes_done * 100) div Acc#acc.total_changes} + {changes_done, Acc1#acc.changes_done}, + {progress, (Acc1#acc.changes_done * 100) div Acc1#acc.total_changes} ]), DI = couch_doc:to_doc_info(FDI), #doc_info{id = Id, revs = [#rev_info{rev = Rev} | _]} = DI, - case lists:member({Id, Rev}, Acc#acc.exclude_idrevs) of - true -> ok; - false -> update_or_delete_index(Acc#acc.db, Acc#acc.index, DI, Acc#acc.proc) - end, - {ok, Acc#acc{changes_done = Acc#acc.changes_done + 1}}. -update_or_delete_index(Db, #index{} = Index, #doc_info{} = DI, Proc) -> + Acc2 = + case lists:member({Id, Rev}, Acc1#acc.exclude_idrevs) of + true -> + Acc1; + false -> + case + update_or_delete_index( + Acc1#acc.conn_pid, Acc1#acc.db, Acc1#acc.index, DI, Acc1#acc.proc + ) + of + {ibrowse_req_id, ReqId} -> + Acc1#acc{reqids = [ReqId | Acc1#acc.reqids]}; + {error, Reason} -> + exit({error, Reason}) + end + end, + {ok, Acc2#acc{changes_done = Acc2#acc.changes_done + 1}}. + +update_or_delete_index(ConnPid, Db, #index{} = Index, #doc_info{} = DI, Proc) -> #doc_info{id = Id, high_seq = Seq, revs = [#rev_info{deleted = Del} | _]} = DI, case Del of true -> - ok = nouveau_api:delete_doc(Index, Id, Seq); + nouveau_api:delete_doc_async(ConnPid, Index, Id, Seq); false -> {ok, Doc} = couch_db:open_doc(Db, DI, []), Json = couch_doc:to_json_obj(Doc, []), @@ -118,14 +152,9 @@ update_or_delete_index(Db, #index{} = Index, #doc_info{} = DI, Proc) -> end, case Fields of [] -> - ok = nouveau_api:delete_doc(Index, Id, Seq); + nouveau_api:delete_doc_async(ConnPid, Index, Id, Seq); _ -> - case nouveau_api:update_doc(Index, Id, Seq, Partition, Fields) of - ok -> - ok; - {error, Reason} -> - exit({error, Reason}) - end + nouveau_api:update_doc_async(ConnPid, Index, Id, Seq, Partition, Fields) end end. @@ -169,7 +198,7 @@ index_definition(#index{} = Index) -> <<"field_analyzers">> => Index#index.field_analyzers }. -purge_index(Db, Index, IndexPurgeSeq) -> +purge_index(ConnPid, Db, Index, IndexPurgeSeq) -> Proc = get_os_process(Index#index.def_lang), try true = proc_prompt(Proc, [<<"add_fun">>, Index#index.def, <<"nouveau">>]), @@ -186,7 +215,7 @@ purge_index(Db, Index, IndexPurgeSeq) -> true -> Acc; false -> - update_or_delete_index(Db, Index, DI, Proc), + update_or_delete_index(ConnPid, Db, Index, DI, Proc), [{Id, Rev} | Acc] end end,
