This is an automated email from the ASF dual-hosted git repository.

rnewson pushed a commit to branch nouveau-index-ordering-bugfix
in repository https://gitbox.apache.org/repos/asf/couchdb.git
commit b4f3ca552fcbc344bbfaf4a1b252d5992daf9350 Author: Robert Newson <[email protected]> AuthorDate: Tue Sep 19 14:45:20 2023 +0100 fix nouveau index updating ordering Use the same connection PID for all index update requests to ensure they are applied in the correct order. Previously some requests (when purging) were handled by the pool and could get reordered. --- src/nouveau/src/nouveau_api.erl | 33 ++++++++++++++++++------------- src/nouveau/src/nouveau_index_updater.erl | 8 ++++---- 2 files changed, 23 insertions(+), 18 deletions(-) diff --git a/src/nouveau/src/nouveau_api.erl b/src/nouveau/src/nouveau_api.erl index c133d951b..57de204f2 100644 --- a/src/nouveau/src/nouveau_api.erl +++ b/src/nouveau/src/nouveau_api.erl @@ -24,11 +24,11 @@ delete_path/1, delete_path/2, delete_doc_async/5, - purge_doc/4, + purge_doc/5, update_doc_async/7, search/2, - set_purge_seq/3, - set_update_seq/3, + set_purge_seq/4, + set_update_seq/4, drain_async_responses/2, jaxrs_error/2 ]). @@ -119,12 +119,17 @@ delete_doc_async(ConnPid, #index{} = Index, DocId, MatchSeq, UpdateSeq) when ] ). -purge_doc(#index{} = Index, DocId, MatchSeq, PurgeSeq) when - is_binary(DocId), is_integer(MatchSeq), MatchSeq >= 0, is_integer(PurgeSeq), PurgeSeq > 0 +purge_doc(ConnPid, #index{} = Index, DocId, MatchSeq, PurgeSeq) when + is_pid(ConnPid), + is_binary(DocId), + is_integer(MatchSeq), + MatchSeq >= 0, + is_integer(PurgeSeq), + PurgeSeq > 0 -> ReqBody = #{match_seq => MatchSeq, seq => PurgeSeq, purge => true}, - Resp = send_if_enabled( - doc_url(Index, DocId), [?JSON_CONTENT_TYPE], delete, jiffy:encode(ReqBody) + Resp = send_direct_if_enabled( + ConnPid, doc_url(Index, DocId), [?JSON_CONTENT_TYPE], delete, jiffy:encode(ReqBody), [] ), case Resp of {ok, "204", _, _} -> @@ -175,22 +180,22 @@ search(#index{} = Index, QueryArgs) -> send_error(Reason) end. 
-set_update_seq(#index{} = Index, MatchSeq, UpdateSeq) -> +set_update_seq(ConnPid, #index{} = Index, MatchSeq, UpdateSeq) -> ReqBody = #{ match_update_seq => MatchSeq, update_seq => UpdateSeq }, - set_seq(Index, ReqBody). -set_purge_seq(#index{} = Index, MatchSeq, PurgeSeq) -> + set_seq(ConnPid, Index, ReqBody). +set_purge_seq(ConnPid, #index{} = Index, MatchSeq, PurgeSeq) -> ReqBody = #{ match_purge_seq => MatchSeq, purge_seq => PurgeSeq }, - set_seq(Index, ReqBody). + set_seq(ConnPid, Index, ReqBody). -set_seq(#index{} = Index, ReqBody) -> - Resp = send_if_enabled( - index_url(Index), [?JSON_CONTENT_TYPE], post, jiffy:encode(ReqBody) +set_seq(ConnPid, #index{} = Index, ReqBody) -> + Resp = send_direct_if_enabled( + ConnPid, index_url(Index), [?JSON_CONTENT_TYPE], post, jiffy:encode(ReqBody), [] ), case Resp of {ok, "204", _, _} -> diff --git a/src/nouveau/src/nouveau_index_updater.erl b/src/nouveau/src/nouveau_index_updater.erl index 6aa8e5781..1d11e98b4 100644 --- a/src/nouveau/src/nouveau_index_updater.erl +++ b/src/nouveau/src/nouveau_index_updater.erl @@ -107,9 +107,9 @@ update(#index{} = Index) -> Db, Acc0#acc.update_seq, fun load_docs/2, Acc0, [] ), nouveau_api:drain_async_responses(Acc1#acc.reqids, 0), - ibrowse:stop_worker_process(ConnPid), - exit(nouveau_api:set_update_seq(Index, Acc1#acc.update_seq, NewCurSeq)) + exit(nouveau_api:set_update_seq(ConnPid, Index, Acc1#acc.update_seq, NewCurSeq)) after + ibrowse:stop_worker_process(ConnPid), ret_os_process(Proc) end end @@ -232,7 +232,7 @@ purge_index(ConnPid, Db, Index, #purge_acc{} = PurgeAcc0) -> case couch_db:get_full_doc_info(Db, Id) of not_found -> ok = nouveau_api:purge_doc( - Index, Id, PurgeAcc1#purge_acc.index_purge_seq, PurgeSeq + ConnPid, Index, Id, PurgeAcc1#purge_acc.index_purge_seq, PurgeSeq ), PurgeAcc1#purge_acc{index_purge_seq = PurgeSeq}; FDI -> @@ -265,7 +265,7 @@ purge_index(ConnPid, Db, Index, #purge_acc{} = PurgeAcc0) -> ), DbPurgeSeq = couch_db:get_purge_seq(Db), ok = 
nouveau_api:set_purge_seq( - Index, PurgeAcc3#purge_acc.index_purge_seq, DbPurgeSeq + ConnPid, Index, PurgeAcc3#purge_acc.index_purge_seq, DbPurgeSeq ), update_local_doc(Db, Index, DbPurgeSeq), {ok, PurgeAcc3}
