This is an automated email from the ASF dual-hosted git repository.

rnewson pushed a commit to branch nouveau-streaming-index-update
in repository https://gitbox.apache.org/repos/asf/couchdb.git

commit 3bf58b293014c1d11d0dcb70a13ef36a545fe937
Author: Robert Newson <[email protected]>
AuthorDate: Thu Feb 19 21:45:37 2026 +0000

    WIP stream updates for performance
---
 .../couchdb/nouveau/resources/IndexResource.java   | 21 ++++++++
 src/nouveau/src/nouveau_api.erl                    | 62 +++++++++++-----------
 src/nouveau/src/nouveau_index_updater.erl          | 21 ++++----
 3 files changed, 63 insertions(+), 41 deletions(-)

diff --git a/extra/nouveau/src/main/java/org/apache/couchdb/nouveau/resources/IndexResource.java b/extra/nouveau/src/main/java/org/apache/couchdb/nouveau/resources/IndexResource.java
index 9ba382109..496f5e6f6 100644
--- a/extra/nouveau/src/main/java/org/apache/couchdb/nouveau/resources/IndexResource.java
+++ b/extra/nouveau/src/main/java/org/apache/couchdb/nouveau/resources/IndexResource.java
@@ -16,6 +16,7 @@ package org.apache.couchdb.nouveau.resources;
 import com.codahale.metrics.annotation.ExceptionMetered;
 import com.codahale.metrics.annotation.Metered;
 import com.codahale.metrics.annotation.ResponseMetered;
+import jakarta.servlet.http.HttpServletRequest;
 import jakarta.validation.Valid;
 import jakarta.validation.constraints.NotNull;
 import jakarta.ws.rs.Consumes;
@@ -27,6 +28,7 @@ import jakarta.ws.rs.Path;
 import jakarta.ws.rs.PathParam;
 import jakarta.ws.rs.Produces;
 import jakarta.ws.rs.WebApplicationException;
+import jakarta.ws.rs.core.Context;
 import jakarta.ws.rs.core.MediaType;
 import jakarta.ws.rs.core.Response.Status;
 import java.io.IOException;
@@ -41,6 +43,8 @@ import org.apache.couchdb.nouveau.api.Ok;
 import org.apache.couchdb.nouveau.api.SearchRequest;
 import org.apache.couchdb.nouveau.api.SearchResults;
 import org.apache.couchdb.nouveau.core.IndexManager;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 @Path("/index/{name}")
 @Metered
@@ -50,6 +54,8 @@ import org.apache.couchdb.nouveau.core.IndexManager;
 @Produces(MediaType.APPLICATION_JSON)
 public final class IndexResource {
 
+    private static final Logger LOGGER = 
LoggerFactory.getLogger(IndexResource.class);
+
     private final IndexManager indexManager;
 
     public IndexResource(final IndexManager indexManager) {
@@ -67,6 +73,7 @@ public final class IndexResource {
         return Ok.INSTANCE;
     }
 
+    @Deprecated(since = "2.5.2", forRemoval = true)
     @DELETE
     @Path("/doc/{docId}")
     public Ok deleteDoc(
@@ -120,6 +127,7 @@ public final class IndexResource {
         });
     }
 
+    @Deprecated(since = "2.5.2", forRemoval = true)
     @PUT
     @Path("/doc/{docId}")
     public Ok updateDoc(
@@ -132,4 +140,17 @@ public final class IndexResource {
             return Ok.INSTANCE;
         });
     }
+
+    @POST
+    @Path("/update")
+    @Consumes({"application/json-seq"})
+    public Ok updates(@PathParam("name") String name, @Context 
HttpServletRequest req) throws Exception {
+        LOGGER.info("updates for {}", name);
+        var reader = req.getReader();
+        String line;
+        while ((line = reader.readLine()) != null) {
+            LOGGER.info("line: {}", line);
+        }
+        return Ok.INSTANCE;
+    }
 }
