This is an automated email from the ASF dual-hosted git repository. rnewson pushed a commit to branch nouveau-ibrowse-improvements in repository https://gitbox.apache.org/repos/asf/couchdb.git
commit 34a6515a3a6270ed1a76c107406dc9f032e4a5cd Author: Robert Newson <[email protected]> AuthorDate: Sat Sep 2 15:30:14 2023 +0100 explicitly make our own connection when updating --- src/nouveau/src/nouveau_api.erl | 56 +++++++++++++------------------ src/nouveau/src/nouveau_index_updater.erl | 33 +++++++++++------- 2 files changed, 44 insertions(+), 45 deletions(-) diff --git a/src/nouveau/src/nouveau_api.erl b/src/nouveau/src/nouveau_api.erl index c039a44fd..a568f8751 100644 --- a/src/nouveau/src/nouveau_api.erl +++ b/src/nouveau/src/nouveau_api.erl @@ -23,16 +23,15 @@ create_index/2, delete_path/1, delete_path/2, - delete_doc/4, - purge_doc/4, - update_doc/6, + delete_doc/5, + purge_doc/5, + update_doc/7, search/2, set_purge_seq/3, set_update_seq/3 ]). -define(JSON_CONTENT_TYPE, {"Content-Type", "application/json"}). --define(NOUVEAU_CONN_PID, nouveau_conn_pid). analyze(Text, Analyzer) when is_binary(Text), is_binary(Analyzer) @@ -98,21 +97,22 @@ delete_path(Path, Exclusions) when send_error(Reason) end. -delete_doc(#index{} = Index, DocId, FromSeq, ToSeq) when - is_binary(DocId), is_integer(FromSeq), is_integer(ToSeq) +delete_doc(ConnPid, #index{} = Index, DocId, FromSeq, ToSeq) when + is_pid(ConnPid), is_binary(DocId), is_integer(FromSeq), is_integer(ToSeq) -> - delete_doc(Index, DocId, FromSeq, ToSeq, false). + delete_doc(ConnPid, Index, DocId, FromSeq, ToSeq, false). -purge_doc(#index{} = Index, DocId, FromSeq, ToSeq) when - is_binary(DocId), is_integer(FromSeq), is_integer(ToSeq) +purge_doc(ConnPid, #index{} = Index, DocId, FromSeq, ToSeq) when + is_pid(ConnPid), is_binary(DocId), is_integer(FromSeq), is_integer(ToSeq) -> - delete_doc(Index, DocId, FromSeq, ToSeq, true). + delete_doc(ConnPid, Index, DocId, FromSeq, ToSeq, true). -delete_doc(#index{} = Index, DocId, FromSeq, ToSeq, IsPurge) when - is_binary(DocId), is_integer(FromSeq), is_integer(ToSeq), is_boolean(IsPurge) +delete_doc(ConnPid, #index{} = Index, DocId, FromSeq, ToSeq, IsPurge) when + is_pid(ConnPid), is_binary(DocId), is_integer(FromSeq), is_integer(ToSeq), is_boolean(IsPurge) -> ReqBody = #{from_seq => FromSeq, to_seq => ToSeq, purge => IsPurge}, - Resp = send_if_enabled( + Resp = send_direct_if_enabled( + ConnPid, doc_url(Index, DocId), [?JSON_CONTENT_TYPE], delete, @@ -128,7 +128,8 @@ delete_doc(#index{} = Index, DocId, FromSeq, ToSeq, IsPurge) when send_error(Reason) end. -update_doc(#index{} = Index, DocId, FromSeq, ToSeq, Partition, Fields) when +update_doc(ConnPid, #index{} = Index, DocId, FromSeq, ToSeq, Partition, Fields) when + is_pid(ConnPid), is_binary(DocId), is_integer(FromSeq), is_integer(ToSeq), @@ -141,7 +142,8 @@ update_doc(#index{} = Index, DocId, FromSeq, ToSeq, Partition, Fields) when partition => Partition, fields => Fields }, - Resp = send_if_enabled( + Resp = send_direct_if_enabled( + ConnPid, doc_url(Index, DocId), [?JSON_CONTENT_TYPE], put, @@ -272,24 +274,14 @@ send_if_enabled(Url, Headers, Method, Body) -> send_if_enabled(Url, Headers, Method, Body, Options) -> case nouveau:enabled() of true -> - ibrowse:send_req_direct(conn_pid(), Url, Headers, Method, Body, Options); + ibrowse:send_req(Url, Headers, Method, Body, Options); false -> {error, nouveau_not_enabled} end. - -conn_pid() -> - case erlang:get(?NOUVEAU_CONN_PID) of - undefined -> - Url = nouveau_util:nouveau_url(), - {ok, Pid} = ibrowse:spawn_link_worker_process(Url), - erlang:put(?NOUVEAU_CONN_PID, Pid), - Pid; - Pid when is_pid(Pid) -> - case is_process_alive(Pid) of - true -> - Pid; - false -> - erlang:erase(?NOUVEAU_CONN_PID), - conn_pid() - end +send_direct_if_enabled(ConnPid, Url, Headers, Method, Body, Options) -> + case nouveau:enabled() of + true -> + ibrowse:send_req_direct(ConnPid, Url, Headers, Method, Body, Options); + false -> + {error, nouveau_not_enabled} end. diff --git a/src/nouveau/src/nouveau_index_updater.erl b/src/nouveau/src/nouveau_index_updater.erl index a53db9d7d..a518dc2c8 100644 --- a/src/nouveau/src/nouveau_index_updater.erl +++ b/src/nouveau/src/nouveau_index_updater.erl @@ -34,7 +34,8 @@ total_changes, exclude_idrevs, req_ids, - from_seq + from_seq, + conn_pid }). outdated(#index{} = Index) -> @@ -71,7 +72,8 @@ update(#index{} = Index) -> %% update status every half second couch_task_status:set_update_frequency(500), - {ok, ExcludeIdRevs} = purge_index(Db, Index, IndexPurgeSeq), + {ok, ConnPid} = ibrowse:spawn_link_worker_process(nouveau_util:nouveau_url()), + {ok, ExcludeIdRevs} = purge_index(ConnPid, Db, Index, IndexPurgeSeq), NewCurSeq = couch_db:get_update_seq(Db), Proc = get_os_process(Index#index.def_lang), @@ -85,7 +87,8 @@ update(#index{} = Index) -> total_changes = TotalChanges, exclude_idrevs = ExcludeIdRevs, req_ids = [], - from_seq = IndexUpdateSeq + from_seq = IndexUpdateSeq, + conn_pid = ConnPid }, {ok, #acc{from_seq = FromSeq, req_ids = ReqIds}} = couch_db:fold_changes(Db, IndexUpdateSeq, fun load_docs/2, Acc0, []), @@ -111,7 +114,8 @@ load_docs( total_changes = TotalChanges, exclude_idrevs = ExcludeIdRevs, req_ids = ReqIds, - from_seq = FromSeq + from_seq = FromSeq, + conn_pid = ConnPid } = Acc ) when length(ReqIds) < 100 @@ -125,7 +129,7 @@ load_docs( true -> {ok, Acc#acc{changes_done = ChangesDone + 1}}; false -> - {ReqId, NewFromSeq} = update_or_delete_index(Db, Index, FromSeq, DI, Proc), + {ReqId, NewFromSeq} = update_or_delete_index(ConnPid, Db, Index, FromSeq, DI, Proc), {ok, Acc#acc{ changes_done = ChangesDone + 1, from_seq = NewFromSeq, req_ids = [ReqId | ReqIds] }} @@ -140,7 +144,7 @@ flush_reqids([]) -> []; flush_reqids(ReqIds) -> receive - {ibrowse_async_headers, ReqId, Code, _Headers} when Code == "200"; Code == "201" -> + {ibrowse_async_headers, ReqId, Code, _Headers} when Code == "204" -> ok = flush_reqid(ReqId), flush_reqids(lists:delete(ReqId, ReqIds)); {ibrowse_async_headers, _ReqId, Code, _Headers} -> @@ -155,11 +159,11 @@ flush_reqid(ReqId) -> ok end. -update_or_delete_index(Db, #index{} = Index, IfMatchSeq, #doc_info{} = DI, Proc) -> +update_or_delete_index(ConnPid, Db, #index{} = Index, FromSeq, #doc_info{} = DI, Proc) -> #doc_info{id = Id, high_seq = Seq, revs = [#rev_info{deleted = Del} | _]} = DI, case Del of true -> - case nouveau_api:delete_doc(Index, Id, IfMatchSeq, Seq) of + case nouveau_api:delete_doc(ConnPid, Index, Id, FromSeq, Seq) of {ok, ReqId} -> {ReqId, Seq}; {error, Reason} -> @@ -178,14 +182,18 @@ update_or_delete_index(Db, #index{} = Index, IfMatchSeq, #doc_info{} = DI, Proc) end, case Fields of [] -> - case nouveau_api:delete_doc(Index, Id, IfMatchSeq, Seq) of + case nouveau_api:delete_doc(ConnPid, Index, Id, FromSeq, Seq) of {ok, ReqId} -> {ReqId, Seq}; {error, Reason} -> exit({error, Reason}) end; _ -> - case nouveau_api:update_doc(Index, Id, IfMatchSeq, Seq, Partition, Fields) of + case + nouveau_api:update_doc( + ConnPid, Index, Id, FromSeq, Seq, Partition, Fields + ) + of {ok, ReqId} -> {ReqId, Seq}; {error, Reason} -> @@ -234,7 +242,7 @@ index_definition(#index{} = Index) -> <<"field_analyzers">> => Index#index.field_analyzers }. -purge_index(Db, Index, IndexPurgeSeq) -> +purge_index(ConnPid, Db, Index, IndexPurgeSeq) -> Proc = get_os_process(Index#index.def_lang), try true = proc_prompt(Proc, [<<"add_fun">>, Index#index.def, <<"nouveau">>]), @@ -251,8 +259,7 @@ purge_index(Db, Index, IndexPurgeSeq) -> true -> Acc; false -> - %% TODO - update_or_delete_index(Db, Index, 0, DI, Proc), + update_or_delete_index(ConnPid, Db, Index, PurgeSeq, DI, Proc), [{Id, Rev} | Acc] end end,
