This is an automated email from the ASF dual-hosted git repository.

rnewson pushed a commit to branch nouveau-ibrowse-improvements
in repository https://gitbox.apache.org/repos/asf/couchdb.git

commit 34a6515a3a6270ed1a76c107406dc9f032e4a5cd
Author: Robert Newson <[email protected]>
AuthorDate: Sat Sep 2 15:30:14 2023 +0100

    explicitly make our own connection when updating
---
 src/nouveau/src/nouveau_api.erl           | 56 +++++++++++++------------------
 src/nouveau/src/nouveau_index_updater.erl | 33 +++++++++++-------
 2 files changed, 44 insertions(+), 45 deletions(-)

diff --git a/src/nouveau/src/nouveau_api.erl b/src/nouveau/src/nouveau_api.erl
index c039a44fd..a568f8751 100644
--- a/src/nouveau/src/nouveau_api.erl
+++ b/src/nouveau/src/nouveau_api.erl
@@ -23,16 +23,15 @@
     create_index/2,
     delete_path/1,
     delete_path/2,
-    delete_doc/4,
-    purge_doc/4,
-    update_doc/6,
+    delete_doc/5,
+    purge_doc/5,
+    update_doc/7,
     search/2,
     set_purge_seq/3,
     set_update_seq/3
 ]).
 
 -define(JSON_CONTENT_TYPE, {"Content-Type", "application/json"}).
--define(NOUVEAU_CONN_PID, nouveau_conn_pid).
 
 analyze(Text, Analyzer) when
     is_binary(Text), is_binary(Analyzer)
@@ -98,21 +97,22 @@ delete_path(Path, Exclusions) when
             send_error(Reason)
     end.
 
