This is an automated email from the ASF dual-hosted git repository.

rnewson pushed a commit to branch nouveau-streaming-index-update-alt
in repository https://gitbox.apache.org/repos/asf/couchdb.git

commit 5cabfef0216df1c5c61deeffb5ba880012bc911c
Author: Robert Newson <[email protected]>
AuthorDate: Thu Feb 19 21:45:37 2026 +0000

    stream index updates in one request for performance
---
 .../apache/couchdb/nouveau/NouveauApplication.java |   2 +-
 .../couchdb/nouveau/api/DocumentDeleteRequest.java |   4 +-
 .../couchdb/nouveau/api/DocumentRequest.java       |  27 +++
 .../couchdb/nouveau/api/DocumentUpdateRequest.java |   4 +-
 .../couchdb/nouveau/api/IndexInfoRequest.java      |   2 +-
 .../apache/couchdb/nouveau/api/UpdateRequest.java  |  28 ++++
 .../couchdb/nouveau/health/IndexHealthCheck.java   |   2 +-
 .../couchdb/nouveau/resources/IndexResource.java   |  55 +++++-
 .../nouveau/health/IndexHealthCheckTest.java       |   5 +-
 .../couchdb/nouveau/lucene/LuceneIndexTest.java    |  28 ++--
 src/nouveau/src/nouveau_api.erl                    | 184 ++++++++++++---------
 src/nouveau/src/nouveau_index_updater.erl          |  28 ++--
 12 files changed, 256 insertions(+), 113 deletions(-)

diff --git 
a/extra/nouveau/src/main/java/org/apache/couchdb/nouveau/NouveauApplication.java
 
b/extra/nouveau/src/main/java/org/apache/couchdb/nouveau/NouveauApplication.java
index c2230d1eb..eb886d826 100644
--- 
a/extra/nouveau/src/main/java/org/apache/couchdb/nouveau/NouveauApplication.java
+++ 
b/extra/nouveau/src/main/java/org/apache/couchdb/nouveau/NouveauApplication.java
@@ -77,7 +77,7 @@ public class NouveauApplication extends 
Application<NouveauApplicationConfigurat
         environment.jersey().register(analyzeResource);
 
         // IndexResource
-        final IndexResource indexResource = new IndexResource(indexManager);
+        final IndexResource indexResource = new IndexResource(indexManager, 
environment.getObjectMapper());
         environment.jersey().register(indexResource);
 
         // Health checks
diff --git 
a/extra/nouveau/src/main/java/org/apache/couchdb/nouveau/api/DocumentDeleteRequest.java
 
b/extra/nouveau/src/main/java/org/apache/couchdb/nouveau/api/DocumentDeleteRequest.java
index 82e9b716a..ddc067a16 100644
--- 
a/extra/nouveau/src/main/java/org/apache/couchdb/nouveau/api/DocumentDeleteRequest.java
+++ 
b/extra/nouveau/src/main/java/org/apache/couchdb/nouveau/api/DocumentDeleteRequest.java
@@ -17,7 +17,7 @@ import com.fasterxml.jackson.annotation.JsonProperty;
 import jakarta.validation.constraints.Positive;
 import jakarta.validation.constraints.PositiveOrZero;
 
