This is an automated email from the ASF dual-hosted git repository. rnewson pushed a commit to branch nouveau-streaming-index-update in repository https://gitbox.apache.org/repos/asf/couchdb.git
commit 3bf58b293014c1d11d0dcb70a13ef36a545fe937 Author: Robert Newson <[email protected]> AuthorDate: Thu Feb 19 21:45:37 2026 +0000 WIP stream updates for performance --- .../couchdb/nouveau/resources/IndexResource.java | 21 ++++++++ src/nouveau/src/nouveau_api.erl | 62 +++++++++++----------- src/nouveau/src/nouveau_index_updater.erl | 21 ++++---- 3 files changed, 63 insertions(+), 41 deletions(-) diff --git a/extra/nouveau/src/main/java/org/apache/couchdb/nouveau/resources/IndexResource.java b/extra/nouveau/src/main/java/org/apache/couchdb/nouveau/resources/IndexResource.java index 9ba382109..496f5e6f6 100644 --- a/extra/nouveau/src/main/java/org/apache/couchdb/nouveau/resources/IndexResource.java +++ b/extra/nouveau/src/main/java/org/apache/couchdb/nouveau/resources/IndexResource.java @@ -16,6 +16,7 @@ package org.apache.couchdb.nouveau.resources; import com.codahale.metrics.annotation.ExceptionMetered; import com.codahale.metrics.annotation.Metered; import com.codahale.metrics.annotation.ResponseMetered; +import jakarta.servlet.http.HttpServletRequest; import jakarta.validation.Valid; import jakarta.validation.constraints.NotNull; import jakarta.ws.rs.Consumes; @@ -27,6 +28,7 @@ import jakarta.ws.rs.Path; import jakarta.ws.rs.PathParam; import jakarta.ws.rs.Produces; import jakarta.ws.rs.WebApplicationException; +import jakarta.ws.rs.core.Context; import jakarta.ws.rs.core.MediaType; import jakarta.ws.rs.core.Response.Status; import java.io.IOException; @@ -41,6 +43,8 @@ import org.apache.couchdb.nouveau.api.Ok; import org.apache.couchdb.nouveau.api.SearchRequest; import org.apache.couchdb.nouveau.api.SearchResults; import org.apache.couchdb.nouveau.core.IndexManager; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; @Path("/index/{name}") @Metered @@ -50,6 +54,8 @@ import org.apache.couchdb.nouveau.core.IndexManager; @Produces(MediaType.APPLICATION_JSON) public final class IndexResource { + private static final Logger LOGGER = LoggerFactory.getLogger(IndexResource.class); + private final IndexManager indexManager; public IndexResource(final IndexManager indexManager) { @@ -67,6 +73,7 @@ public final class IndexResource { return Ok.INSTANCE; } + @Deprecated(since = "2.5.2", forRemoval = true) @DELETE @Path("/doc/{docId}") public Ok deleteDoc( @@ -120,6 +127,7 @@ public final class IndexResource { }); } + @Deprecated(since = "2.5.2", forRemoval = true) @PUT @Path("/doc/{docId}") public Ok updateDoc( @@ -132,4 +140,17 @@ public final class IndexResource { return Ok.INSTANCE; }); } + + @POST + @Path("/update") + @Consumes({"application/json-seq"}) + public Ok updates(@PathParam("name") String name, @Context HttpServletRequest req) throws Exception { + LOGGER.info("updates for {}", name); + var reader = req.getReader(); + String line; + while ((line = reader.readLine()) != null) { + LOGGER.info("line: {}", line); + } + return Ok.INSTANCE; + } } diff --git a/src/nouveau/src/nouveau_api.erl b/src/nouveau/src/nouveau_api.erl index 2d140e580..f4bfe580f 100644 --- a/src/nouveau/src/nouveau_api.erl +++ b/src/nouveau/src/nouveau_api.erl @@ -26,6 +26,8 @@ delete_doc/4, purge_doc/4, update_doc/6, + start_update/1, + end_update/1, search/2, set_purge_seq/3, set_update_seq/3, @@ -34,6 +36,7 @@ ]). -define(JSON_CONTENT_TYPE, {"Content-Type", "application/json"}). +-define(JSON_SEQ_CONTENT_TYPE, {"Content-Type", "application/json-seq"}). analyze(Text, Analyzer) when is_binary(Text), is_binary(Analyzer) @@ -99,28 +102,15 @@ delete_path(Path, Exclusions) when send_error(Reason) end. -delete_doc(#index{} = Index, DocId, MatchSeq, UpdateSeq) when +delete_doc({_, _} = PoolStreamRef, DocId, MatchSeq, UpdateSeq) when is_binary(DocId), is_integer(MatchSeq), MatchSeq >= 0, is_integer(UpdateSeq), UpdateSeq > 0 -> - ReqBody = #{match_seq => MatchSeq, seq => UpdateSeq, purge => false}, - Resp = send_if_enabled( - doc_path(Index, DocId), - [?JSON_CONTENT_TYPE], - <<"DELETE">>, - jiffy:encode(ReqBody) - ), - case Resp of - {ok, 200, _, _} -> - ok; - {ok, StatusCode, _, RespBody} -> - {error, jaxrs_error(StatusCode, RespBody)}; - {error, Reason} -> - send_error(Reason) - end. + Row = #{doc_id => DocId, match_seq => MatchSeq, seq => UpdateSeq, purge => false}, + gun_pool:data(PoolStreamRef, nofin, encode_json_seq(Row)). purge_doc(#index{} = Index, DocId, MatchSeq, PurgeSeq) when is_binary(DocId), @@ -142,7 +132,21 @@ purge_doc(#index{} = Index, DocId, MatchSeq, PurgeSeq) when send_error(Reason) end. -update_doc(#index{} = Index, DocId, MatchSeq, UpdateSeq, Partition, Fields) when +start_update(#index{} = Index) -> + case nouveau:enabled() of + true -> + gun_pool:post( + update_path(Index), + [nouveau_gun:host_header(), ?JSON_SEQ_CONTENT_TYPE] + ); + false -> + {error, nouveau_not_enabled} + end. + +end_update({_, _} = PoolStreamRef) -> + gun_pool:data(PoolStreamRef, fin, <<>>). + +update_doc({_, _} = PoolStreamRef, DocId, MatchSeq, UpdateSeq, Partition, Fields) when is_binary(DocId), is_integer(MatchSeq), MatchSeq >= 0, @@ -151,26 +155,14 @@ update_doc(#index{} = Index, DocId, MatchSeq, UpdateSeq, Partition, Fields) when (is_binary(Partition) orelse Partition == null), is_list(Fields) -> - ReqBody = #{ + Row = #{ + doc_id => DocId, match_seq => MatchSeq, seq => UpdateSeq, partition => Partition, fields => Fields }, - Resp = send_if_enabled( - doc_path(Index, DocId), - [?JSON_CONTENT_TYPE], - <<"PUT">>, - jiffy:encode(ReqBody) - ), - case Resp of - {ok, 200, _, _} -> - ok; - {ok, StatusCode, _, RespBody} -> - {error, jaxrs_error(StatusCode, RespBody)}; - {error, Reason} -> - send_error(Reason) - end. + gun_pool:data(PoolStreamRef, nofin, encode_json_seq(Row)). search(#index{} = Index, QueryArgs) -> Resp = send_if_enabled( @@ -245,6 +237,9 @@ doc_path(#index{} = Index, DocId) -> search_path(#index{} = Index) -> [index_path(Index), <<"/search">>]. +update_path(#index{} = Index) -> + [index_path(Index), <<"/update">>]. + jaxrs_error(400, Body) -> {bad_request, message(Body)}; jaxrs_error(404, Body) -> @@ -314,3 +309,6 @@ send_if_enabled(Path, ReqHeaders, Method, ReqBody, RemainingTries) -> false -> {error, nouveau_not_enabled} end. + +encode_json_seq(Data) -> + [$\x{1e}, jiffy:encode(Data), $\n]. diff --git a/src/nouveau/src/nouveau_index_updater.erl b/src/nouveau/src/nouveau_index_updater.erl index 4bfea753a..bf2ed1998 100644 --- a/src/nouveau/src/nouveau_index_updater.erl +++ b/src/nouveau/src/nouveau_index_updater.erl @@ -27,6 +27,7 @@ -import(nouveau_util, [index_path/1]). -record(acc, { + pool_stream_ref, db, index, proc, @@ -80,14 +81,15 @@ update(#index{} = Index) -> index_update_seq = IndexUpdateSeq, index_purge_seq = IndexPurgeSeq }, - {ok, PurgeAcc1} = purge_index(Db, Index, PurgeAcc0), - + {async, PoolStreamRef} = nouveau_api:start_update(Index), + {ok, PurgeAcc1} = purge_index(PoolStreamRef, Db, Index, PurgeAcc0), NewCurSeq = couch_db:get_update_seq(Db), Proc = get_os_process(Index#index.def_lang), try true = proc_prompt(Proc, [<<"add_fun">>, Index#index.def, <<"nouveau">>]), Acc0 = #acc{ + pool_stream_ref = PoolStreamRef, db = Db, index = Index, proc = Proc, @@ -101,6 +103,7 @@ update(#index{} = Index) -> ), exit(nouveau_api:set_update_seq(Index, Acc1#acc.update_seq, NewCurSeq)) after + nouveau_api:end_update(PoolStreamRef), ret_os_process(Proc) end end @@ -126,7 +129,7 @@ load_docs(FDI, #acc{} = Acc1) -> case update_or_delete_index( Acc1#acc.db, - Acc1#acc.index, + Acc1#acc.pool_stream_ref, Acc1#acc.update_seq, DI, Acc1#acc.proc @@ -142,11 +145,11 @@ load_docs(FDI, #acc{} = Acc1) -> end, {ok, Acc2#acc{changes_done = Acc2#acc.changes_done + 1}}. -update_or_delete_index(Db, #index{} = Index, MatchSeq, #doc_info{} = DI, Proc) -> +update_or_delete_index(Db, PoolStreamRef, MatchSeq, #doc_info{} = DI, Proc) -> #doc_info{id = Id, high_seq = Seq, revs = [#rev_info{deleted = Del} | _]} = DI, case Del of true -> - nouveau_api:delete_doc(Index, Id, MatchSeq, Seq); + nouveau_api:delete_doc(PoolStreamRef, Id, MatchSeq, Seq); false -> {ok, Doc} = couch_db:open_doc(Db, DI, []), Json = couch_doc:to_json_obj(Doc, []), @@ -160,10 +163,10 @@ update_or_delete_index(Db, #index{} = Index, MatchSeq, #doc_info{} = DI, Proc) - end, case Fields of [] -> - nouveau_api:delete_doc(Index, Id, MatchSeq, Seq); + nouveau_api:delete_doc(PoolStreamRef, Id, MatchSeq, Seq); _ -> nouveau_api:update_doc( - Index, Id, MatchSeq, Seq, Partition, Fields + PoolStreamRef, Id, MatchSeq, Seq, Partition, Fields ) end end. @@ -209,7 +212,7 @@ index_definition(#index{} = Index) -> <<"field_analyzers">> => Index#index.field_analyzers }. -purge_index(Db, Index, #purge_acc{} = PurgeAcc0) -> +purge_index(PoolStreamRef, Db, Index, #purge_acc{} = PurgeAcc0) -> Proc = get_os_process(Index#index.def_lang), try true = proc_prompt(Proc, [<<"add_fun">>, Index#index.def, <<"nouveau">>]), @@ -230,7 +233,7 @@ purge_index(Db, Index, #purge_acc{} = PurgeAcc0) -> false -> update_or_delete_index( Db, - Index, + PoolStreamRef, PurgeAcc1#purge_acc.index_update_seq, DI, Proc
