This is an automated email from the ASF dual-hosted git repository.

rnewson pushed a commit to branch nouveau-ibrowse-improvements
in repository https://gitbox.apache.org/repos/asf/couchdb.git

commit 35b32dbbb1f73dcfe8efd7bb033ed268f0aea064
Author: Robert Newson <[email protected]>
AuthorDate: Mon Sep 4 18:53:43 2023 +0100

    Async dispatch to dedicated ibrowse process for performance
---
 src/nouveau/src/nouveau_api.erl           | 104 +++++++++++++++++++++++-------
 src/nouveau/src/nouveau_index_updater.erl |  73 ++++++++++++++-------
 2 files changed, 130 insertions(+), 47 deletions(-)

diff --git a/src/nouveau/src/nouveau_api.erl b/src/nouveau/src/nouveau_api.erl
index 39cc2b32f..c5d147fb2 100644
--- a/src/nouveau/src/nouveau_api.erl
+++ b/src/nouveau/src/nouveau_api.erl
@@ -23,12 +23,14 @@
     create_index/2,
     delete_path/1,
     delete_path/2,
-    delete_doc/3,
+    delete_doc_async/4,
     purge_doc/3,
-    update_doc/5,
+    update_doc_async/6,
     search/2,
     set_purge_seq/2,
-    set_update_seq/2
+    set_update_seq/2,
+    drain_async_responses/1,
+    jaxrs_error/2
 ]).
 
 -define(JSON_CONTENT_TYPE, {"Content-Type", "application/json"}).
@@ -97,20 +99,25 @@ delete_path(Path, Exclusions) when
             send_error(Reason)
     end.
 
