This is an automated email from the ASF dual-hosted git repository.

rnewson pushed a commit to branch nouveau-streaming-index-update
in repository https://gitbox.apache.org/repos/asf/couchdb.git

commit 95a950048baafd75b62298da25a1db62993f07bf
Author: Robert Newson <[email protected]>
AuthorDate: Thu Feb 19 21:45:37 2026 +0000

    Stream Nouveau index updates in one request for performance
---
 .../apache/couchdb/nouveau/NouveauApplication.java |  2 +-
 .../couchdb/nouveau/api/DocumentDeleteRequest.java |  4 +-
 .../couchdb/nouveau/api/DocumentRequest.java       | 38 ++++++++++
 .../couchdb/nouveau/api/DocumentUpdateRequest.java |  4 +-
 .../couchdb/nouveau/health/IndexHealthCheck.java   |  2 +-
 .../couchdb/nouveau/resources/IndexResource.java   | 35 ++++++++-
 src/nouveau/src/nouveau_api.erl                    | 88 ++++++++++++++--------
 src/nouveau/src/nouveau_index_updater.erl          | 23 +++---
 8 files changed, 149 insertions(+), 47 deletions(-)

diff --git 
a/extra/nouveau/src/main/java/org/apache/couchdb/nouveau/NouveauApplication.java
 
b/extra/nouveau/src/main/java/org/apache/couchdb/nouveau/NouveauApplication.java
index c2230d1eb..eb886d826 100644
--- 
a/extra/nouveau/src/main/java/org/apache/couchdb/nouveau/NouveauApplication.java
+++ 
b/extra/nouveau/src/main/java/org/apache/couchdb/nouveau/NouveauApplication.java
@@ -77,7 +77,7 @@ public class NouveauApplication extends 
Application<NouveauApplicationConfigurat
         environment.jersey().register(analyzeResource);
 
         // IndexResource
-        final IndexResource indexResource = new IndexResource(indexManager);
+        final IndexResource indexResource = new IndexResource(indexManager, 
environment.getObjectMapper());
         environment.jersey().register(indexResource);
 
         // Health checks
diff --git 
a/extra/nouveau/src/main/java/org/apache/couchdb/nouveau/api/DocumentDeleteRequest.java
 
b/extra/nouveau/src/main/java/org/apache/couchdb/nouveau/api/DocumentDeleteRequest.java
index 82e9b716a..ddc067a16 100644
--- 
a/extra/nouveau/src/main/java/org/apache/couchdb/nouveau/api/DocumentDeleteRequest.java
+++ 
b/extra/nouveau/src/main/java/org/apache/couchdb/nouveau/api/DocumentDeleteRequest.java
@@ -17,7 +17,7 @@ import com.fasterxml.jackson.annotation.JsonProperty;
 import jakarta.validation.constraints.Positive;
 import jakarta.validation.constraints.PositiveOrZero;
 