diff --git a/src/nouveau/src/nouveau_api.erl b/src/nouveau/src/nouveau_api.erl
index 2d140e580..f4bfe580f 100644
--- a/src/nouveau/src/nouveau_api.erl
+++ b/src/nouveau/src/nouveau_api.erl
@@ -26,6 +26,8 @@
     delete_doc/4,
     purge_doc/4,
     update_doc/6,
+    start_update/1,
+    end_update/1,
     search/2,
     set_purge_seq/3,
     set_update_seq/3,
@@ -34,6 +36,7 @@
 ]).
 
 -define(JSON_CONTENT_TYPE, {"Content-Type", "application/json"}).
+-define(JSON_SEQ_CONTENT_TYPE, {"Content-Type", "application/json-seq"}).
 
 analyze(Text, Analyzer) when
     is_binary(Text), is_binary(Analyzer)
@@ -99,28 +102,15 @@ delete_path(Path, Exclusions) when
             send_error(Reason)
     end.
 
-delete_doc(#index{} = Index, DocId, MatchSeq, UpdateSeq) when
+%% Appends a delete record for DocId to an open streaming update, i.e. the
+%% {_, _} PoolStreamRef obtained from start_update/1. The record is framed
+%% by encode_json_seq/1 and sent as a non-final (nofin) chunk; the result is
+%% whatever gun_pool:data/3 returns.
+%%
+%% NOTE(review): this only sends the chunk. Any server-side rejection of the
+%% record is not observed here.
+delete_doc({_, _} = PoolStreamRef, DocId, MatchSeq, UpdateSeq) when
     is_binary(DocId),
     is_integer(MatchSeq),
     MatchSeq >= 0,
     is_integer(UpdateSeq),
     UpdateSeq > 0
 ->
-    ReqBody = #{match_seq => MatchSeq, seq => UpdateSeq, purge => false},
-    Resp = send_if_enabled(
-        doc_path(Index, DocId),
-        [?JSON_CONTENT_TYPE],
-        <<"DELETE">>,
-        jiffy:encode(ReqBody)
-    ),
-    case Resp of
-        {ok, 200, _, _} ->
-            ok;
-        {ok, StatusCode, _, RespBody} ->
-            {error, jaxrs_error(StatusCode, RespBody)};
-        {error, Reason} ->
-            send_error(Reason)
-    end.
+    %% doc_id travels inside the record, rather than in a per-document URL as
+    %% before, because a single stream carries records for many documents.
+    Row = #{doc_id => DocId, match_seq => MatchSeq, seq => UpdateSeq, purge => 
false},
+    gun_pool:data(PoolStreamRef, nofin, encode_json_seq(Row)).
 
 purge_doc(#index{} = Index, DocId, MatchSeq, PurgeSeq) when
     is_binary(DocId),
@@ -142,7 +132,21 @@ purge_doc(#index{} = Index, DocId, MatchSeq, PurgeSeq) when
             send_error(Reason)
     end.
 
-update_doc(#index{} = Index, DocId, MatchSeq, UpdateSeq, Partition, Fields) 
when
+%% Opens a streaming POST to the index's /update endpoint with a
+%% json-seq content type. Returns the result of gun_pool:post/2 (callers
+%% match {async, PoolStreamRef}) or {error, nouveau_not_enabled}. Records
+%% are then appended with update_doc/6 and delete_doc/4, and the request is
+%% completed with end_update/1.
+start_update(#index{} = Index) ->
+    case nouveau:enabled() of
+        true ->
+            gun_pool:post(
+                update_path(Index),
+                [nouveau_gun:host_header(), ?JSON_SEQ_CONTENT_TYPE]
+            );
+        false ->
+            {error, nouveau_not_enabled}
+    end.
+
+%% Completes a streaming update opened by start_update/1: sends the final
+%% empty chunk, then waits for the server's response. Returns ok on HTTP 200
+%% and {error, _} otherwise, mirroring the other requests in this module.
+%%
+%% The response must be read. Otherwise a server-side failure of the whole
+%% batch would be silently ignored, and the unread gun response messages
+%% would stay in the caller's mailbox. Callers should check this result
+%% before recording a new update seq.
+end_update({_, _} = PoolStreamRef) ->
+    gun_pool:data(PoolStreamRef, fin, <<>>),
+    case await_update_response(PoolStreamRef, update_response_timeout()) of
+        {ok, 200, _RespBody} ->
+            ok;
+        {ok, StatusCode, RespBody} ->
+            {error, jaxrs_error(StatusCode, RespBody)};
+        {error, Reason} ->
+            send_error(Reason)
+    end.
+
+%% Waits for the status line and, when present, the body of the response
+%% to a streamed request. Returns {ok, StatusCode, Body} or {error, Reason}.
+await_update_response(PoolStreamRef, Timeout) ->
+    case gun_pool:await(PoolStreamRef, Timeout) of
+        {response, fin, StatusCode, _RespHeaders} ->
+            {ok, StatusCode, <<>>};
+        {response, nofin, StatusCode, _RespHeaders} ->
+            case gun_pool:await_body(PoolStreamRef, Timeout) of
+                {ok, RespBody} -> {ok, StatusCode, RespBody};
+                {ok, RespBody, _Trailers} -> {ok, StatusCode, RespBody};
+                {error, Reason} -> {error, Reason}
+            end;
+        {error, Reason} ->
+            {error, Reason}
+    end.
+
+%% Upper bound, in milliseconds, on waiting for the server to acknowledge a
+%% streamed update. TODO(review): consider making this configurable.
+update_response_timeout() -> 60000.
+
+update_doc({_, _} = PoolStreamRef, DocId, MatchSeq, UpdateSeq, Partition, 
Fields) when
     is_binary(DocId),
     is_integer(MatchSeq),
     MatchSeq >= 0,
@@ -151,26 +155,14 @@ update_doc(#index{} = Index, DocId, MatchSeq, UpdateSeq, Partition, Fields) when
     (is_binary(Partition) orelse Partition == null),
     is_list(Fields)
 ->
-    ReqBody = #{
+    Row = #{
+        doc_id => DocId,
         match_seq => MatchSeq,
         seq => UpdateSeq,
         partition => Partition,
         fields => Fields
     },
-    Resp = send_if_enabled(
-        doc_path(Index, DocId),
-        [?JSON_CONTENT_TYPE],
-        <<"PUT">>,
-        jiffy:encode(ReqBody)
-    ),
-    case Resp of
-        {ok, 200, _, _} ->
-            ok;
-        {ok, StatusCode, _, RespBody} ->
-            {error, jaxrs_error(StatusCode, RespBody)};
-        {error, Reason} ->
-            send_error(Reason)
-    end.
+    gun_pool:data(PoolStreamRef, nofin, encode_json_seq(Row)).
 
 search(#index{} = Index, QueryArgs) ->
     Resp = send_if_enabled(
@@ -245,6 +237,9 @@ doc_path(#index{} = Index, DocId) ->
 search_path(#index{} = Index) ->
     [index_path(Index), <<"/search">>].
 
+%% Path (an iolist) of the streaming update endpoint for Index.
+update_path(#index{} = Index) ->
+    IndexPath = index_path(Index),
+    [IndexPath, <<"/update">>].
+
 jaxrs_error(400, Body) ->
     {bad_request, message(Body)};
 jaxrs_error(404, Body) ->
@@ -314,3 +309,6 @@ send_if_enabled(Path, ReqHeaders, Method, ReqBody, RemainingTries) ->
         false ->
             {error, nouveau_not_enabled}
     end.
+
+%% Frames Data as one RFC 7464 JSON text sequence record: the record
+%% separator (RS, 0x1E), the JSON text, then a line feed. Returns an iolist.
+encode_json_seq(Data) ->
+    Json = jiffy:encode(Data),
+    [16#1E, Json, $\n].
diff --git a/src/nouveau/src/nouveau_index_updater.erl b/src/nouveau/src/nouveau_index_updater.erl
index 4bfea753a..bf2ed1998 100644
--- a/src/nouveau/src/nouveau_index_updater.erl
+++ b/src/nouveau/src/nouveau_index_updater.erl
@@ -27,6 +27,7 @@
 -import(nouveau_util, [index_path/1]).
 
 -record(acc, {
+    pool_stream_ref,
     db,
     index,
     proc,
@@ -80,14 +81,15 @@ update(#index{} = Index) ->
                     index_update_seq = IndexUpdateSeq,
                     index_purge_seq = IndexPurgeSeq
                 },
-                {ok, PurgeAcc1} = purge_index(Db, Index, PurgeAcc0),
-
+                {async, PoolStreamRef} = nouveau_api:start_update(Index),
+                {ok, PurgeAcc1} = purge_index(PoolStreamRef, Db, Index, 
PurgeAcc0),
                 NewCurSeq = couch_db:get_update_seq(Db),
                 Proc = get_os_process(Index#index.def_lang),
                 try
                     true = proc_prompt(Proc, [<<"add_fun">>, Index#index.def, 
<<"nouveau">>]),
 
                     Acc0 = #acc{
+                        pool_stream_ref = PoolStreamRef,
                         db = Db,
                         index = Index,
                         proc = Proc,
@@ -101,6 +103,7 @@ update(#index{} = Index) ->
                     ),
                     exit(nouveau_api:set_update_seq(Index, 
Acc1#acc.update_seq, NewCurSeq))
                 after
+                    nouveau_api:end_update(PoolStreamRef),
                     ret_os_process(Proc)
                 end
         end
@@ -126,7 +129,7 @@ load_docs(FDI, #acc{} = Acc1) ->
                 case
                     update_or_delete_index(
                         Acc1#acc.db,
-                        Acc1#acc.index,
+                        Acc1#acc.pool_stream_ref,
                         Acc1#acc.update_seq,
                         DI,
                         Acc1#acc.proc
@@ -142,11 +145,11 @@ load_docs(FDI, #acc{} = Acc1) ->
         end,
     {ok, Acc2#acc{changes_done = Acc2#acc.changes_done + 1}}.
 
-update_or_delete_index(Db, #index{} = Index, MatchSeq, #doc_info{} = DI, Proc) 
->
+update_or_delete_index(Db, PoolStreamRef, MatchSeq, #doc_info{} = DI, Proc) ->
     #doc_info{id = Id, high_seq = Seq, revs = [#rev_info{deleted = Del} | _]} 
= DI,
     case Del of
         true ->
-            nouveau_api:delete_doc(Index, Id, MatchSeq, Seq);
+            nouveau_api:delete_doc(PoolStreamRef, Id, MatchSeq, Seq);
         false ->
             {ok, Doc} = couch_db:open_doc(Db, DI, []),
             Json = couch_doc:to_json_obj(Doc, []),
@@ -160,10 +163,10 @@ update_or_delete_index(Db, #index{} = Index, MatchSeq, #doc_info{} = DI, Proc) -
                 end,
             case Fields of
                 [] ->
-                    nouveau_api:delete_doc(Index, Id, MatchSeq, Seq);
+                    nouveau_api:delete_doc(PoolStreamRef, Id, MatchSeq, Seq);
                 _ ->
                     nouveau_api:update_doc(
-                        Index, Id, MatchSeq, Seq, Partition, Fields
+                        PoolStreamRef, Id, MatchSeq, Seq, Partition, Fields
                     )
             end
     end.
@@ -209,7 +212,7 @@ index_definition(#index{} = Index) ->
         <<"field_analyzers">> => Index#index.field_analyzers
     }.
 
-purge_index(Db, Index, #purge_acc{} = PurgeAcc0) ->
+purge_index(PoolStreamRef, Db, Index, #purge_acc{} = PurgeAcc0) ->
     Proc = get_os_process(Index#index.def_lang),
     try
         true = proc_prompt(Proc, [<<"add_fun">>, Index#index.def, 
<<"nouveau">>]),
@@ -230,7 +233,7 @@ purge_index(Db, Index, #purge_acc{} = PurgeAcc0) ->
                             false ->
                                 update_or_delete_index(
                                     Db,
-                                    Index,
+                                    PoolStreamRef,
                                     PurgeAcc1#purge_acc.index_update_seq,
                                     DI,
                                     Proc

Reply via email to