-public final class DocumentDeleteRequest {
+public final class DocumentDeleteRequest extends DocumentRequest {
 
     @PositiveOrZero
     private final long matchSeq;
@@ -28,9 +28,11 @@ public final class DocumentDeleteRequest {
     private final boolean purge;
 
     public DocumentDeleteRequest(
+            @JsonProperty("doc_id") final String id,
             @JsonProperty("match_seq") final long matchSeq,
             @JsonProperty("seq") final long seq,
             @JsonProperty("purge") final boolean purge) {
+        super(id);
         if (matchSeq < 0) {
             throw new IllegalArgumentException("matchSeq must be 0 or 
greater");
         }
diff --git 
a/extra/nouveau/src/main/java/org/apache/couchdb/nouveau/api/DocumentRequest.java
 
b/extra/nouveau/src/main/java/org/apache/couchdb/nouveau/api/DocumentRequest.java
new file mode 100644
index 000000000..4301b5b10
--- /dev/null
+++ 
b/extra/nouveau/src/main/java/org/apache/couchdb/nouveau/api/DocumentRequest.java
@@ -0,0 +1,27 @@
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package org.apache.couchdb.nouveau.api;
+
+public abstract class DocumentRequest extends UpdateRequest {
+
+    private final String id;
+
+    protected DocumentRequest(final String id) {
+        this.id = id;
+    }
+
+    public final String getId() {
+        return id;
+    }
+}
diff --git 
a/extra/nouveau/src/main/java/org/apache/couchdb/nouveau/api/DocumentUpdateRequest.java
 
b/extra/nouveau/src/main/java/org/apache/couchdb/nouveau/api/DocumentUpdateRequest.java
index 82c196602..7b1db0c09 100644
--- 
a/extra/nouveau/src/main/java/org/apache/couchdb/nouveau/api/DocumentUpdateRequest.java
+++ 
b/extra/nouveau/src/main/java/org/apache/couchdb/nouveau/api/DocumentUpdateRequest.java
@@ -20,7 +20,7 @@ import jakarta.validation.constraints.Positive;
 import jakarta.validation.constraints.PositiveOrZero;
 import java.util.Collection;
 
-public final class DocumentUpdateRequest {
+public final class DocumentUpdateRequest extends DocumentRequest {
 
     @PositiveOrZero
     private final long matchSeq;
@@ -35,10 +35,12 @@ public final class DocumentUpdateRequest {
     private final Collection<Field> fields;
 
     public DocumentUpdateRequest(
+            @JsonProperty("doc_id") final String id,
             @JsonProperty("match_seq") final long matchSeq,
             @JsonProperty("seq") final long seq,
             @JsonProperty("partition") final String partition,
             @JsonProperty("fields") final Collection<Field> fields) {
+        super(id);
         this.matchSeq = matchSeq;
         this.seq = seq;
         this.partition = partition;
diff --git 
a/extra/nouveau/src/main/java/org/apache/couchdb/nouveau/api/IndexInfoRequest.java
 
b/extra/nouveau/src/main/java/org/apache/couchdb/nouveau/api/IndexInfoRequest.java
index cc008231c..538e664b9 100644
--- 
a/extra/nouveau/src/main/java/org/apache/couchdb/nouveau/api/IndexInfoRequest.java
+++ 
b/extra/nouveau/src/main/java/org/apache/couchdb/nouveau/api/IndexInfoRequest.java
@@ -17,7 +17,7 @@ import com.fasterxml.jackson.annotation.JsonProperty;
 import jakarta.validation.constraints.Positive;
 import java.util.OptionalLong;
 
-public final class IndexInfoRequest {
+public final class IndexInfoRequest extends UpdateRequest {
 
     private final OptionalLong matchUpdateSeq;
 
diff --git 
a/extra/nouveau/src/main/java/org/apache/couchdb/nouveau/api/UpdateRequest.java 
b/extra/nouveau/src/main/java/org/apache/couchdb/nouveau/api/UpdateRequest.java
new file mode 100644
index 000000000..2ae5deb7f
--- /dev/null
+++ 
b/extra/nouveau/src/main/java/org/apache/couchdb/nouveau/api/UpdateRequest.java
@@ -0,0 +1,28 @@
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package org.apache.couchdb.nouveau.api;
+
+import com.fasterxml.jackson.annotation.JsonSubTypes;
+import com.fasterxml.jackson.annotation.JsonTypeInfo;
+import com.fasterxml.jackson.databind.PropertyNamingStrategies;
+import com.fasterxml.jackson.databind.annotation.JsonNaming;
+
+@JsonNaming(PropertyNamingStrategies.SnakeCaseStrategy.class)
+@JsonTypeInfo(use = JsonTypeInfo.Id.NAME, include = JsonTypeInfo.As.PROPERTY, 
property = "@type")
+@JsonSubTypes({
+    @JsonSubTypes.Type(value = DocumentUpdateRequest.class, name = "update"),
+    @JsonSubTypes.Type(value = DocumentDeleteRequest.class, name = "delete"),
+    @JsonSubTypes.Type(value = IndexInfoRequest.class, name = "index_info"),
+})
+public abstract class UpdateRequest {}
diff --git 
a/extra/nouveau/src/main/java/org/apache/couchdb/nouveau/health/IndexHealthCheck.java
 
b/extra/nouveau/src/main/java/org/apache/couchdb/nouveau/health/IndexHealthCheck.java
index 7e5facb2e..3a70dd7a7 100644
--- 
a/extra/nouveau/src/main/java/org/apache/couchdb/nouveau/health/IndexHealthCheck.java
+++ 
b/extra/nouveau/src/main/java/org/apache/couchdb/nouveau/health/IndexHealthCheck.java
@@ -42,7 +42,7 @@ public final class IndexHealthCheck extends HealthCheck {
         indexResource.createIndex(name, new 
IndexDefinition(IndexDefinition.LATEST_LUCENE_VERSION, "standard", null));
         try {
             final DocumentUpdateRequest documentUpdateRequest =
-                    new DocumentUpdateRequest(0, 1, null, 
Collections.emptyList());
+                    new DocumentUpdateRequest("foo", 0, 1, null, 
Collections.emptyList());
             indexResource.updateDoc(name, "foo", documentUpdateRequest);
 
             final SearchRequest searchRequest = new SearchRequest();
diff --git 
a/extra/nouveau/src/main/java/org/apache/couchdb/nouveau/resources/IndexResource.java
 
b/extra/nouveau/src/main/java/org/apache/couchdb/nouveau/resources/IndexResource.java
index 9ba382109..9aa727cd2 100644
--- 
a/extra/nouveau/src/main/java/org/apache/couchdb/nouveau/resources/IndexResource.java
+++ 
b/extra/nouveau/src/main/java/org/apache/couchdb/nouveau/resources/IndexResource.java
@@ -16,6 +16,8 @@ package org.apache.couchdb.nouveau.resources;
 import com.codahale.metrics.annotation.ExceptionMetered;
 import com.codahale.metrics.annotation.Metered;
 import com.codahale.metrics.annotation.ResponseMetered;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import jakarta.servlet.http.HttpServletRequest;
 import jakarta.validation.Valid;
 import jakarta.validation.constraints.NotNull;
 import jakarta.ws.rs.Consumes;
@@ -27,6 +29,7 @@ import jakarta.ws.rs.Path;
 import jakarta.ws.rs.PathParam;
 import jakarta.ws.rs.Produces;
 import jakarta.ws.rs.WebApplicationException;
+import jakarta.ws.rs.core.Context;
 import jakarta.ws.rs.core.MediaType;
 import jakarta.ws.rs.core.Response.Status;
 import java.io.IOException;
@@ -40,7 +43,10 @@ import org.apache.couchdb.nouveau.api.IndexInfoRequest;
 import org.apache.couchdb.nouveau.api.Ok;
 import org.apache.couchdb.nouveau.api.SearchRequest;
 import org.apache.couchdb.nouveau.api.SearchResults;
+import org.apache.couchdb.nouveau.api.UpdateRequest;
 import org.apache.couchdb.nouveau.core.IndexManager;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 @Path("/index/{name}")
 @Metered
@@ -50,10 +56,15 @@ import org.apache.couchdb.nouveau.core.IndexManager;
 @Produces(MediaType.APPLICATION_JSON)
 public final class IndexResource {
 
+    private static final Logger LOGGER = 
LoggerFactory.getLogger(IndexResource.class);
+
     private final IndexManager indexManager;
 
-    public IndexResource(final IndexManager indexManager) {
+    private final ObjectMapper objectMapper;
+
+    public IndexResource(final IndexManager indexManager, final ObjectMapper 
objectMapper) {
         this.indexManager = Objects.requireNonNull(indexManager);
+        this.objectMapper = Objects.requireNonNull(objectMapper);
     }
 
     @PUT
@@ -67,6 +78,7 @@ public final class IndexResource {
         return Ok.INSTANCE;
     }
 
+    @Deprecated(since = "2.5.2", forRemoval = true)
     @DELETE
     @Path("/doc/{docId}")
     public Ok deleteDoc(
@@ -120,6 +132,7 @@ public final class IndexResource {
         });
     }
 
+    @Deprecated(since = "2.5.2", forRemoval = true)
     @PUT
     @Path("/doc/{docId}")
     public Ok updateDoc(
@@ -132,4 +145,59 @@ public final class IndexResource {
             return Ok.INSTANCE;
         });
     }
+
+    /**
+     * Applies a batch of index updates streamed as a JSON text sequence.
+     *
+     * <p>Every line of the request body must begin with the ASCII record separator (code 30)
+     * followed by one JSON-encoded {@link UpdateRequest}. Rows are applied to the index in the
+     * order they are read.
+     *
+     * @param name the index name from the request path
+     * @param req the servlet request whose body is read line by line
+     * @return {@link Ok#INSTANCE} after the last row has been applied
+     * @throws WebApplicationException with status 400 if a row is empty or lacks the leading
+     *     record separator
+     * @throws Exception if reading the body, parsing a row, or an index operation fails
+     */
+    @POST
+    @Path("/update")
+    @Consumes({"application/json-seq"})
+    public Ok updates(@PathParam("name") String name, @Context HttpServletRequest req) throws Exception {
+        var reader = req.getReader();
+        return indexManager.with(name, (index) -> {
+            String line;
+            while ((line = reader.readLine()) != null) {
+                // An empty line has no first character: reject it as malformed rather than
+                // letting charAt(0) throw StringIndexOutOfBoundsException.
+                if (line.isEmpty() || line.charAt(0) != 30) {
+                    throw new WebApplicationException("malformed row", Status.BAD_REQUEST);
+                }
+                var updateReq = objectMapper.readValue(line.substring(1), UpdateRequest.class);
+                if (updateReq instanceof DocumentUpdateRequest) {
+                    var documentUpdateRequest = (DocumentUpdateRequest) updateReq;
+                    index.update(documentUpdateRequest.getId(), documentUpdateRequest);
+                } else if (updateReq instanceof DocumentDeleteRequest) {
+                    var documentDeleteRequest = (DocumentDeleteRequest) updateReq;
+                    index.delete(documentDeleteRequest.getId(), documentDeleteRequest);
+                } else if (updateReq instanceof IndexInfoRequest) {
+                    var indexInfoRequest = (IndexInfoRequest) updateReq;
+                    // A sequence is only advanced when both its match value and new value are present.
+                    if (indexInfoRequest.getMatchUpdateSeq().isPresent()
+                            && indexInfoRequest.getUpdateSeq().isPresent()) {
+                        index.setUpdateSeq(
+                                indexInfoRequest.getMatchUpdateSeq().getAsLong(),
+                                indexInfoRequest.getUpdateSeq().getAsLong());
+                    }
+                    if (indexInfoRequest.getMatchPurgeSeq().isPresent()
+                            && indexInfoRequest.getPurgeSeq().isPresent()) {
+                        index.setPurgeSeq(
+                                indexInfoRequest.getMatchPurgeSeq().getAsLong(),
+                                indexInfoRequest.getPurgeSeq().getAsLong());
+                    }
+                }
+            }
+            return Ok.INSTANCE;
+        });
+    }
 }
diff --git 
a/extra/nouveau/src/test/java/org/apache/couchdb/nouveau/health/IndexHealthCheckTest.java
 
b/extra/nouveau/src/test/java/org/apache/couchdb/nouveau/health/IndexHealthCheckTest.java
index 0c777fbab..77f47a52c 100644
--- 
a/extra/nouveau/src/test/java/org/apache/couchdb/nouveau/health/IndexHealthCheckTest.java
+++ 
b/extra/nouveau/src/test/java/org/apache/couchdb/nouveau/health/IndexHealthCheckTest.java
@@ -29,17 +29,18 @@ public class IndexHealthCheckTest {
     @Test
     public void testIndexHealthCheck(@TempDir final Path tempDir) throws 
Exception {
         var manager = new IndexManager();
+        var objectMapper = new ObjectMapper();
         manager.setCommitIntervalSeconds(30);
         manager.setIdleSeconds(60);
         manager.setMaxIndexesOpen(1);
-        manager.setObjectMapper(new ObjectMapper());
+        manager.setObjectMapper(objectMapper);
         manager.setRootDir(tempDir);
         
manager.setScheduledExecutorService(Executors.newScheduledThreadPool(2));
         manager.setSearcherFactory(new SearcherFactory());
         manager.start();
 
         try {
-            var resource = new IndexResource(manager);
+            var resource = new IndexResource(manager, objectMapper);
             var check = new IndexHealthCheck(resource);
             var result = check.check();
             assertTrue(result.isHealthy(), result.toString());
diff --git 
a/extra/nouveau/src/test/java/org/apache/couchdb/nouveau/lucene/LuceneIndexTest.java
 
b/extra/nouveau/src/test/java/org/apache/couchdb/nouveau/lucene/LuceneIndexTest.java
index f87af2fe0..2d56b14e9 100644
--- 
a/extra/nouveau/src/test/java/org/apache/couchdb/nouveau/lucene/LuceneIndexTest.java
+++ 
b/extra/nouveau/src/test/java/org/apache/couchdb/nouveau/lucene/LuceneIndexTest.java
@@ -77,7 +77,7 @@ public class LuceneIndexTest {
             final int count = 100;
             for (int i = 1; i <= count; i++) {
                 final Collection<Field> fields = List.of(new 
StringField("foo", "bar", false));
-                final DocumentUpdateRequest request = new 
DocumentUpdateRequest(i - 1, i, null, fields);
+                final DocumentUpdateRequest request = new 
DocumentUpdateRequest(null, i - 1, i, null, fields);
                 index.update("doc" + i, request);
             }
             final SearchRequest request = new SearchRequest();
@@ -97,7 +97,7 @@ public class LuceneIndexTest {
             final int count = 100;
             for (int i = 1; i <= count; i++) {
                 final Collection<Field> fields = List.of(new 
StringField("foo", "bar", false));
-                final DocumentUpdateRequest request = new 
DocumentUpdateRequest(i - 1, i, null, fields);
+                final DocumentUpdateRequest request = new 
DocumentUpdateRequest(null, i - 1, i, null, fields);
                 index.update("doc" + i, request);
             }
             final SearchRequest request = new SearchRequest();
@@ -118,7 +118,7 @@ public class LuceneIndexTest {
             final int count = 100;
             for (int i = 1; i <= count; i++) {
                 final Collection<Field> fields = List.of(new 
StringField("bar", "baz", false));
-                final DocumentUpdateRequest request = new 
DocumentUpdateRequest(i - 1, i, null, fields);
+                final DocumentUpdateRequest request = new 
DocumentUpdateRequest(null, i - 1, i, null, fields);
                 index.update("doc" + i, request);
             }
             final SearchRequest request = new SearchRequest();
@@ -139,7 +139,7 @@ public class LuceneIndexTest {
             final int count = 100;
             for (int i = 1; i <= count; i++) {
                 final Collection<Field> fields = List.of(new 
DoubleField("bar", (double) i, false));
-                final DocumentUpdateRequest request = new 
DocumentUpdateRequest(i - 1, i, null, fields);
+                final DocumentUpdateRequest request = new 
DocumentUpdateRequest(null, i - 1, i, null, fields);
                 index.update("doc" + i, request);
             }
             final SearchRequest request = new SearchRequest();
@@ -164,13 +164,13 @@ public class LuceneIndexTest {
             final int count = 50;
             for (int i = 1; i <= count; i++) {
                 final Collection<Field> fields = List.of(new 
StringField("bar", "bar", false));
-                final DocumentUpdateRequest request = new 
DocumentUpdateRequest(i - 1, i, null, fields);
+                final DocumentUpdateRequest request = new 
DocumentUpdateRequest(null, i - 1, i, null, fields);
                 index.update("doc" + i, request);
             }
 
             for (int i = count + 1; i <= (count * 2) + 5; i++) {
                 final Collection<Field> fields = List.of(new 
StringField("bar", "baz", false));
-                final DocumentUpdateRequest request = new 
DocumentUpdateRequest(i - 1, i, null, fields);
+                final DocumentUpdateRequest request = new 
DocumentUpdateRequest(null, i - 1, i, null, fields);
                 index.update("doc" + i, request);
             }
 
@@ -195,15 +195,15 @@ public class LuceneIndexTest {
             // get match seq wrong
             assertThrows(
                     UpdatesOutOfOrderException.class,
-                    () -> index.update("foo", new DocumentUpdateRequest(1, 2, 
null, fields)));
+                    () -> index.update("foo", new DocumentUpdateRequest(null, 
1, 2, null, fields)));
 
             // Go to 2.
-            index.update("foo", new DocumentUpdateRequest(0, 2, null, fields));
+            index.update("foo", new DocumentUpdateRequest(null, 0, 2, null, 
fields));
 
             // Should be prevented from going down to 1.
             assertThrows(
                     UpdatesOutOfOrderException.class,
-                    () -> index.update("foo", new DocumentUpdateRequest(2, 1, 
null, fields)));
+                    () -> index.update("foo", new DocumentUpdateRequest(null, 
2, 1, null, fields)));
         } finally {
             cleanup(index);
         }
@@ -235,7 +235,7 @@ public class LuceneIndexTest {
             assertThat(info.getUpdateSeq()).isEqualTo(0);
 
             final Collection<Field> fields = List.of(new DoubleField("bar", 
12.0, false));
-            index.update("foo", new DocumentUpdateRequest(0, 2, null, fields));
+            index.update("foo", new DocumentUpdateRequest(null, 0, 2, null, 
fields));
             index.commit();
 
             info = index.info();
@@ -252,13 +252,13 @@ public class LuceneIndexTest {
         Index index = setup(path);
         try {
             final Collection<Field> fields = List.of(new DoubleField("bar", 
12.0, false));
-            index.update("foo", new DocumentUpdateRequest(0, 2, null, fields));
+            index.update("foo", new DocumentUpdateRequest(null, 0, 2, null, 
fields));
             index.commit();
 
             IndexInfo info = index.info();
             assertThat(info.getNumDocs()).isEqualTo(1);
 
-            index.delete("foo", new DocumentDeleteRequest(2, 3, false));
+            index.delete("foo", new DocumentDeleteRequest(null, 2, 3, false));
             index.commit();
 
             info = index.info();
@@ -274,13 +274,13 @@ public class LuceneIndexTest {
         Index index = setup(path);
         try {
             final Collection<Field> fields = List.of(new DoubleField("bar", 
12.0, false));
-            index.update("foo", new DocumentUpdateRequest(0, 2, null, fields));
+            index.update("foo", new DocumentUpdateRequest(null, 0, 2, null, 
fields));
             index.commit();
 
             IndexInfo info = index.info();
             assertThat(info.getNumDocs()).isEqualTo(1);
 
-            index.delete("foo", new DocumentDeleteRequest(0, 3, true));
+            index.delete("foo", new DocumentDeleteRequest(null, 0, 3, true));
             index.commit();
 
             info = index.info();
diff --git a/src/nouveau/src/nouveau_api.erl b/src/nouveau/src/nouveau_api.erl
index 2d140e580..821484d30 100644
--- a/src/nouveau/src/nouveau_api.erl
+++ b/src/nouveau/src/nouveau_api.erl
@@ -26,6 +26,8 @@
     delete_doc/4,
     purge_doc/4,
     update_doc/6,
+    start_update/1,
+    end_update/1,
     search/2,
     set_purge_seq/3,
     set_update_seq/3,
@@ -34,6 +36,7 @@
 ]).
 
 -define(JSON_CONTENT_TYPE, {"Content-Type", "application/json"}).
+-define(JSON_SEQ_CONTENT_TYPE, {"Content-Type", "application/json-seq"}).
 
 analyze(Text, Analyzer) when
     is_binary(Text), is_binary(Analyzer)
@@ -99,41 +102,54 @@ delete_path(Path, Exclusions) when
             send_error(Reason)
     end.
 
-delete_doc(#index{} = Index, DocId, MatchSeq, UpdateSeq) when
+delete_doc({_, _} = PoolStreamRef, DocId, MatchSeq, UpdateSeq) when
     is_binary(DocId),
     is_integer(MatchSeq),
     MatchSeq >= 0,
     is_integer(UpdateSeq),
     UpdateSeq > 0
 ->
-    ReqBody = #{match_seq => MatchSeq, seq => UpdateSeq, purge => false},
-    Resp = send_if_enabled(
-        doc_path(Index, DocId),
-        [?JSON_CONTENT_TYPE],
-        <<"DELETE">>,
-        jiffy:encode(ReqBody)
-    ),
-    case Resp of
-        {ok, 200, _, _} ->
-            ok;
-        {ok, StatusCode, _, RespBody} ->
-            {error, jaxrs_error(StatusCode, RespBody)};
-        {error, Reason} ->
-            send_error(Reason)
-    end.
+    Row = #{
+        <<"@type">> => delete,
+        doc_id => DocId,
+        match_seq => MatchSeq,
+        seq => UpdateSeq,
+        purge => false
+    },
+    ok = gun_pool:data(PoolStreamRef, nofin, encode_json_seq(Row)),
+    check_status(PoolStreamRef).
 
-purge_doc(#index{} = Index, DocId, MatchSeq, PurgeSeq) when
+purge_doc({_, _} = PoolStreamRef, DocId, MatchSeq, PurgeSeq) when
     is_binary(DocId),
     is_integer(MatchSeq),
     MatchSeq >= 0,
     is_integer(PurgeSeq),
     PurgeSeq > 0
 ->
-    ReqBody = #{match_seq => MatchSeq, seq => PurgeSeq, purge => true},
-    Resp = send_if_enabled(
-        doc_path(Index, DocId), [?JSON_CONTENT_TYPE], <<"DELETE">>, 
jiffy:encode(ReqBody)
-    ),
-    case Resp of
+    Row = #{
+        <<"@type">> => delete,
+        doc_id => DocId,
+        match_seq => MatchSeq,
+        seq => PurgeSeq,
+        purge => true
+    },
+    ok = gun_pool:data(PoolStreamRef, nofin, encode_json_seq(Row)),
+    check_status(PoolStreamRef).
+
+start_update(#index{} = Index) ->
+    case nouveau:enabled() of
+        true ->
+            gun_pool:post(
+                update_path(Index),
+                [nouveau_gun:host_header(), ?JSON_SEQ_CONTENT_TYPE]
+            );
+        false ->
+            {error, nouveau_not_enabled}
+    end.
+
+end_update({_, _} = PoolStreamRef) ->
+    ok = gun_pool:data(PoolStreamRef, fin, <<>>),
+    case await(PoolStreamRef) of
         {ok, 200, _, _} ->
             ok;
         {ok, StatusCode, _, RespBody} ->
@@ -142,7 +158,7 @@ purge_doc(#index{} = Index, DocId, MatchSeq, PurgeSeq) when
             send_error(Reason)
     end.
 
-update_doc(#index{} = Index, DocId, MatchSeq, UpdateSeq, Partition, Fields) 
when
+update_doc({_, _} = PoolStreamRef, DocId, MatchSeq, UpdateSeq, Partition, 
Fields) when
     is_binary(DocId),
     is_integer(MatchSeq),
     MatchSeq >= 0,
@@ -151,26 +167,16 @@ update_doc(#index{} = Index, DocId, MatchSeq, UpdateSeq, 
Partition, Fields) when
     (is_binary(Partition) orelse Partition == null),
     is_list(Fields)
 ->
-    ReqBody = #{
+    Row = #{
+        <<"@type">> => update,
+        doc_id => DocId,
         match_seq => MatchSeq,
         seq => UpdateSeq,
         partition => Partition,
         fields => Fields
     },
-    Resp = send_if_enabled(
-        doc_path(Index, DocId),
-        [?JSON_CONTENT_TYPE],
-        <<"PUT">>,
-        jiffy:encode(ReqBody)
-    ),
-    case Resp of
-        {ok, 200, _, _} ->
-            ok;
-        {ok, StatusCode, _, RespBody} ->
-            {error, jaxrs_error(StatusCode, RespBody)};
-        {error, Reason} ->
-            send_error(Reason)
-    end.
+    ok = gun_pool:data(PoolStreamRef, nofin, encode_json_seq(Row)),
+    check_status(PoolStreamRef).
 
 search(#index{} = Index, QueryArgs) ->
     Resp = send_if_enabled(
@@ -188,32 +194,23 @@ search(#index{} = Index, QueryArgs) ->
             send_error(Reason)
     end.
 
-set_update_seq(#index{} = Index, MatchSeq, UpdateSeq) ->
-    ReqBody = #{
+set_update_seq({_, _} = PoolStreamRef, MatchSeq, UpdateSeq) ->
+    Row = #{
+       <<"@type">> => index_info,
         match_update_seq => MatchSeq,
         update_seq => UpdateSeq
     },
-    set_seq(Index, ReqBody).
+    ok = gun_pool:data(PoolStreamRef, nofin, encode_json_seq(Row)),
+    check_status(PoolStreamRef).
 
-set_purge_seq(#index{} = Index, MatchSeq, PurgeSeq) ->
-    ReqBody = #{
+set_purge_seq({_, _} = PoolStreamRef, MatchSeq, PurgeSeq) ->
+    Row = #{
+       <<"@type">> => index_info,
         match_purge_seq => MatchSeq,
         purge_seq => PurgeSeq
     },
-    set_seq(Index, ReqBody).
-
-set_seq(#index{} = Index, ReqBody) ->
-    Resp = send_if_enabled(
-        index_path(Index), [?JSON_CONTENT_TYPE], <<"POST">>, 
jiffy:encode(ReqBody)
-    ),
-    case Resp of
-        {ok, 200, _, _} ->
-            ok;
-        {ok, StatusCode, _, RespBody} ->
-            {error, jaxrs_error(StatusCode, RespBody)};
-        {error, Reason} ->
-            send_error(Reason)
-    end.
+    ok = gun_pool:data(PoolStreamRef, nofin, encode_json_seq(Row)),
+    check_status(PoolStreamRef).
 
 supported_lucene_versions() ->
     Resp = send_if_enabled(<<"/">>, [], <<"GET">>),
@@ -234,17 +231,12 @@ index_path(Path) when is_binary(Path) ->
 index_path(#index{} = Index) ->
     [<<"/index/">>, couch_util:url_encode(nouveau_util:index_name(Index))].
 
-doc_path(#index{} = Index, DocId) ->
-    [
-        <<"/index/">>,
-        couch_util:url_encode(nouveau_util:index_name(Index)),
-        <<"/doc/">>,
-        couch_util:url_encode(DocId)
-    ].
-
 search_path(#index{} = Index) ->
     [index_path(Index), <<"/search">>].
 
+update_path(#index{} = Index) ->
+    [index_path(Index), <<"/update">>].
+
 jaxrs_error(400, Body) ->
     {bad_request, message(Body)};
 jaxrs_error(404, Body) ->
@@ -291,20 +283,7 @@ send_if_enabled(Path, ReqHeaders, Method, ReqBody, 
RemainingTries) ->
                 )
             of
                 {async, PoolStreamRef} ->
-                    Timeout = config:get_integer("nouveau", "request_timeout", 
30000),
-                    case gun_pool:await(PoolStreamRef, Timeout) of
-                        {response, fin, Status, RespHeaders} ->
-                            {ok, Status, RespHeaders, []};
-                        {response, nofin, Status, RespHeaders} ->
-                            case gun_pool:await_body(PoolStreamRef, Timeout) of
-                                {ok, RespBody} ->
-                                    {ok, Status, RespHeaders, RespBody};
-                                {error, Reason} ->
-                                    {error, Reason}
-                            end;
-                        {error, Reason} ->
-                            {error, Reason}
-                    end;
+                    await(PoolStreamRef);
                 {error, no_connection_available, _Reason} when RemainingTries 
> 0 ->
                     timer:sleep(1000),
                     send_if_enabled(Path, ReqHeaders, Method, ReqBody, 
RemainingTries - 1);
@@ -314,3 +293,79 @@ send_if_enabled(Path, ReqHeaders, Method, ReqBody, RemainingTries) ->
         false ->
             {error, nouveau_not_enabled}
     end.
+
+%% Waits for the full response (status, headers, body) within the configured
+%% "request_timeout" (default 30000 ms).
+await(PoolStreamRef) ->
+    Timeout = config:get_integer("nouveau", "request_timeout", 30000),
+    await(PoolStreamRef, Timeout).
+
+%% Waits up to Timeout ms in total for the response head and body, monitoring
+%% the connection process for the duration of the wait.
+await({ConnPid, _} = PoolStreamRef, Timeout) ->
+    MRef = monitor(process, ConnPid),
+    T0 = now_ms(),
+    Res =
+        case gun_pool:await(PoolStreamRef, Timeout, MRef) of
+            {response, fin, Status, RespHeaders} ->
+                {ok, Status, RespHeaders, []};
+            {response, nofin, Status, RespHeaders} ->
+                Elapsed = now_ms() - T0,
+                case gun_pool:await_body(PoolStreamRef, max(0, Timeout - Elapsed), MRef) of
+                    {ok, RespBody} ->
+                        {ok, Status, RespHeaders, RespBody};
+                    {'DOWN', MRef, process, ConnPid, Reason} ->
+                        {error, Reason};
+                    {error, Reason} ->
+                        {error, Reason}
+                end;
+            {'DOWN', MRef, process, ConnPid, Reason} ->
+                {error, Reason};
+            {error, Reason} ->
+                {error, Reason}
+        end,
+    demonitor(MRef, [flush]),
+    Res.
+
+now_ms() ->
+    erlang:monotonic_time(millisecond).
+
+%% Frames one term as a JSON text sequence record: RS (0x1E), JSON, newline.
+encode_json_seq(Data) ->
+    [$\x{1e}, jiffy:encode(Data), $\n].
+
+%% Polls, without blocking, for a response the server may have sent before the
+%% request body was finished. Returns ok when nothing has arrived yet or the
+%% status is 200; otherwise returns the error.
+check_status({ConnPid, _} = PoolStreamRef) ->
+    MRef = monitor(process, ConnPid),
+    Res =
+        case gun_pool:await(PoolStreamRef, 0, MRef) of
+            {error, timeout} ->
+                pending;
+            {response, fin, Status, _RespHeaders} ->
+                {ok, Status, []};
+            {response, nofin, Status, _RespHeaders} ->
+                %% The head has been consumed, so wait for the body with the full
+                %% request timeout; a zero timeout here could drop the response.
+                Timeout = config:get_integer("nouveau", "request_timeout", 30000),
+                case gun_pool:await_body(PoolStreamRef, Timeout, MRef) of
+                    {ok, RespBody} ->
+                        {ok, Status, RespBody};
+                    {error, Reason} ->
+                        {error, Reason}
+                end;
+            {error, Reason} ->
+                {error, Reason}
+        end,
+    demonitor(MRef, [flush]),
+    case Res of
+        pending ->
+            ok;
+        {ok, 200, _} ->
+            ok;
+        {ok, StatusCode, Body} ->
+            {error, jaxrs_error(StatusCode, Body)};
+        {error, Why} ->
+            send_error(Why)
+    end.
diff --git a/src/nouveau/src/nouveau_index_updater.erl 
b/src/nouveau/src/nouveau_index_updater.erl
index 4bfea753a..8d9aa3267 100644
--- a/src/nouveau/src/nouveau_index_updater.erl
+++ b/src/nouveau/src/nouveau_index_updater.erl
@@ -27,6 +27,7 @@
 -import(nouveau_util, [index_path/1]).
 
 -record(acc, {
+    pool_stream_ref,
     db,
     index,
     proc,
@@ -80,14 +81,15 @@ update(#index{} = Index) ->
                     index_update_seq = IndexUpdateSeq,
                     index_purge_seq = IndexPurgeSeq
                 },
-                {ok, PurgeAcc1} = purge_index(Db, Index, PurgeAcc0),
-
+                {async, PoolStreamRef} = nouveau_api:start_update(Index),
+                {ok, PurgeAcc1} = purge_index(PoolStreamRef, Db, Index, 
PurgeAcc0),
                 NewCurSeq = couch_db:get_update_seq(Db),
                 Proc = get_os_process(Index#index.def_lang),
                 try
                     true = proc_prompt(Proc, [<<"add_fun">>, Index#index.def, 
<<"nouveau">>]),
 
                     Acc0 = #acc{
+                        pool_stream_ref = PoolStreamRef,
                         db = Db,
                         index = Index,
                         proc = Proc,
@@ -99,8 +101,10 @@ update(#index{} = Index) ->
                     {ok, Acc1} = couch_db:fold_changes(
                         Db, Acc0#acc.update_seq, fun load_docs/2, Acc0, []
                     ),
-                    exit(nouveau_api:set_update_seq(Index, 
Acc1#acc.update_seq, NewCurSeq))
+                   nouveau_api:set_update_seq(PoolStreamRef, 
Acc1#acc.update_seq, NewCurSeq),
+                    exit(nouveau_api:end_update(PoolStreamRef))
                 after
+                   gun_pool:cancel(PoolStreamRef),
                     ret_os_process(Proc)
                 end
         end
@@ -125,8 +129,8 @@ load_docs(FDI, #acc{} = Acc1) ->
             false ->
                 case
                     update_or_delete_index(
+                        Acc1#acc.pool_stream_ref,
                         Acc1#acc.db,
-                        Acc1#acc.index,
                         Acc1#acc.update_seq,
                         DI,
                         Acc1#acc.proc
@@ -142,11 +146,11 @@ load_docs(FDI, #acc{} = Acc1) ->
         end,
     {ok, Acc2#acc{changes_done = Acc2#acc.changes_done + 1}}.
 
-update_or_delete_index(Db, #index{} = Index, MatchSeq, #doc_info{} = DI, Proc) 
->
+update_or_delete_index(PoolStreamRef, Db, MatchSeq, #doc_info{} = DI, Proc) ->
     #doc_info{id = Id, high_seq = Seq, revs = [#rev_info{deleted = Del} | _]} 
= DI,
     case Del of
         true ->
-            nouveau_api:delete_doc(Index, Id, MatchSeq, Seq);
+            nouveau_api:delete_doc(PoolStreamRef, Id, MatchSeq, Seq);
         false ->
             {ok, Doc} = couch_db:open_doc(Db, DI, []),
             Json = couch_doc:to_json_obj(Doc, []),
@@ -160,10 +164,10 @@ update_or_delete_index(Db, #index{} = Index, MatchSeq, 
#doc_info{} = DI, Proc) -
                 end,
             case Fields of
                 [] ->
-                    nouveau_api:delete_doc(Index, Id, MatchSeq, Seq);
+                    nouveau_api:delete_doc(PoolStreamRef, Id, MatchSeq, Seq);
                 _ ->
                     nouveau_api:update_doc(
-                        Index, Id, MatchSeq, Seq, Partition, Fields
+                        PoolStreamRef, Id, MatchSeq, Seq, Partition, Fields
                     )
             end
     end.
@@ -209,7 +213,7 @@ index_definition(#index{} = Index) ->
         <<"field_analyzers">> => Index#index.field_analyzers
     }.
 
-purge_index(Db, Index, #purge_acc{} = PurgeAcc0) ->
+purge_index(PoolStreamRef, Db, Index, #purge_acc{} = PurgeAcc0) ->
     Proc = get_os_process(Index#index.def_lang),
     try
         true = proc_prompt(Proc, [<<"add_fun">>, Index#index.def, 
<<"nouveau">>]),
@@ -218,7 +222,7 @@ purge_index(Db, Index, #purge_acc{} = PurgeAcc0) ->
                 case couch_db:get_full_doc_info(Db, Id) of
                     not_found ->
                         ok = nouveau_api:purge_doc(
-                            Index, Id, PurgeAcc1#purge_acc.index_purge_seq, 
PurgeSeq
+                            PoolStreamRef, Id, 
PurgeAcc1#purge_acc.index_purge_seq, PurgeSeq
                         ),
                         PurgeAcc1#purge_acc{index_purge_seq = PurgeSeq};
                     FDI ->
@@ -229,8 +233,8 @@ purge_index(Db, Index, #purge_acc{} = PurgeAcc0) ->
                                 PurgeAcc1;
                             false ->
                                 update_or_delete_index(
+                                    PoolStreamRef,
                                     Db,
-                                    Index,
                                     PurgeAcc1#purge_acc.index_update_seq,
                                     DI,
                                     Proc
@@ -250,7 +254,7 @@ purge_index(Db, Index, #purge_acc{} = PurgeAcc0) ->
         ),
         DbPurgeSeq = couch_db:get_purge_seq(Db),
         ok = nouveau_api:set_purge_seq(
-            Index, PurgeAcc3#purge_acc.index_purge_seq, DbPurgeSeq
+            PoolStreamRef, PurgeAcc3#purge_acc.index_purge_seq, DbPurgeSeq
         ),
         update_local_doc(Db, Index, DbPurgeSeq),
         {ok, PurgeAcc3}


Reply via email to