-delete_doc(#index{} = Index, DocId, UpdateSeq) when
-    is_binary(DocId), is_integer(UpdateSeq)
+delete_doc_async(ConnPid, #index{} = Index, DocId, UpdateSeq) when
+    is_pid(ConnPid), is_binary(DocId), is_integer(UpdateSeq)
 ->
-    delete_doc(Index, DocId, UpdateSeq, false).
+    ReqBody = #{seq => UpdateSeq, purge => false},
+    send_direct_if_enabled(
+        ConnPid,
+        doc_url(Index, DocId),
+        [?JSON_CONTENT_TYPE],
+        delete,
+        jiffy:encode(ReqBody),
+        [
+            {stream_to, self()}
+        ]
+    ).
 
 purge_doc(#index{} = Index, DocId, PurgeSeq) when
     is_binary(DocId), is_integer(PurgeSeq)
 ->
-    delete_doc(Index, DocId, PurgeSeq, true).
-
-delete_doc(#index{} = Index, DocId, Seq, IsPurge) when
-    is_binary(DocId), is_integer(Seq), is_boolean(IsPurge)
-->
-    ReqBody = #{seq => Seq, purge => IsPurge},
+    ReqBody = #{seq => PurgeSeq, purge => true},
     Resp = send_if_enabled(
         doc_url(Index, DocId), [?JSON_CONTENT_TYPE], delete, 
jiffy:encode(ReqBody)
     ),
@@ -123,7 +130,8 @@ delete_doc(#index{} = Index, DocId, Seq, IsPurge) when
             send_error(Reason)
     end.
 
-update_doc(#index{} = Index, DocId, UpdateSeq, Partition, Fields) when
+update_doc_async(ConnPid, #index{} = Index, DocId, UpdateSeq, Partition, 
Fields) when
+    is_pid(ConnPid),
     is_binary(DocId),
     is_integer(UpdateSeq),
     (is_binary(Partition) orelse Partition == null),
@@ -134,17 +142,16 @@ update_doc(#index{} = Index, DocId, UpdateSeq, Partition, 
Fields) when
         partition => Partition,
         fields => Fields
     },
-    Resp = send_if_enabled(
-        doc_url(Index, DocId), [?JSON_CONTENT_TYPE], put, jiffy:encode(ReqBody)
-    ),
-    case Resp of
-        {ok, "204", _, _} ->
-            ok;
-        {ok, StatusCode, _, RespBody} ->
-            {error, jaxrs_error(StatusCode, RespBody)};
-        {error, Reason} ->
-            send_error(Reason)
-    end.
+    send_direct_if_enabled(
+        ConnPid,
+        doc_url(Index, DocId),
+        [?JSON_CONTENT_TYPE],
+        put,
+        jiffy:encode(ReqBody),
+        [
+            {stream_to, self()}
+        ]
+    ).
 
 search(#index{} = Index, QueryArgs) ->
     Resp = send_if_enabled(
@@ -180,6 +187,42 @@ set_seq(#index{} = Index, Key, Value) when is_atom(Key), 
is_integer(Value) ->
             send_error(Reason)
     end.
 
+%% Collect the outcome of requests that were issued asynchronously.
+%% Given a list of request ids, waits for each of them in list order and
+%% returns ok. Given a timeout (an integer, or the atom infinity), keeps
+%% collecting responses until none arrives within that timeout and returns
+%% the ids of the requests that completed with a 204; any other status
+%% seen on that path terminates the calling process.
+drain_async_responses(ReqIds) when is_list(ReqIds) ->
+    drain_async_responses_list(ReqIds);
+drain_async_responses(infinity) ->
+    drain_async_responses_timeout(infinity, []);
+drain_async_responses(Timeout) when is_integer(Timeout) ->
+    drain_async_responses_timeout(Timeout, []).
+
+%% Wait, in list order, for the response to every request id given.
+%% Returns ok once all of them have completed with status 204.
+%% Any other status terminates the calling process with
+%% {error, jaxrs_error(StatusCode, RespBody)}, the same way the
+%% timeout-based drain does, so a rejected request is never silently
+%% treated as a success.
+drain_async_responses_list([]) ->
+    ok;
+drain_async_responses_list([ReqId | Rest]) ->
+    receive
+        {ibrowse_async_headers, ReqId, Code, Headers} ->
+            case drain_async_response(ReqId, Code, Headers, undefined) of
+                {ok, "204", _, _} ->
+                    drain_async_responses_list(Rest);
+                {ok, StatusCode, _, RespBody} ->
+                    exit({error, jaxrs_error(StatusCode, RespBody)})
+            end
+    end.
+
+
+%% Gather responses until no response headers arrive within Timeout.
+%% Every request that finished with status 204 has its id pushed onto
+%% ReqIds, and the accumulated list is returned when the wait times out.
+%% A response with any other status terminates the calling process.
+drain_async_responses_timeout(Timeout, ReqIds) when
+    is_integer(Timeout); Timeout == infinity
+->
+    receive
+        {ibrowse_async_headers, ReqId, Status, Hdrs} ->
+            Reply = drain_async_response(ReqId, Status, Hdrs, undefined),
+            case Reply of
+                {ok, "204", _, _} ->
+                    %% completed successfully; remember it and keep collecting
+                    drain_async_responses_timeout(Timeout, [ReqId | ReqIds]);
+                {ok, ErrStatus, _, ErrBody} ->
+                    exit({error, jaxrs_error(ErrStatus, ErrBody)})
+            end
+    after Timeout ->
+        ReqIds
+    end.
+
+%% Consume the remaining messages of one streamed response and return
+%% {ok, Code0, Headers0, Body}, where Body is the most recent chunk
+%% received for ReqId (or Body0 if no chunk arrived before the end marker).
+%% An {error, Reason} delivered in the chunk position is a failure, not
+%% body data: exit with it instead of storing it as the body and continuing
+%% to wait for an end-of-response message.
+%% NOTE(review): assumes no end marker follows such an error - confirm
+%% against the ibrowse version in use.
+drain_async_response(ReqId, Code0, Headers0, Body0) ->
+    receive
+        {ibrowse_async_response, ReqId, {error, Reason}} ->
+            exit({error, Reason});
+        {ibrowse_async_response, ReqId, Body1} ->
+            drain_async_response(ReqId, Code0, Headers0, Body1);
+        {ibrowse_async_response_end, ReqId} ->
+            {ok, Code0, Headers0, Body0}
+    end.
+
 %% private functions
 
 index_path(Path) ->
@@ -249,9 +292,20 @@ send_if_enabled(Url, Header, Method) ->
     send_if_enabled(Url, Header, Method, []).
 
 send_if_enabled(Url, Header, Method, Body) ->
+    send_if_enabled(Url, Header, Method, Body, []).
+
+%% Send the request through ibrowse with the supplied options, but only
+%% when nouveau is enabled; otherwise return {error, nouveau_not_enabled}
+%% without contacting the server.
+send_if_enabled(Url, Header, Method, Body, Options) ->
+    Enabled = nouveau:enabled(),
+    case Enabled of
+        false ->
+            {error, nouveau_not_enabled};
+        true ->
+            ibrowse:send_req(Url, Header, Method, Body, Options)
+    end.
+
+send_direct_if_enabled(ConnPid, Url, Header, Method, Body, Options) ->
     case nouveau:enabled() of
         true ->
-            ibrowse:send_req(Url, Header, Method, Body);
+            ibrowse:send_req_direct(ConnPid, Url, Header, Method, Body, 
Options);
         false ->
             {error, nouveau_not_enabled}
     end.
diff --git a/src/nouveau/src/nouveau_index_updater.erl 
b/src/nouveau/src/nouveau_index_updater.erl
index d6ca12635..d8d75364c 100644
--- a/src/nouveau/src/nouveau_index_updater.erl
+++ b/src/nouveau/src/nouveau_index_updater.erl
@@ -32,7 +32,9 @@
     proc,
     changes_done,
     total_changes,
-    exclude_idrevs
+    exclude_idrevs,
+    reqids = [],
+    conn_pid
 }).
 
 outdated(#index{} = Index) ->
@@ -69,14 +71,28 @@ update(#index{} = Index) ->
                 %% update status every half second
                 couch_task_status:set_update_frequency(500),
 
-                {ok, ExcludeIdRevs} = purge_index(Db, Index, IndexPurgeSeq),
+                {ok, ConnPid} = 
ibrowse:spawn_link_worker_process(nouveau_util:nouveau_url()),
+                {ok, ExcludeIdRevs} = purge_index(ConnPid, Db, Index, 
IndexPurgeSeq),
 
                 NewCurSeq = couch_db:get_update_seq(Db),
                 Proc = get_os_process(Index#index.def_lang),
                 try
                     true = proc_prompt(Proc, [<<"add_fun">>, Index#index.def, 
<<"nouveau">>]),
-                    Acc0 = #acc{db = Db, index=Index, proc=Proc, 
changes_done=0, total_changes=TotalChanges, exclude_idrevs=ExcludeIdRevs},
-                    {ok, _} = couch_db:fold_changes(Db, IndexUpdateSeq, fun 
load_docs/2, Acc0, []),
+
+                    Acc0 = #acc{
+                        db = Db,
+                        index = Index,
+                        proc = Proc,
+                        changes_done = 0,
+                        total_changes = TotalChanges,
+                        exclude_idrevs = ExcludeIdRevs,
+                        conn_pid = ConnPid
+                    },
+                    {ok, Acc1} = couch_db:fold_changes(
+                        Db, IndexUpdateSeq, fun load_docs/2, Acc0, []
+                    ),
+                    
nouveau_api:drain_async_responses(lists:reverse(Acc1#acc.reqids)),
+                    ibrowse:stop_worker_process(ConnPid),
                     ok = nouveau_api:set_update_seq(Index, NewCurSeq)
                 after
                     ret_os_process(Proc)
@@ -88,23 +104,41 @@ update(#index{} = Index) ->
 
 load_docs(#full_doc_info{id = <<"_design/", _/binary>>}, #acc{} = Acc) ->
     {ok, Acc};
-load_docs(FDI, #acc{} = Acc) ->
+load_docs(FDI, #acc{} = Acc0) ->
+    %% collect completed requests without blocking
+    ReqIds = nouveau_api:drain_async_responses(0),
+    Acc1 = Acc0#acc{reqids = Acc0#acc.reqids -- ReqIds},
+
     couch_task_status:update([
-        {changes_done, Acc#acc.changes_done}, {progress, (Acc#acc.changes_done 
* 100) div Acc#acc.total_changes}
+        {changes_done, Acc1#acc.changes_done},
+        {progress, (Acc1#acc.changes_done * 100) div Acc1#acc.total_changes}
     ]),
     DI = couch_doc:to_doc_info(FDI),
     #doc_info{id = Id, revs = [#rev_info{rev = Rev} | _]} = DI,
-    case lists:member({Id, Rev}, Acc#acc.exclude_idrevs) of
-        true -> ok;
-        false -> update_or_delete_index(Acc#acc.db, Acc#acc.index, DI, 
Acc#acc.proc)
-    end,
-    {ok, Acc#acc{changes_done = Acc#acc.changes_done + 1}}.
 
-update_or_delete_index(Db, #index{} = Index, #doc_info{} = DI, Proc) ->
+    Acc2 =
+        case lists:member({Id, Rev}, Acc1#acc.exclude_idrevs) of
+            true ->
+                Acc1;
+            false ->
+                case
+                    update_or_delete_index(
+                        Acc1#acc.conn_pid, Acc1#acc.db, Acc1#acc.index, DI, 
Acc1#acc.proc
+                    )
+                of
+                    {ibrowse_req_id, ReqId} ->
+                        Acc1#acc{reqids = [ReqId | Acc1#acc.reqids]};
+                    {error, Reason} ->
+                        exit({error, Reason})
+                end
+        end,
+    {ok, Acc2#acc{changes_done = Acc2#acc.changes_done + 1}}.
+
+update_or_delete_index(ConnPid, Db, #index{} = Index, #doc_info{} = DI, Proc) 
->
     #doc_info{id = Id, high_seq = Seq, revs = [#rev_info{deleted = Del} | _]} 
= DI,
     case Del of
         true ->
-            ok = nouveau_api:delete_doc(Index, Id, Seq);
+            nouveau_api:delete_doc_async(ConnPid, Index, Id, Seq);
         false ->
             {ok, Doc} = couch_db:open_doc(Db, DI, []),
             Json = couch_doc:to_json_obj(Doc, []),
@@ -118,14 +152,9 @@ update_or_delete_index(Db, #index{} = Index, #doc_info{} = 
DI, Proc) ->
                 end,
             case Fields of
                 [] ->
-                    ok = nouveau_api:delete_doc(Index, Id, Seq);
+                    nouveau_api:delete_doc_async(ConnPid, Index, Id, Seq);
                 _ ->
-                    case nouveau_api:update_doc(Index, Id, Seq, Partition, 
Fields) of
-                        ok ->
-                            ok;
-                        {error, Reason} ->
-                            exit({error, Reason})
-                    end
+                    nouveau_api:update_doc_async(ConnPid, Index, Id, Seq, 
Partition, Fields)
             end
     end.
 
@@ -169,7 +198,7 @@ index_definition(#index{} = Index) ->
         <<"field_analyzers">> => Index#index.field_analyzers
     }.
 
-purge_index(Db, Index, IndexPurgeSeq) ->
+purge_index(ConnPid, Db, Index, IndexPurgeSeq) ->
     Proc = get_os_process(Index#index.def_lang),
     try
         true = proc_prompt(Proc, [<<"add_fun">>, Index#index.def, 
<<"nouveau">>]),
@@ -186,7 +215,7 @@ purge_index(Db, Index, IndexPurgeSeq) ->
                             true ->
                                 Acc;
                             false ->
-                                update_or_delete_index(Db, Index, DI, Proc),
+                                update_or_delete_index(ConnPid, Db, Index, DI, 
Proc),
                                 [{Id, Rev} | Acc]
                         end
                 end,

Reply via email to