This is an automated email from the ASF dual-hosted git repository.

rnewson pushed a commit to branch nouveau-index-ordering-bugfix
in repository https://gitbox.apache.org/repos/asf/couchdb.git

commit b4f3ca552fcbc344bbfaf4a1b252d5992daf9350
Author: Robert Newson <[email protected]>
AuthorDate: Tue Sep 19 14:45:20 2023 +0100

    fix nouveau index updating ordering
    
    Use the same connection PID for all index update requests to
    ensure they are applied in the correct order.
    
    Previously some requests (when purging) were handled by the pool
    and could get reordered.
---
 src/nouveau/src/nouveau_api.erl           | 33 ++++++++++++++++++-------------
 src/nouveau/src/nouveau_index_updater.erl |  8 ++++----
 2 files changed, 23 insertions(+), 18 deletions(-)

diff --git a/src/nouveau/src/nouveau_api.erl b/src/nouveau/src/nouveau_api.erl
index c133d951b..57de204f2 100644
--- a/src/nouveau/src/nouveau_api.erl
+++ b/src/nouveau/src/nouveau_api.erl
@@ -24,11 +24,11 @@
     delete_path/1,
     delete_path/2,
     delete_doc_async/5,
-    purge_doc/4,
+    purge_doc/5,
     update_doc_async/7,
     search/2,
-    set_purge_seq/3,
-    set_update_seq/3,
+    set_purge_seq/4,
+    set_update_seq/4,
     drain_async_responses/2,
     jaxrs_error/2
 ]).
@@ -119,12 +119,17 @@ delete_doc_async(ConnPid, #index{} = Index, DocId, 
MatchSeq, UpdateSeq) when
         ]
     ).
 
-purge_doc(#index{} = Index, DocId, MatchSeq, PurgeSeq) when
-    is_binary(DocId), is_integer(MatchSeq), MatchSeq >= 0, 
is_integer(PurgeSeq), PurgeSeq > 0
+purge_doc(ConnPid, #index{} = Index, DocId, MatchSeq, PurgeSeq) when
+    is_pid(ConnPid),
+    is_binary(DocId),
+    is_integer(MatchSeq),
+    MatchSeq >= 0,
+    is_integer(PurgeSeq),
+    PurgeSeq > 0
 ->
     ReqBody = #{match_seq => MatchSeq, seq => PurgeSeq, purge => true},
-    Resp = send_if_enabled(
-        doc_url(Index, DocId), [?JSON_CONTENT_TYPE], delete, 
jiffy:encode(ReqBody)
+    Resp = send_direct_if_enabled(
+        ConnPid, doc_url(Index, DocId), [?JSON_CONTENT_TYPE], delete, 
jiffy:encode(ReqBody), []
     ),
     case Resp of
         {ok, "204", _, _} ->
@@ -175,22 +180,22 @@ search(#index{} = Index, QueryArgs) ->
             send_error(Reason)
     end.
 
-set_update_seq(#index{} = Index, MatchSeq, UpdateSeq) ->
+set_update_seq(ConnPid, #index{} = Index, MatchSeq, UpdateSeq) ->
     ReqBody = #{
         match_update_seq => MatchSeq,
         update_seq => UpdateSeq
     },
-    set_seq(Index, ReqBody).
-set_purge_seq(#index{} = Index, MatchSeq, PurgeSeq) ->
+    set_seq(ConnPid, Index, ReqBody).
+set_purge_seq(ConnPid, #index{} = Index, MatchSeq, PurgeSeq) ->
     ReqBody = #{
         match_purge_seq => MatchSeq,
         purge_seq => PurgeSeq
     },
-    set_seq(Index, ReqBody).
+    set_seq(ConnPid, Index, ReqBody).
 
-set_seq(#index{} = Index, ReqBody) ->
-    Resp = send_if_enabled(
-        index_url(Index), [?JSON_CONTENT_TYPE], post, jiffy:encode(ReqBody)
+set_seq(ConnPid, #index{} = Index, ReqBody) ->
+    Resp = send_direct_if_enabled(
+        ConnPid, index_url(Index), [?JSON_CONTENT_TYPE], post, 
jiffy:encode(ReqBody), []
     ),
     case Resp of
         {ok, "204", _, _} ->
diff --git a/src/nouveau/src/nouveau_index_updater.erl 
b/src/nouveau/src/nouveau_index_updater.erl
index 6aa8e5781..1d11e98b4 100644
--- a/src/nouveau/src/nouveau_index_updater.erl
+++ b/src/nouveau/src/nouveau_index_updater.erl
@@ -107,9 +107,9 @@ update(#index{} = Index) ->
                         Db, Acc0#acc.update_seq, fun load_docs/2, Acc0, []
                     ),
                     nouveau_api:drain_async_responses(Acc1#acc.reqids, 0),
-                    ibrowse:stop_worker_process(ConnPid),
-                    exit(nouveau_api:set_update_seq(Index, 
Acc1#acc.update_seq, NewCurSeq))
+                    exit(nouveau_api:set_update_seq(ConnPid, Index, 
Acc1#acc.update_seq, NewCurSeq))
                 after
+                    ibrowse:stop_worker_process(ConnPid),
                     ret_os_process(Proc)
                 end
         end
@@ -232,7 +232,7 @@ purge_index(ConnPid, Db, Index, #purge_acc{} = PurgeAcc0) ->
                 case couch_db:get_full_doc_info(Db, Id) of
                     not_found ->
                         ok = nouveau_api:purge_doc(
-                            Index, Id, PurgeAcc1#purge_acc.index_purge_seq, 
PurgeSeq
+                            ConnPid, Index, Id, 
PurgeAcc1#purge_acc.index_purge_seq, PurgeSeq
                         ),
                         PurgeAcc1#purge_acc{index_purge_seq = PurgeSeq};
                     FDI ->
@@ -265,7 +265,7 @@ purge_index(ConnPid, Db, Index, #purge_acc{} = PurgeAcc0) ->
         ),
         DbPurgeSeq = couch_db:get_purge_seq(Db),
         ok = nouveau_api:set_purge_seq(
-            Index, PurgeAcc3#purge_acc.index_purge_seq, DbPurgeSeq
+            ConnPid, Index, PurgeAcc3#purge_acc.index_purge_seq, DbPurgeSeq
         ),
         update_local_doc(Db, Index, DbPurgeSeq),
         {ok, PurgeAcc3}

Reply via email to