-delete_doc(#index{} = Index, DocId, FromSeq, ToSeq) when
-    is_binary(DocId), is_integer(FromSeq), is_integer(ToSeq)
+delete_doc(ConnPid, #index{} = Index, DocId, FromSeq, ToSeq) when
+    is_pid(ConnPid), is_binary(DocId), is_integer(FromSeq), is_integer(ToSeq)
 ->
-    delete_doc(Index, DocId, FromSeq, ToSeq, false).
+    delete_doc(ConnPid, Index, DocId, FromSeq, ToSeq, false).
 
-purge_doc(#index{} = Index, DocId, FromSeq, ToSeq) when
-    is_binary(DocId), is_integer(FromSeq), is_integer(ToSeq)
+purge_doc(ConnPid, #index{} = Index, DocId, FromSeq, ToSeq) when
+    is_pid(ConnPid), is_binary(DocId), is_integer(FromSeq), is_integer(ToSeq)
 ->
-    delete_doc(Index, DocId, FromSeq, ToSeq, true).
+    delete_doc(ConnPid, Index, DocId, FromSeq, ToSeq, true).
 
-delete_doc(#index{} = Index, DocId, FromSeq, ToSeq, IsPurge) when
-    is_binary(DocId), is_integer(FromSeq), is_integer(ToSeq), 
is_boolean(IsPurge)
+delete_doc(ConnPid, #index{} = Index, DocId, FromSeq, ToSeq, IsPurge) when
+    is_pid(ConnPid), is_binary(DocId), is_integer(FromSeq), is_integer(ToSeq), 
is_boolean(IsPurge)
 ->
     ReqBody = #{from_seq => FromSeq, to_seq => ToSeq, purge => IsPurge},
-    Resp = send_if_enabled(
+    Resp = send_direct_if_enabled(
+        ConnPid,
         doc_url(Index, DocId),
         [?JSON_CONTENT_TYPE],
         delete,
@@ -128,7 +128,8 @@ delete_doc(#index{} = Index, DocId, FromSeq, ToSeq, 
IsPurge) when
             send_error(Reason)
     end.
 
-update_doc(#index{} = Index, DocId, FromSeq, ToSeq, Partition, Fields) when
+update_doc(ConnPid, #index{} = Index, DocId, FromSeq, ToSeq, Partition, 
Fields) when
+    is_pid(ConnPid),
     is_binary(DocId),
     is_integer(FromSeq),
     is_integer(ToSeq),
@@ -141,7 +142,8 @@ update_doc(#index{} = Index, DocId, FromSeq, ToSeq, 
Partition, Fields) when
         partition => Partition,
         fields => Fields
     },
-    Resp = send_if_enabled(
+    Resp = send_direct_if_enabled(
+        ConnPid,
         doc_url(Index, DocId),
         [?JSON_CONTENT_TYPE],
         put,
@@ -272,24 +274,14 @@ send_if_enabled(Url, Headers, Method, Body) ->
 send_if_enabled(Url, Headers, Method, Body, Options) ->
     case nouveau:enabled() of
         true ->
-            ibrowse:send_req_direct(conn_pid(), Url, Headers, Method, Body, 
Options);
+            ibrowse:send_req(Url, Headers, Method, Body, Options);
         false ->
             {error, nouveau_not_enabled}
     end.
-
-conn_pid() ->
-    case erlang:get(?NOUVEAU_CONN_PID) of
-        undefined ->
-            Url = nouveau_util:nouveau_url(),
-            {ok, Pid} = ibrowse:spawn_link_worker_process(Url),
-            erlang:put(?NOUVEAU_CONN_PID, Pid),
-            Pid;
-        Pid when is_pid(Pid) ->
-            case is_process_alive(Pid) of
-                true ->
-                    Pid;
-                false ->
-                    erlang:erase(?NOUVEAU_CONN_PID),
-                    conn_pid()
-            end
+send_direct_if_enabled(ConnPid, Url, Headers, Method, Body, Options) ->
+    case nouveau:enabled() of
+        true ->
+            ibrowse:send_req_direct(ConnPid, Url, Headers, Method, Body, 
Options);
+        false ->
+            {error, nouveau_not_enabled}
     end.
diff --git a/src/nouveau/src/nouveau_index_updater.erl 
b/src/nouveau/src/nouveau_index_updater.erl
index a53db9d7d..a518dc2c8 100644
--- a/src/nouveau/src/nouveau_index_updater.erl
+++ b/src/nouveau/src/nouveau_index_updater.erl
@@ -34,7 +34,8 @@
     total_changes,
     exclude_idrevs,
     req_ids,
-    from_seq
+    from_seq,
+    conn_pid
 }).
 
 outdated(#index{} = Index) ->
@@ -71,7 +72,8 @@ update(#index{} = Index) ->
                 %% update status every half second
                 couch_task_status:set_update_frequency(500),
 
-                {ok, ExcludeIdRevs} = purge_index(Db, Index, IndexPurgeSeq),
+                {ok, ConnPid} = 
ibrowse:spawn_link_worker_process(nouveau_util:nouveau_url()),
+                {ok, ExcludeIdRevs} = purge_index(ConnPid, Db, Index, 
IndexPurgeSeq),
 
                 NewCurSeq = couch_db:get_update_seq(Db),
                 Proc = get_os_process(Index#index.def_lang),
@@ -85,7 +87,8 @@ update(#index{} = Index) ->
                         total_changes = TotalChanges,
                         exclude_idrevs = ExcludeIdRevs,
                         req_ids = [],
-                        from_seq = IndexUpdateSeq
+                        from_seq = IndexUpdateSeq,
+                        conn_pid = ConnPid
                     },
                     {ok, #acc{from_seq = FromSeq, req_ids = ReqIds}} =
                         couch_db:fold_changes(Db, IndexUpdateSeq, fun 
load_docs/2, Acc0, []),
@@ -111,7 +114,8 @@ load_docs(
         total_changes = TotalChanges,
         exclude_idrevs = ExcludeIdRevs,
         req_ids = ReqIds,
-        from_seq = FromSeq
+        from_seq = FromSeq,
+        conn_pid = ConnPid
     } = Acc
 ) when
     length(ReqIds) < 100
@@ -125,7 +129,7 @@ load_docs(
         true ->
             {ok, Acc#acc{changes_done = ChangesDone + 1}};
         false ->
-            {ReqId, NewFromSeq} = update_or_delete_index(Db, Index, FromSeq, 
DI, Proc),
+            {ReqId, NewFromSeq} = update_or_delete_index(ConnPid, Db, Index, 
FromSeq, DI, Proc),
             {ok, Acc#acc{
                 changes_done = ChangesDone + 1, from_seq = NewFromSeq, req_ids 
= [ReqId | ReqIds]
             }}
@@ -140,7 +144,7 @@ flush_reqids([]) ->
     [];
 flush_reqids(ReqIds) ->
     receive
-        {ibrowse_async_headers, ReqId, Code, _Headers} when Code == "200"; 
Code == "201" ->
+        {ibrowse_async_headers, ReqId, Code, _Headers} when Code == "204" ->
             ok = flush_reqid(ReqId),
             flush_reqids(lists:delete(ReqId, ReqIds));
         {ibrowse_async_headers, _ReqId, Code, _Headers} ->
@@ -155,11 +159,11 @@ flush_reqid(ReqId) ->
             ok
     end.
 
-update_or_delete_index(Db, #index{} = Index, IfMatchSeq, #doc_info{} = DI, 
Proc) ->
+update_or_delete_index(ConnPid, Db, #index{} = Index, FromSeq, #doc_info{} = 
DI, Proc) ->
     #doc_info{id = Id, high_seq = Seq, revs = [#rev_info{deleted = Del} | _]} 
= DI,
     case Del of
         true ->
-            case nouveau_api:delete_doc(Index, Id, IfMatchSeq, Seq) of
+            case nouveau_api:delete_doc(ConnPid, Index, Id, FromSeq, Seq) of
                 {ok, ReqId} ->
                     {ReqId, Seq};
                 {error, Reason} ->
@@ -178,14 +182,18 @@ update_or_delete_index(Db, #index{} = Index, IfMatchSeq, 
#doc_info{} = DI, Proc)
                 end,
             case Fields of
                 [] ->
-                    case nouveau_api:delete_doc(Index, Id, IfMatchSeq, Seq) of
+                    case nouveau_api:delete_doc(ConnPid, Index, Id, FromSeq, 
Seq) of
                         {ok, ReqId} ->
                             {ReqId, Seq};
                         {error, Reason} ->
                             exit({error, Reason})
                     end;
                 _ ->
-                    case nouveau_api:update_doc(Index, Id, IfMatchSeq, Seq, 
Partition, Fields) of
+                    case
+                        nouveau_api:update_doc(
+                            ConnPid, Index, Id, FromSeq, Seq, Partition, Fields
+                        )
+                    of
                         {ok, ReqId} ->
                             {ReqId, Seq};
                         {error, Reason} ->
@@ -234,7 +242,7 @@ index_definition(#index{} = Index) ->
         <<"field_analyzers">> => Index#index.field_analyzers
     }.
 
-purge_index(Db, Index, IndexPurgeSeq) ->
+purge_index(ConnPid, Db, Index, IndexPurgeSeq) ->
     Proc = get_os_process(Index#index.def_lang),
     try
         true = proc_prompt(Proc, [<<"add_fun">>, Index#index.def, 
<<"nouveau">>]),
@@ -251,8 +259,7 @@ purge_index(Db, Index, IndexPurgeSeq) ->
                             true ->
                                 Acc;
                             false ->
-                                %% TODO
-                                update_or_delete_index(Db, Index, 0, DI, Proc),
+                                update_or_delete_index(ConnPid, Db, Index, 
PurgeSeq, DI, Proc),
                                 [{Id, Rev} | Acc]
                         end
                 end,

Reply via email to