-public final class DocumentDeleteRequest {
+public final class DocumentDeleteRequest extends DocumentRequest {
 
     @PositiveOrZero
     private final long matchSeq;
@@ -28,9 +28,11 @@ public final class DocumentDeleteRequest {
     private final boolean purge;
 
     public DocumentDeleteRequest(
+            @JsonProperty("doc_id") final String id,
             @JsonProperty("match_seq") final long matchSeq,
             @JsonProperty("seq") final long seq,
             @JsonProperty("purge") final boolean purge) {
+        super(id);
         if (matchSeq < 0) {
             throw new IllegalArgumentException("matchSeq must be 0 or 
greater");
         }
diff --git 
a/extra/nouveau/src/main/java/org/apache/couchdb/nouveau/api/DocumentRequest.java
 
b/extra/nouveau/src/main/java/org/apache/couchdb/nouveau/api/DocumentRequest.java
new file mode 100644
index 000000000..05e60988a
--- /dev/null
+++ 
b/extra/nouveau/src/main/java/org/apache/couchdb/nouveau/api/DocumentRequest.java
@@ -0,0 +1,38 @@
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package org.apache.couchdb.nouveau.api;
+
+import com.fasterxml.jackson.annotation.JsonSubTypes;
+import com.fasterxml.jackson.annotation.JsonTypeInfo;
+import com.fasterxml.jackson.databind.PropertyNamingStrategies;
+import com.fasterxml.jackson.databind.annotation.JsonNaming;
+
+@JsonNaming(PropertyNamingStrategies.SnakeCaseStrategy.class)
+@JsonTypeInfo(use = JsonTypeInfo.Id.NAME, include = JsonTypeInfo.As.PROPERTY, 
property = "@type")
+@JsonSubTypes({
+    @JsonSubTypes.Type(value = DocumentUpdateRequest.class, name = "update"),
+    @JsonSubTypes.Type(value = DocumentDeleteRequest.class, name = "delete"),
+})
+public abstract class DocumentRequest {
+
+    private final String id;
+
+    protected DocumentRequest(final String id) {
+        this.id = id;
+    }
+
+    public final String getId() {
+        return id;
+    }
+}
diff --git 
a/extra/nouveau/src/main/java/org/apache/couchdb/nouveau/api/DocumentUpdateRequest.java
 
b/extra/nouveau/src/main/java/org/apache/couchdb/nouveau/api/DocumentUpdateRequest.java
index 82c196602..7b1db0c09 100644
--- 
a/extra/nouveau/src/main/java/org/apache/couchdb/nouveau/api/DocumentUpdateRequest.java
+++ 
b/extra/nouveau/src/main/java/org/apache/couchdb/nouveau/api/DocumentUpdateRequest.java
@@ -20,7 +20,7 @@ import jakarta.validation.constraints.Positive;
 import jakarta.validation.constraints.PositiveOrZero;
 import java.util.Collection;
 
-public final class DocumentUpdateRequest {
+public final class DocumentUpdateRequest extends DocumentRequest {
 
     @PositiveOrZero
     private final long matchSeq;
@@ -35,10 +35,12 @@ public final class DocumentUpdateRequest {
     private final Collection<Field> fields;
 
     public DocumentUpdateRequest(
+            @JsonProperty("doc_id") final String id,
             @JsonProperty("match_seq") final long matchSeq,
             @JsonProperty("seq") final long seq,
             @JsonProperty("partition") final String partition,
             @JsonProperty("fields") final Collection<Field> fields) {
+        super(id);
         this.matchSeq = matchSeq;
         this.seq = seq;
         this.partition = partition;
diff --git 
a/extra/nouveau/src/main/java/org/apache/couchdb/nouveau/health/IndexHealthCheck.java
 
b/extra/nouveau/src/main/java/org/apache/couchdb/nouveau/health/IndexHealthCheck.java
index 7e5facb2e..3a70dd7a7 100644
--- 
a/extra/nouveau/src/main/java/org/apache/couchdb/nouveau/health/IndexHealthCheck.java
+++ 
b/extra/nouveau/src/main/java/org/apache/couchdb/nouveau/health/IndexHealthCheck.java
@@ -42,7 +42,7 @@ public final class IndexHealthCheck extends HealthCheck {
         indexResource.createIndex(name, new 
IndexDefinition(IndexDefinition.LATEST_LUCENE_VERSION, "standard", null));
         try {
             final DocumentUpdateRequest documentUpdateRequest =
-                    new DocumentUpdateRequest(0, 1, null, 
Collections.emptyList());
+                    new DocumentUpdateRequest("foo", 0, 1, null, 
Collections.emptyList());
             indexResource.updateDoc(name, "foo", documentUpdateRequest);
 
             final SearchRequest searchRequest = new SearchRequest();
diff --git 
a/extra/nouveau/src/main/java/org/apache/couchdb/nouveau/resources/IndexResource.java
 
b/extra/nouveau/src/main/java/org/apache/couchdb/nouveau/resources/IndexResource.java
index 9ba382109..f59f29369 100644
--- 
a/extra/nouveau/src/main/java/org/apache/couchdb/nouveau/resources/IndexResource.java
+++ 
b/extra/nouveau/src/main/java/org/apache/couchdb/nouveau/resources/IndexResource.java
@@ -16,6 +16,8 @@ package org.apache.couchdb.nouveau.resources;
 import com.codahale.metrics.annotation.ExceptionMetered;
 import com.codahale.metrics.annotation.Metered;
 import com.codahale.metrics.annotation.ResponseMetered;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import jakarta.servlet.http.HttpServletRequest;
 import jakarta.validation.Valid;
 import jakarta.validation.constraints.NotNull;
 import jakarta.ws.rs.Consumes;
@@ -27,12 +29,14 @@ import jakarta.ws.rs.Path;
 import jakarta.ws.rs.PathParam;
 import jakarta.ws.rs.Produces;
 import jakarta.ws.rs.WebApplicationException;
+import jakarta.ws.rs.core.Context;
 import jakarta.ws.rs.core.MediaType;
 import jakarta.ws.rs.core.Response.Status;
 import java.io.IOException;
 import java.util.List;
 import java.util.Objects;
 import org.apache.couchdb.nouveau.api.DocumentDeleteRequest;
+import org.apache.couchdb.nouveau.api.DocumentRequest;
 import org.apache.couchdb.nouveau.api.DocumentUpdateRequest;
 import org.apache.couchdb.nouveau.api.IndexDefinition;
 import org.apache.couchdb.nouveau.api.IndexInfo;
@@ -41,6 +45,8 @@ import org.apache.couchdb.nouveau.api.Ok;
 import org.apache.couchdb.nouveau.api.SearchRequest;
 import org.apache.couchdb.nouveau.api.SearchResults;
 import org.apache.couchdb.nouveau.core.IndexManager;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 @Path("/index/{name}")
 @Metered
@@ -50,10 +56,15 @@ import org.apache.couchdb.nouveau.core.IndexManager;
 @Produces(MediaType.APPLICATION_JSON)
 public final class IndexResource {
 
+    private static final Logger LOGGER = 
LoggerFactory.getLogger(IndexResource.class);
+
     private final IndexManager indexManager;
 
-    public IndexResource(final IndexManager indexManager) {
+    private final ObjectMapper objectMapper;
+
+    public IndexResource(final IndexManager indexManager, final ObjectMapper 
objectMapper) {
         this.indexManager = Objects.requireNonNull(indexManager);
+        this.objectMapper = Objects.requireNonNull(objectMapper);
     }
 
     @PUT
@@ -67,6 +78,7 @@ public final class IndexResource {
         return Ok.INSTANCE;
     }
 
+    @Deprecated(since = "2.5.2", forRemoval = true)
     @DELETE
     @Path("/doc/{docId}")
     public Ok deleteDoc(
@@ -120,6 +132,7 @@ public final class IndexResource {
         });
     }
 
+    @Deprecated(since = "2.5.2", forRemoval = true)
     @PUT
     @Path("/doc/{docId}")
     public Ok updateDoc(
@@ -132,4 +145,24 @@ public final class IndexResource {
             return Ok.INSTANCE;
         });
     }
+
+    @POST
+    @Path("/update")
+    @Consumes({"application/json-seq"})
+    public Ok updates(@PathParam("name") String name, @Context 
HttpServletRequest req) throws Exception {
+        var reader = req.getReader();
+        return indexManager.with(name, (index) -> {
+            String line;
+            while ((line = reader.readLine()) != null) {
+                var docReq = objectMapper.readValue(line.trim(), 
DocumentRequest.class);
+                if (docReq instanceof DocumentUpdateRequest) {
+                    index.update(docReq.getId(), (DocumentUpdateRequest) 
docReq);
+                }
+                if (docReq instanceof DocumentDeleteRequest) {
+                    index.delete(docReq.getId(), (DocumentDeleteRequest) 
docReq);
+                }
+            }
+            return Ok.INSTANCE;
+        });
+    }
 }
diff --git a/src/nouveau/src/nouveau_api.erl b/src/nouveau/src/nouveau_api.erl
index 2d140e580..741e6936c 100644
--- a/src/nouveau/src/nouveau_api.erl
+++ b/src/nouveau/src/nouveau_api.erl
@@ -26,6 +26,8 @@
     delete_doc/4,
     purge_doc/4,
     update_doc/6,
+    start_update/1,
+    end_update/1,
     search/2,
     set_purge_seq/3,
     set_update_seq/3,
@@ -34,6 +36,7 @@
 ]).
 
 -define(JSON_CONTENT_TYPE, {"Content-Type", "application/json"}).
+-define(JSON_SEQ_CONTENT_TYPE, {"Content-Type", "application/json-seq"}).
 
 analyze(Text, Analyzer) when
     is_binary(Text), is_binary(Analyzer)
@@ -99,28 +102,16 @@ delete_path(Path, Exclusions) when
             send_error(Reason)
     end.
 
-delete_doc(#index{} = Index, DocId, MatchSeq, UpdateSeq) when
+delete_doc({_, _} = PoolStreamRef, DocId, MatchSeq, UpdateSeq) when
     is_binary(DocId),
     is_integer(MatchSeq),
     MatchSeq >= 0,
     is_integer(UpdateSeq),
     UpdateSeq > 0
 ->
-    ReqBody = #{match_seq => MatchSeq, seq => UpdateSeq, purge => false},
-    Resp = send_if_enabled(
-        doc_path(Index, DocId),
-        [?JSON_CONTENT_TYPE],
-        <<"DELETE">>,
-        jiffy:encode(ReqBody)
-    ),
-    case Resp of
-        {ok, 200, _, _} ->
-            ok;
-        {ok, StatusCode, _, RespBody} ->
-            {error, jaxrs_error(StatusCode, RespBody)};
-        {error, Reason} ->
-            send_error(Reason)
-    end.
+    Row = #{<<"@type">> => delete, doc_id => DocId, match_seq => MatchSeq, seq 
=> UpdateSeq, purge => false},
+    ok = gun_pool:data(PoolStreamRef, nofin, encode_json_seq(Row)),
+    check_status(PoolStreamRef).
 
 purge_doc(#index{} = Index, DocId, MatchSeq, PurgeSeq) when
     is_binary(DocId),
@@ -142,7 +133,22 @@ purge_doc(#index{} = Index, DocId, MatchSeq, PurgeSeq) when
             send_error(Reason)
     end.
 
-update_doc(#index{} = Index, DocId, MatchSeq, UpdateSeq, Partition, Fields) 
when
+start_update(#index{} = Index) ->
+    case nouveau:enabled() of
+        true ->
+            gun_pool:post(
+                update_path(Index),
+                [nouveau_gun:host_header(), ?JSON_SEQ_CONTENT_TYPE]
+            );
+        false ->
+            {error, nouveau_not_enabled}
+    end.
+
+end_update({_, _} = PoolStreamRef) ->
+    ok = gun_pool:data(PoolStreamRef, fin, <<>>),
+    check_status(PoolStreamRef).
+
+update_doc({_, _} = PoolStreamRef, DocId, MatchSeq, UpdateSeq, Partition, 
Fields) when
     is_binary(DocId),
     is_integer(MatchSeq),
     MatchSeq >= 0,
@@ -151,26 +157,16 @@ update_doc(#index{} = Index, DocId, MatchSeq, UpdateSeq, 
Partition, Fields) when
     (is_binary(Partition) orelse Partition == null),
     is_list(Fields)
 ->
-    ReqBody = #{
+    Row = #{
+        <<"@type">> => update,
+        doc_id => DocId,
         match_seq => MatchSeq,
         seq => UpdateSeq,
         partition => Partition,
         fields => Fields
     },
-    Resp = send_if_enabled(
-        doc_path(Index, DocId),
-        [?JSON_CONTENT_TYPE],
-        <<"PUT">>,
-        jiffy:encode(ReqBody)
-    ),
-    case Resp of
-        {ok, 200, _, _} ->
-            ok;
-        {ok, StatusCode, _, RespBody} ->
-            {error, jaxrs_error(StatusCode, RespBody)};
-        {error, Reason} ->
-            send_error(Reason)
-    end.
+    ok = gun_pool:data(PoolStreamRef, nofin, encode_json_seq(Row)),
+    check_status(PoolStreamRef).
 
 search(#index{} = Index, QueryArgs) ->
     Resp = send_if_enabled(
@@ -245,6 +241,9 @@ doc_path(#index{} = Index, DocId) ->
 search_path(#index{} = Index) ->
     [index_path(Index), <<"/search">>].
 
+update_path(#index{} = Index) ->
+    [index_path(Index), <<"/update">>].
+
 jaxrs_error(400, Body) ->
     {bad_request, message(Body)};
 jaxrs_error(404, Body) ->
@@ -314,3 +313,28 @@ send_if_enabled(Path, ReqHeaders, Method, ReqBody, 
RemainingTries) ->
         false ->
             {error, nouveau_not_enabled}
     end.
+
+encode_json_seq(Data) ->
+    [$\x{1e}, jiffy:encode(Data), $\n].
+
+check_status({ConnPid, StreamRef} = PoolStreamRef) ->
+       MRef = monitor(process, ConnPid),
+    Res = receive
+        {gun_response, ConnPid, StreamRef, fin, Status, _Headers} ->
+            {error, Status};
+        {gun_response, ConnPid, StreamRef, nofin, Status, _Headers} ->
+            case gun_pool:await_body(PoolStreamRef, MRef) of
+                {ok, Body} ->
+                    {error, Status, Body};
+                {ok, Body, _Trailers} ->
+                    {error, Status, Body};
+                {error, Reason} ->
+                    {error, Reason}
+            end;
+        {'DOWN', MRef, process, ConnPid, Reason} ->
+            {error, Reason}
+    after 0 ->
+            ok
+    end,
+    demonitor(MRef, [flush]),
+    Res.
diff --git a/src/nouveau/src/nouveau_index_updater.erl 
b/src/nouveau/src/nouveau_index_updater.erl
index 4bfea753a..a3bea3bbb 100644
--- a/src/nouveau/src/nouveau_index_updater.erl
+++ b/src/nouveau/src/nouveau_index_updater.erl
@@ -27,6 +27,7 @@
 -import(nouveau_util, [index_path/1]).
 
 -record(acc, {
+    pool_stream_ref,
     db,
     index,
     proc,
@@ -80,14 +81,15 @@ update(#index{} = Index) ->
                     index_update_seq = IndexUpdateSeq,
                     index_purge_seq = IndexPurgeSeq
                 },
-                {ok, PurgeAcc1} = purge_index(Db, Index, PurgeAcc0),
-
+                {async, PoolStreamRef} = nouveau_api:start_update(Index),
+                {ok, PurgeAcc1} = purge_index(PoolStreamRef, Db, Index, 
PurgeAcc0),
                 NewCurSeq = couch_db:get_update_seq(Db),
                 Proc = get_os_process(Index#index.def_lang),
                 try
                     true = proc_prompt(Proc, [<<"add_fun">>, Index#index.def, 
<<"nouveau">>]),
 
                     Acc0 = #acc{
+                        pool_stream_ref = PoolStreamRef,
                         db = Db,
                         index = Index,
                         proc = Proc,
@@ -101,6 +103,7 @@ update(#index{} = Index) ->
                     ),
                     exit(nouveau_api:set_update_seq(Index, 
Acc1#acc.update_seq, NewCurSeq))
                 after
+                    nouveau_api:end_update(PoolStreamRef),
                     ret_os_process(Proc)
                 end
         end
@@ -126,7 +129,7 @@ load_docs(FDI, #acc{} = Acc1) ->
                 case
                     update_or_delete_index(
                         Acc1#acc.db,
-                        Acc1#acc.index,
+                        Acc1#acc.pool_stream_ref,
                         Acc1#acc.update_seq,
                         DI,
                         Acc1#acc.proc
@@ -142,11 +145,11 @@ load_docs(FDI, #acc{} = Acc1) ->
         end,
     {ok, Acc2#acc{changes_done = Acc2#acc.changes_done + 1}}.
 
-update_or_delete_index(Db, #index{} = Index, MatchSeq, #doc_info{} = DI, Proc) 
->
+update_or_delete_index(Db, PoolStreamRef, MatchSeq, #doc_info{} = DI, Proc) ->
     #doc_info{id = Id, high_seq = Seq, revs = [#rev_info{deleted = Del} | _]} 
= DI,
     case Del of
         true ->
-            nouveau_api:delete_doc(Index, Id, MatchSeq, Seq);
+            ok = nouveau_api:delete_doc(PoolStreamRef, Id, MatchSeq, Seq);
         false ->
             {ok, Doc} = couch_db:open_doc(Db, DI, []),
             Json = couch_doc:to_json_obj(Doc, []),
@@ -160,10 +163,10 @@ update_or_delete_index(Db, #index{} = Index, MatchSeq, 
#doc_info{} = DI, Proc) -
                 end,
             case Fields of
                 [] ->
-                    nouveau_api:delete_doc(Index, Id, MatchSeq, Seq);
+                    ok = nouveau_api:delete_doc(PoolStreamRef, Id, MatchSeq, 
Seq);
                 _ ->
-                    nouveau_api:update_doc(
-                        Index, Id, MatchSeq, Seq, Partition, Fields
+                    ok = nouveau_api:update_doc(
+                        PoolStreamRef, Id, MatchSeq, Seq, Partition, Fields
                     )
             end
     end.
@@ -209,7 +212,7 @@ index_definition(#index{} = Index) ->
         <<"field_analyzers">> => Index#index.field_analyzers
     }.
 
-purge_index(Db, Index, #purge_acc{} = PurgeAcc0) ->
+purge_index(PoolStreamRef, Db, Index, #purge_acc{} = PurgeAcc0) ->
     Proc = get_os_process(Index#index.def_lang),
     try
         true = proc_prompt(Proc, [<<"add_fun">>, Index#index.def, 
<<"nouveau">>]),
@@ -230,7 +233,7 @@ purge_index(Db, Index, #purge_acc{} = PurgeAcc0) ->
                             false ->
                                 update_or_delete_index(
                                     Db,
-                                    Index,
+                                    PoolStreamRef,
                                     PurgeAcc1#purge_acc.index_update_seq,
                                     DI,
                                     Proc

Reply via email to