This is an automated email from the ASF dual-hosted git repository.

rnewson pushed a commit to branch nouveau-streaming-index-update
in repository https://gitbox.apache.org/repos/asf/couchdb.git
commit 95a950048baafd75b62298da25a1db62993f07bf
Author: Robert Newson <[email protected]>
AuthorDate: Thu Feb 19 21:45:37 2026 +0000

    stream updates for performance
---
 .../apache/couchdb/nouveau/NouveauApplication.java |  2 +-
 .../couchdb/nouveau/api/DocumentDeleteRequest.java |  4 +-
 .../couchdb/nouveau/api/DocumentRequest.java       | 38 ++++++++++
 .../couchdb/nouveau/api/DocumentUpdateRequest.java |  4 +-
 .../couchdb/nouveau/health/IndexHealthCheck.java   |  2 +-
 .../couchdb/nouveau/resources/IndexResource.java   | 35 ++++++++-
 src/nouveau/src/nouveau_api.erl                    | 88 ++++++++++++++--------
 src/nouveau/src/nouveau_index_updater.erl          | 23 +++---
 8 files changed, 149 insertions(+), 47 deletions(-)

diff --git a/extra/nouveau/src/main/java/org/apache/couchdb/nouveau/NouveauApplication.java b/extra/nouveau/src/main/java/org/apache/couchdb/nouveau/NouveauApplication.java
index c2230d1eb..eb886d826 100644
--- a/extra/nouveau/src/main/java/org/apache/couchdb/nouveau/NouveauApplication.java
+++ b/extra/nouveau/src/main/java/org/apache/couchdb/nouveau/NouveauApplication.java
@@ -77,7 +77,7 @@ public class NouveauApplication extends Application<NouveauApplicationConfigurat
         environment.jersey().register(analyzeResource);
 
         // IndexResource
-        final IndexResource indexResource = new IndexResource(indexManager);
+        final IndexResource indexResource = new IndexResource(indexManager, environment.getObjectMapper());
         environment.jersey().register(indexResource);
 
         // Health checks
diff --git a/extra/nouveau/src/main/java/org/apache/couchdb/nouveau/api/DocumentDeleteRequest.java b/extra/nouveau/src/main/java/org/apache/couchdb/nouveau/api/DocumentDeleteRequest.java
index 82e9b716a..ddc067a16 100644
--- a/extra/nouveau/src/main/java/org/apache/couchdb/nouveau/api/DocumentDeleteRequest.java
+++ b/extra/nouveau/src/main/java/org/apache/couchdb/nouveau/api/DocumentDeleteRequest.java
@@ -17,7 +17,7 @@ import com.fasterxml.jackson.annotation.JsonProperty;
 import jakarta.validation.constraints.Positive;
 import jakarta.validation.constraints.PositiveOrZero;
 
-public final class DocumentDeleteRequest {
+public final class DocumentDeleteRequest extends DocumentRequest {
 
     @PositiveOrZero
     private final long matchSeq;
@@ -28,9 +28,11 @@ public final class DocumentDeleteRequest {
     private final boolean purge;
 
     public DocumentDeleteRequest(
+            @JsonProperty("doc_id") final String id,
             @JsonProperty("match_seq") final long matchSeq,
             @JsonProperty("seq") final long seq,
             @JsonProperty("purge") final boolean purge) {
+        super(id);
         if (matchSeq < 0) {
             throw new IllegalArgumentException("matchSeq must be 0 or greater");
         }
diff --git a/extra/nouveau/src/main/java/org/apache/couchdb/nouveau/api/DocumentRequest.java b/extra/nouveau/src/main/java/org/apache/couchdb/nouveau/api/DocumentRequest.java
new file mode 100644
index 000000000..05e60988a
--- /dev/null
+++ b/extra/nouveau/src/main/java/org/apache/couchdb/nouveau/api/DocumentRequest.java
@@ -0,0 +1,38 @@
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package org.apache.couchdb.nouveau.api;
+
+import com.fasterxml.jackson.annotation.JsonSubTypes;
+import com.fasterxml.jackson.annotation.JsonTypeInfo;
+import com.fasterxml.jackson.databind.PropertyNamingStrategies;
+import com.fasterxml.jackson.databind.annotation.JsonNaming;
+
+@JsonNaming(PropertyNamingStrategies.SnakeCaseStrategy.class)
+@JsonTypeInfo(use = JsonTypeInfo.Id.NAME, include = JsonTypeInfo.As.PROPERTY, property = "@type")
+@JsonSubTypes({
+    @JsonSubTypes.Type(value = DocumentUpdateRequest.class, name = "update"),
+    @JsonSubTypes.Type(value = DocumentDeleteRequest.class, name = "delete"),
+})
+public abstract class DocumentRequest {
+
+    private final String id;
+
+    protected DocumentRequest(final String id) {
+        this.id = id;
+    }
+
+    public final String getId() {
+        return id;
+    }
+}
diff --git a/extra/nouveau/src/main/java/org/apache/couchdb/nouveau/api/DocumentUpdateRequest.java b/extra/nouveau/src/main/java/org/apache/couchdb/nouveau/api/DocumentUpdateRequest.java
index 82c196602..7b1db0c09 100644
--- a/extra/nouveau/src/main/java/org/apache/couchdb/nouveau/api/DocumentUpdateRequest.java
+++ b/extra/nouveau/src/main/java/org/apache/couchdb/nouveau/api/DocumentUpdateRequest.java
@@ -20,7 +20,7 @@ import jakarta.validation.constraints.Positive;
 import jakarta.validation.constraints.PositiveOrZero;
 import java.util.Collection;
 
-public final class DocumentUpdateRequest {
+public final class DocumentUpdateRequest extends DocumentRequest {
 
     @PositiveOrZero
     private final long matchSeq;
@@ -35,10 +35,12 @@ public final class DocumentUpdateRequest {
     private final Collection<Field> fields;
 
     public DocumentUpdateRequest(
+            @JsonProperty("doc_id") final String id,
             @JsonProperty("match_seq") final long matchSeq,
             @JsonProperty("seq") final long seq,
             @JsonProperty("partition") final String partition,
             @JsonProperty("fields") final Collection<Field> fields) {
+        super(id);
         this.matchSeq = matchSeq;
         this.seq = seq;
         this.partition = partition;
diff --git a/extra/nouveau/src/main/java/org/apache/couchdb/nouveau/health/IndexHealthCheck.java b/extra/nouveau/src/main/java/org/apache/couchdb/nouveau/health/IndexHealthCheck.java
index 7e5facb2e..3a70dd7a7 100644
--- a/extra/nouveau/src/main/java/org/apache/couchdb/nouveau/health/IndexHealthCheck.java
+++ b/extra/nouveau/src/main/java/org/apache/couchdb/nouveau/health/IndexHealthCheck.java
@@ -42,7 +42,7 @@ public final class IndexHealthCheck extends HealthCheck {
         indexResource.createIndex(name, new IndexDefinition(IndexDefinition.LATEST_LUCENE_VERSION, "standard", null));
         try {
             final DocumentUpdateRequest documentUpdateRequest =
-                    new DocumentUpdateRequest(0, 1, null, Collections.emptyList());
+                    new DocumentUpdateRequest("foo", 0, 1, null, Collections.emptyList());
             indexResource.updateDoc(name, "foo", documentUpdateRequest);
 
             final SearchRequest searchRequest = new SearchRequest();
diff --git a/extra/nouveau/src/main/java/org/apache/couchdb/nouveau/resources/IndexResource.java b/extra/nouveau/src/main/java/org/apache/couchdb/nouveau/resources/IndexResource.java
index 9ba382109..f59f29369 100644
--- a/extra/nouveau/src/main/java/org/apache/couchdb/nouveau/resources/IndexResource.java
+++ b/extra/nouveau/src/main/java/org/apache/couchdb/nouveau/resources/IndexResource.java
@@ -16,6 +16,8 @@ package org.apache.couchdb.nouveau.resources;
 import com.codahale.metrics.annotation.ExceptionMetered;
 import com.codahale.metrics.annotation.Metered;
 import com.codahale.metrics.annotation.ResponseMetered;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import jakarta.servlet.http.HttpServletRequest;
 import jakarta.validation.Valid;
 import jakarta.validation.constraints.NotNull;
 import jakarta.ws.rs.Consumes;
@@ -27,12 +29,14 @@ import jakarta.ws.rs.Path;
 import jakarta.ws.rs.PathParam;
 import jakarta.ws.rs.Produces;
 import jakarta.ws.rs.WebApplicationException;
+import jakarta.ws.rs.core.Context;
 import jakarta.ws.rs.core.MediaType;
 import jakarta.ws.rs.core.Response.Status;
 import java.io.IOException;
 import java.util.List;
 import java.util.Objects;
 import org.apache.couchdb.nouveau.api.DocumentDeleteRequest;
+import org.apache.couchdb.nouveau.api.DocumentRequest;
 import org.apache.couchdb.nouveau.api.DocumentUpdateRequest;
 import org.apache.couchdb.nouveau.api.IndexDefinition;
 import org.apache.couchdb.nouveau.api.IndexInfo;
@@ -41,6 +45,8 @@ import org.apache.couchdb.nouveau.api.Ok;
 import org.apache.couchdb.nouveau.api.SearchRequest;
 import org.apache.couchdb.nouveau.api.SearchResults;
 import org.apache.couchdb.nouveau.core.IndexManager;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 @Path("/index/{name}")
 @Metered
@@ -50,10 +56,15 @@ import org.apache.couchdb.nouveau.core.IndexManager;
 @Produces(MediaType.APPLICATION_JSON)
 public final class IndexResource {
 
+    private static final Logger LOGGER = LoggerFactory.getLogger(IndexResource.class);
+
     private final IndexManager indexManager;
 
-    public IndexResource(final IndexManager indexManager) {
+    private final ObjectMapper objectMapper;
+
+    public IndexResource(final IndexManager indexManager, final ObjectMapper objectMapper) {
         this.indexManager = Objects.requireNonNull(indexManager);
+        this.objectMapper = Objects.requireNonNull(objectMapper);
     }
 
     @PUT
@@ -67,6 +78,7 @@ public final class IndexResource {
         return Ok.INSTANCE;
     }
 
+    @Deprecated(since = "2.5.2", forRemoval = true)
     @DELETE
     @Path("/doc/{docId}")
     public Ok deleteDoc(
@@ -120,6 +132,7 @@ public final class IndexResource {
         });
     }
 
+    @Deprecated(since = "2.5.2", forRemoval = true)
     @PUT
     @Path("/doc/{docId}")
     public Ok updateDoc(
@@ -132,4 +145,24 @@ public final class IndexResource {
             return Ok.INSTANCE;
         });
     }
+
+    @POST
+    @Path("/update")
+    @Consumes({"application/json-seq"})
+    public Ok updates(@PathParam("name") String name, @Context HttpServletRequest req) throws Exception {
+        var reader = req.getReader();
+        return indexManager.with(name, (index) -> {
+            String line;
+            while ((line = reader.readLine()) != null) {
+                var docReq = objectMapper.readValue(line.trim(), DocumentRequest.class);
+                if (docReq instanceof DocumentUpdateRequest) {
+                    index.update(docReq.getId(), (DocumentUpdateRequest) docReq);
+                }
+                if (docReq instanceof DocumentDeleteRequest) {
+                    index.delete(docReq.getId(), (DocumentDeleteRequest) docReq);
+                }
+            }
+            return Ok.INSTANCE;
+        });
+    }
 }
diff --git a/src/nouveau/src/nouveau_api.erl b/src/nouveau/src/nouveau_api.erl
index 2d140e580..741e6936c 100644
--- a/src/nouveau/src/nouveau_api.erl
+++ b/src/nouveau/src/nouveau_api.erl
@@ -26,6 +26,8 @@
     delete_doc/4,
     purge_doc/4,
     update_doc/6,
+    start_update/1,
+    end_update/1,
     search/2,
     set_purge_seq/3,
     set_update_seq/3,
@@ -34,6 +36,7 @@
 ]).
 
 -define(JSON_CONTENT_TYPE, {"Content-Type", "application/json"}).
+-define(JSON_SEQ_CONTENT_TYPE, {"Content-Type", "application/json-seq"}).
 
 analyze(Text, Analyzer) when
     is_binary(Text), is_binary(Analyzer)
@@ -99,28 +102,16 @@ delete_path(Path, Exclusions) when
         send_error(Reason)
     end.
 
-delete_doc(#index{} = Index, DocId, MatchSeq, UpdateSeq) when
+delete_doc({_, _} = PoolStreamRef, DocId, MatchSeq, UpdateSeq) when
     is_binary(DocId),
     is_integer(MatchSeq),
     MatchSeq >= 0,
     is_integer(UpdateSeq),
     UpdateSeq > 0
 ->
-    ReqBody = #{match_seq => MatchSeq, seq => UpdateSeq, purge => false},
-    Resp = send_if_enabled(
-        doc_path(Index, DocId),
-        [?JSON_CONTENT_TYPE],
-        <<"DELETE">>,
-        jiffy:encode(ReqBody)
-    ),
-    case Resp of
-        {ok, 200, _, _} ->
-            ok;
-        {ok, StatusCode, _, RespBody} ->
-            {error, jaxrs_error(StatusCode, RespBody)};
-        {error, Reason} ->
-            send_error(Reason)
-    end.
+    Row = #{<<"@type">> => delete, doc_id => DocId, match_seq => MatchSeq, seq => UpdateSeq, purge => false},
+    ok = gun_pool:data(PoolStreamRef, nofin, encode_json_seq(Row)),
+    check_status(PoolStreamRef).
 
 purge_doc(#index{} = Index, DocId, MatchSeq, PurgeSeq) when
     is_binary(DocId),
@@ -142,7 +133,22 @@ purge_doc(#index{} = Index, DocId, MatchSeq, PurgeSeq) when
         send_error(Reason)
     end.
 
-update_doc(#index{} = Index, DocId, MatchSeq, UpdateSeq, Partition, Fields) when
+start_update(#index{} = Index) ->
+    case nouveau:enabled() of
+        true ->
+            gun_pool:post(
+                update_path(Index),
+                [nouveau_gun:host_header(), ?JSON_SEQ_CONTENT_TYPE]
+            );
+        false ->
+            {error, nouveau_not_enabled}
+    end.
+
+end_update({_, _} = PoolStreamRef) ->
+    ok = gun_pool:data(PoolStreamRef, fin, <<>>),
+    check_status(PoolStreamRef).
+
+update_doc({_, _} = PoolStreamRef, DocId, MatchSeq, UpdateSeq, Partition, Fields) when
     is_binary(DocId),
     is_integer(MatchSeq),
     MatchSeq >= 0,
@@ -151,26 +157,16 @@ update_doc(#index{} = Index, DocId, MatchSeq, UpdateSeq, Partition, Fields) when
     (is_binary(Partition) orelse Partition == null),
     is_list(Fields)
 ->
-    ReqBody = #{
+    Row = #{
+        <<"@type">> => update,
+        doc_id => DocId,
         match_seq => MatchSeq,
         seq => UpdateSeq,
         partition => Partition,
         fields => Fields
     },
-    Resp = send_if_enabled(
-        doc_path(Index, DocId),
-        [?JSON_CONTENT_TYPE],
-        <<"PUT">>,
-        jiffy:encode(ReqBody)
-    ),
-    case Resp of
-        {ok, 200, _, _} ->
-            ok;
-        {ok, StatusCode, _, RespBody} ->
-            {error, jaxrs_error(StatusCode, RespBody)};
-        {error, Reason} ->
-            send_error(Reason)
-    end.
+    ok = gun_pool:data(PoolStreamRef, nofin, encode_json_seq(Row)),
+    check_status(PoolStreamRef).
 
 search(#index{} = Index, QueryArgs) ->
     Resp = send_if_enabled(
@@ -245,6 +241,9 @@ doc_path(#index{} = Index, DocId) ->
 search_path(#index{} = Index) ->
     [index_path(Index), <<"/search">>].
 
+update_path(#index{} = Index) ->
+    [index_path(Index), <<"/update">>].
+
 jaxrs_error(400, Body) ->
     {bad_request, message(Body)};
 jaxrs_error(404, Body) ->
@@ -314,3 +313,28 @@ send_if_enabled(Path, ReqHeaders, Method, ReqBody, RemainingTries) ->
         false ->
             {error, nouveau_not_enabled}
     end.
+
+encode_json_seq(Data) ->
+    [$\x{1e}, jiffy:encode(Data), $\n].
+
+check_status({ConnPid, StreamRef} = PoolStreamRef) ->
+    MRef = monitor(process, ConnPid),
+    Res = receive
+        {gun_response, ConnPid, StreamRef, fin, Status, _Headers} ->
+            {error, Status};
+        {gun_response, ConnPid, StreamRef, nofin, Status, _Headers} ->
+            case gun_pool:await_body(PoolStreamRef, MRef) of
+                {ok, Body} ->
+                    {error, Status, Body};
+                {ok, Body, _Trailers} ->
+                    {error, Status, Body};
+                {error, Reason} ->
+                    {error, Reason}
+            end;
+        {'DOWN', MRef, process, ConnPid, Reason} ->
+            {error, Reason}
+    after 0 ->
+        ok
+    end,
+    demonitor(MRef, [flush]),
+    Res.
diff --git a/src/nouveau/src/nouveau_index_updater.erl b/src/nouveau/src/nouveau_index_updater.erl
index 4bfea753a..a3bea3bbb 100644
--- a/src/nouveau/src/nouveau_index_updater.erl
+++ b/src/nouveau/src/nouveau_index_updater.erl
@@ -27,6 +27,7 @@
 -import(nouveau_util, [index_path/1]).
 
 -record(acc, {
+    pool_stream_ref,
     db,
     index,
     proc,
@@ -80,14 +81,15 @@ update(#index{} = Index) ->
                 index_update_seq = IndexUpdateSeq,
                 index_purge_seq = IndexPurgeSeq
             },
-            {ok, PurgeAcc1} = purge_index(Db, Index, PurgeAcc0),
-
+            {async, PoolStreamRef} = nouveau_api:start_update(Index),
+            {ok, PurgeAcc1} = purge_index(PoolStreamRef, Db, Index, PurgeAcc0),
             NewCurSeq = couch_db:get_update_seq(Db),
             Proc = get_os_process(Index#index.def_lang),
             try
                 true = proc_prompt(Proc, [<<"add_fun">>, Index#index.def, <<"nouveau">>]),
                 Acc0 = #acc{
+                    pool_stream_ref = PoolStreamRef,
                     db = Db,
                     index = Index,
                     proc = Proc,
@@ -101,6 +103,7 @@ update(#index{} = Index) ->
                 ),
                 exit(nouveau_api:set_update_seq(Index, Acc1#acc.update_seq, NewCurSeq))
             after
+                nouveau_api:end_update(PoolStreamRef),
                 ret_os_process(Proc)
             end
     end
@@ -126,7 +129,7 @@ load_docs(FDI, #acc{} = Acc1) ->
     case
         update_or_delete_index(
             Acc1#acc.db,
-            Acc1#acc.index,
+            Acc1#acc.pool_stream_ref,
             Acc1#acc.update_seq,
             DI,
             Acc1#acc.proc
@@ -142,11 +145,11 @@ load_docs(FDI, #acc{} = Acc1) ->
         end,
     {ok, Acc2#acc{changes_done = Acc2#acc.changes_done + 1}}.
 
-update_or_delete_index(Db, #index{} = Index, MatchSeq, #doc_info{} = DI, Proc) ->
+update_or_delete_index(Db, PoolStreamRef, MatchSeq, #doc_info{} = DI, Proc) ->
     #doc_info{id = Id, high_seq = Seq, revs = [#rev_info{deleted = Del} | _]} = DI,
     case Del of
         true ->
-            nouveau_api:delete_doc(Index, Id, MatchSeq, Seq);
+            ok = nouveau_api:delete_doc(PoolStreamRef, Id, MatchSeq, Seq);
         false ->
             {ok, Doc} = couch_db:open_doc(Db, DI, []),
             Json = couch_doc:to_json_obj(Doc, []),
@@ -160,10 +163,10 @@ update_or_delete_index(Db, #index{} = Index, MatchSeq, #doc_info{} = DI, Proc) ->
             end,
             case Fields of
                 [] ->
-                    nouveau_api:delete_doc(Index, Id, MatchSeq, Seq);
+                    ok = nouveau_api:delete_doc(PoolStreamRef, Id, MatchSeq, Seq);
                 _ ->
-                    nouveau_api:update_doc(
-                        Index, Id, MatchSeq, Seq, Partition, Fields
+                    ok = nouveau_api:update_doc(
+                        PoolStreamRef, Id, MatchSeq, Seq, Partition, Fields
                     )
             end
     end.
@@ -209,7 +212,7 @@ index_definition(#index{} = Index) ->
         <<"field_analyzers">> => Index#index.field_analyzers
     }.
 
-purge_index(Db, Index, #purge_acc{} = PurgeAcc0) ->
+purge_index(PoolStreamRef, Db, Index, #purge_acc{} = PurgeAcc0) ->
     Proc = get_os_process(Index#index.def_lang),
     try
         true = proc_prompt(Proc, [<<"add_fun">>, Index#index.def, <<"nouveau">>]),
@@ -230,7 +233,7 @@ purge_index(Db, Index, #purge_acc{} = PurgeAcc0) ->
                     false ->
                         update_or_delete_index(
                             Db,
-                            Index,
+                            PoolStreamRef,
                             PurgeAcc1#purge_acc.index_update_seq,
                             DI,
                             Proc
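
[Editorial note, not part of the commit] A minimal sketch of how the new streaming API is driven end to end, based only on the functions added to nouveau_api.erl above. The module name nouveau_stream_example, the document ids, and the sequence numbers are illustrative; the #index{} record is assumed to be obtained the same way nouveau_index_updater obtains it. Each update_doc/delete_doc call appends one RS-prefixed JSON record (see encode_json_seq/1) to a single POST /index/{name}/update request sent as application/json-seq, instead of one HTTP request per document.

    %% Illustrative sketch only; assumes a reachable nouveau server and an
    %% #index{} record from nouveau_util.
    -module(nouveau_stream_example).
    -export([stream_two_changes/1]).

    stream_two_changes(Index) ->
        %% open one HTTP request to POST /index/{name}/update (application/json-seq)
        {async, StreamRef} = nouveau_api:start_update(Index),
        %% each call writes one RS-prefixed JSON record onto the open stream
        ok = nouveau_api:update_doc(StreamRef, <<"doc1">>, 0, 1, null, []),
        ok = nouveau_api:delete_doc(StreamRef, <<"doc2">>, 1, 2),
        %% finish the request body and return ok or {error, ...} from the server
        nouveau_api:end_update(StreamRef).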
