This is an automated email from the ASF dual-hosted git repository.

rnewson pushed a commit to branch nouveau-ibrowse-improvements
in repository https://gitbox.apache.org/repos/asf/couchdb.git

commit 6a5f5db728e9d5dee0886970ab254a602ec96259
Author: Robert Newson <[email protected]>
AuthorDate: Sat Sep 2 09:24:22 2023 +0100

    pipeline requests
    
    Send multiple http requests before retrieving the responses for
    a significant performance gain
    
    This required enhancement to the updates out of order check to ensure
    that a failure of one update request causes all subsequent requests
    to fail.
---
 nouveau/nouveau.yaml                               |   4 +-
 .../couchdb/nouveau/api/DocumentDeleteRequest.java |  46 +++++----
 .../couchdb/nouveau/api/DocumentUpdateRequest.java |  39 +++++---
 .../couchdb/nouveau/api/IndexInfoRequest.java      |  40 +++++---
 .../org/apache/couchdb/nouveau/core/Index.java     |  56 +++++------
 ...tion.java => IndexUpdateConflictException.java} |  10 +-
 .../couchdb/nouveau/health/IndexHealthCheck.java   |   2 +-
 .../couchdb/nouveau/resources/IndexResource.java   |  13 ++-
 .../couchdb/nouveau/lucene9/Lucene9IndexTest.java  |  26 ++---
 src/nouveau/src/nouveau_api.erl                    | 101 +++++++++++--------
 src/nouveau/src/nouveau_index_updater.erl          | 107 ++++++++++++++++++---
 11 files changed, 287 insertions(+), 157 deletions(-)

diff --git a/nouveau/nouveau.yaml b/nouveau/nouveau.yaml
index 095b06bcf..8391277fc 100644
--- a/nouveau/nouveau.yaml
+++ b/nouveau/nouveau.yaml
@@ -22,6 +22,4 @@ server:
       - GET
       - POST
   requestLog:
-    appenders:
-      - type: console
-        target: stderr
+    appenders: []
diff --git 
a/nouveau/src/main/java/org/apache/couchdb/nouveau/api/DocumentDeleteRequest.java
 
b/nouveau/src/main/java/org/apache/couchdb/nouveau/api/DocumentDeleteRequest.java
index 99f6f5d9f..828cc7a66 100644
--- 
a/nouveau/src/main/java/org/apache/couchdb/nouveau/api/DocumentDeleteRequest.java
+++ 
b/nouveau/src/main/java/org/apache/couchdb/nouveau/api/DocumentDeleteRequest.java
@@ -13,43 +13,51 @@
 
 package org.apache.couchdb.nouveau.api;
 
+import com.fasterxml.jackson.annotation.JsonCreator;
 import com.fasterxml.jackson.annotation.JsonProperty;
-import com.fasterxml.jackson.databind.PropertyNamingStrategies;
-import com.fasterxml.jackson.databind.annotation.JsonNaming;
 import jakarta.validation.constraints.Positive;
+import jakarta.validation.constraints.PositiveOrZero;
 
-@JsonNaming(PropertyNamingStrategies.SnakeCaseStrategy.class)
-public class DocumentDeleteRequest {
+public final class DocumentDeleteRequest {
 
-    @Positive
-    private long seq;
+    @PositiveOrZero
+    private final long fromSeq;
 
-    private boolean purge;
+    @Positive
+    private final long toSeq;
 
-    public DocumentDeleteRequest() {
-        // Jackson deserialization
-    }
+    private final boolean purge;
 
-    public DocumentDeleteRequest(long seq, final boolean purge) {
-        if (seq < 1) {
-            throw new IllegalArgumentException("seq must be 1 or greater");
+    @JsonCreator
+    public DocumentDeleteRequest(
+            @JsonProperty("from_seq") final long fromSeq,
+            @JsonProperty("to_seq") final long toSeq,
+            @JsonProperty("purge") final boolean purge) {
+        if (fromSeq < 0) {
+            throw new IllegalArgumentException("fromSeq must be 0 or greater");
         }
-        this.seq = seq;
+        if (toSeq < 1) {
+            throw new IllegalArgumentException("toSeq must be 1 or greater");
+        }
+        this.fromSeq = fromSeq;
+        this.toSeq = toSeq;
         this.purge = purge;
     }
 
-    @JsonProperty
-    public long getSeq() {
-        return seq;
+    public long getFromSeq() {
+        return fromSeq;
+    }
+
+    public long getToSeq() {
+        return toSeq;
     }
 
-    @JsonProperty
     public boolean isPurge() {
         return purge;
     }
 
     @Override
     public String toString() {
-        return "DocumentDeleteRequest [seq=" + seq + ", purge=" + purge + "]";
+        return "DocumentDeleteRequest [fromSeq=" + fromSeq + ", toSeq=" + 
toSeq + ", purge=" + purge + "]";
     }
 }
diff --git 
a/nouveau/src/main/java/org/apache/couchdb/nouveau/api/DocumentUpdateRequest.java
 
b/nouveau/src/main/java/org/apache/couchdb/nouveau/api/DocumentUpdateRequest.java
index 2fbd01e3b..725dc1803 100644
--- 
a/nouveau/src/main/java/org/apache/couchdb/nouveau/api/DocumentUpdateRequest.java
+++ 
b/nouveau/src/main/java/org/apache/couchdb/nouveau/api/DocumentUpdateRequest.java
@@ -13,42 +13,51 @@
 
 package org.apache.couchdb.nouveau.api;
 
+import com.fasterxml.jackson.annotation.JsonCreator;
 import com.fasterxml.jackson.annotation.JsonProperty;
 import com.fasterxml.jackson.databind.PropertyNamingStrategies;
 import com.fasterxml.jackson.databind.annotation.JsonNaming;
 import jakarta.validation.Valid;
 import jakarta.validation.constraints.NotEmpty;
 import jakarta.validation.constraints.Positive;
+import jakarta.validation.constraints.PositiveOrZero;
 import java.util.Collection;
 
 @JsonNaming(PropertyNamingStrategies.SnakeCaseStrategy.class)
 public class DocumentUpdateRequest {
 
+    @PositiveOrZero
+    private final long fromSeq;
+
     @Positive
-    private long seq;
+    private final long toSeq;
 
-    private String partition;
+    private final String partition;
 
     @NotEmpty
     @Valid
-    private Collection<Field> fields;
-
-    public DocumentUpdateRequest() {
-        // Jackson deserialization
-    }
+    private final Collection<Field> fields;
 
-    public DocumentUpdateRequest(long seq, String partition, Collection<Field> 
fields) {
-        this.seq = seq;
+    @JsonCreator
+    public DocumentUpdateRequest(
+            @JsonProperty("from_seq") final long fromSeq,
+            @JsonProperty("to_seq") final long toSeq,
+            @JsonProperty("partition") final String partition,
+            @JsonProperty("fields") final Collection<Field> fields) {
+        this.fromSeq = fromSeq;
+        this.toSeq = toSeq;
         this.partition = partition;
         this.fields = fields;
     }
 
-    @JsonProperty
-    public long getSeq() {
-        return seq;
+    public long getFromSeq() {
+        return fromSeq;
+    }
+
+    public long getToSeq() {
+        return toSeq;
     }
 
-    @JsonProperty
     public String getPartition() {
         return partition;
     }
@@ -57,13 +66,13 @@ public class DocumentUpdateRequest {
         return partition != null;
     }
 
-    @JsonProperty
     public Collection<Field> getFields() {
         return fields;
     }
 
     @Override
     public String toString() {
-        return "DocumentUpdateRequest [seq=" + seq + ", partition=" + 
partition + ", fields=" + fields + "]";
+        return "DocumentUpdateRequest [fromSeq=" + fromSeq + "toSeq=" + toSeq 
+ ", partition=" + partition + ", fields="
+                + fields + "]";
     }
 }
diff --git 
a/nouveau/src/main/java/org/apache/couchdb/nouveau/api/IndexInfoRequest.java 
b/nouveau/src/main/java/org/apache/couchdb/nouveau/api/IndexInfoRequest.java
index 67e806018..55bc051da 100644
--- a/nouveau/src/main/java/org/apache/couchdb/nouveau/api/IndexInfoRequest.java
+++ b/nouveau/src/main/java/org/apache/couchdb/nouveau/api/IndexInfoRequest.java
@@ -15,31 +15,49 @@ package org.apache.couchdb.nouveau.api;
 
 import com.fasterxml.jackson.annotation.JsonProperty;
 import jakarta.validation.constraints.Positive;
+import jakarta.validation.constraints.PositiveOrZero;
 import java.util.OptionalLong;
 
 public final class IndexInfoRequest {
 
-    private OptionalLong updateSeq;
+    private OptionalLong fromUpdateSeq;
 
-    private OptionalLong purgeSeq;
+    private OptionalLong toUpdateSeq;
+
+    private OptionalLong fromPurgeSeq;
+
+    private OptionalLong toPurgeSeq;
 
     public IndexInfoRequest(
-            @JsonProperty("update_seq") @Positive OptionalLong updateSeq,
-            @JsonProperty("purge_seq") @Positive OptionalLong purgeSeq) {
-        this.updateSeq = updateSeq;
-        this.purgeSeq = purgeSeq;
+            @JsonProperty("from_update_seq") @PositiveOrZero OptionalLong 
fromUpdateSeq,
+            @JsonProperty("to_update_seq") @Positive OptionalLong toUpdateSeq,
+            @JsonProperty("from_purge_seq") @PositiveOrZero OptionalLong 
fromPurgeSeq,
+            @JsonProperty("to_purge_seq") @Positive OptionalLong toPurgeSeq) {
+        this.fromUpdateSeq = fromUpdateSeq;
+        this.toUpdateSeq = toUpdateSeq;
+        this.fromPurgeSeq = fromPurgeSeq;
+        this.toPurgeSeq = toPurgeSeq;
+    }
+
+    public OptionalLong getFromUpdateSeq() {
+        return fromUpdateSeq;
+    }
+
+    public OptionalLong getToUpdateSeq() {
+        return toUpdateSeq;
     }
 
-    public OptionalLong getUpdateSeq() {
-        return updateSeq;
+    public OptionalLong getFromPurgeSeq() {
+        return fromPurgeSeq;
     }
 
-    public OptionalLong getPurgeSeq() {
-        return purgeSeq;
+    public OptionalLong getToPurgeSeq() {
+        return toPurgeSeq;
     }
 
     @Override
     public String toString() {
-        return "IndexInfoRequest [updateSeq=" + updateSeq + ", purgeSeq=" + 
purgeSeq + "]";
+        return "IndexInfoRequest [fromUpdateSeq=" + fromUpdateSeq + ", 
toUpdateSeq=" + toUpdateSeq + ", fromPurgeSeq="
+                + fromPurgeSeq + ", toPurgeSeq=" + toPurgeSeq + "]";
     }
 }
diff --git a/nouveau/src/main/java/org/apache/couchdb/nouveau/core/Index.java 
b/nouveau/src/main/java/org/apache/couchdb/nouveau/core/Index.java
index ca0cd69ad..f5ccd89b2 100644
--- a/nouveau/src/main/java/org/apache/couchdb/nouveau/core/Index.java
+++ b/nouveau/src/main/java/org/apache/couchdb/nouveau/core/Index.java
@@ -13,8 +13,6 @@
 
 package org.apache.couchdb.nouveau.core;
 
-import jakarta.ws.rs.WebApplicationException;
-import jakarta.ws.rs.core.Response.Status;
 import java.io.Closeable;
 import java.io.IOException;
 import java.util.concurrent.Semaphore;
@@ -86,22 +84,22 @@ public abstract class Index implements Closeable {
     protected abstract long doDiskSize() throws IOException;
 
     public final synchronized void update(final String docId, final 
DocumentUpdateRequest request) throws IOException {
-        assertUpdateSeqIsLower(request.getSeq());
+        assertUpdateSeqMatches(request.getFromSeq());
         doUpdate(docId, request);
-        incrementUpdateSeq(request.getSeq());
+        incrementUpdateSeq(request.getFromSeq(), request.getToSeq());
     }
 
     protected abstract void doUpdate(final String docId, final 
DocumentUpdateRequest request) throws IOException;
 
     public final synchronized void delete(final String docId, final 
DocumentDeleteRequest request) throws IOException {
         if (request.isPurge()) {
-            assertPurgeSeqIsLower(request.getSeq());
+            assertPurgeSeqMatches(request.getFromSeq());
             doDelete(docId, request);
-            incrementPurgeSeq(request.getSeq());
+            incrementPurgeSeq(request.getFromSeq(), request.getToSeq());
         } else {
-            assertUpdateSeqIsLower(request.getSeq());
+            assertUpdateSeqMatches(request.getFromSeq());
             doDelete(docId, request);
-            incrementUpdateSeq(request.getSeq());
+            incrementUpdateSeq(request.getFromSeq(), request.getToSeq());
         }
     }
 
@@ -132,23 +130,21 @@ public abstract class Index implements Closeable {
 
     protected abstract boolean doCommit(final long updateSeq, final long 
purgeSeq) throws IOException;
 
-    public final synchronized void setUpdateSeq(final long updateSeq) throws 
IOException {
-        if (updateSeq < this.updateSeq) {
-            throw new WebApplicationException(
-                    "update_seq must be equal or greater than current 
update_seq", Status.BAD_REQUEST);
+    public final synchronized void setUpdateSeq(final long fromSeq, final long 
toSeq) throws IOException {
+        if (toSeq < this.updateSeq) {
+            throw new IndexUpdateConflictException(fromSeq, toSeq);
         }
-        if (updateSeq > this.updateSeq) {
-            incrementUpdateSeq(updateSeq);
+        if (toSeq > this.updateSeq) {
+            incrementUpdateSeq(fromSeq, toSeq);
         }
     }
 
-    public final synchronized void setPurgeSeq(final long purgeSeq) throws 
IOException {
-        if (purgeSeq < this.purgeSeq) {
-            throw new WebApplicationException(
-                    "purge_seq must be equal or greater than current 
purge_seq", Status.BAD_REQUEST);
+    public final synchronized void setPurgeSeq(final long fromSeq, final long 
toSeq) throws IOException {
+        if (toSeq < this.purgeSeq) {
+            throw new IndexUpdateConflictException(fromSeq, toSeq);
         }
-        if (purgeSeq > this.purgeSeq) {
-            incrementPurgeSeq(purgeSeq);
+        if (toSeq > this.purgeSeq) {
+            incrementPurgeSeq(fromSeq, toSeq);
         }
     }
 
@@ -178,29 +174,29 @@ public abstract class Index implements Closeable {
         }
     }
 
-    protected final void assertUpdateSeqIsLower(final long updateSeq) throws 
UpdatesOutOfOrderException {
+    protected final void assertUpdateSeqMatches(final long updateSeq) throws 
IndexUpdateConflictException {
         assert Thread.holdsLock(this);
-        if (!(updateSeq > this.updateSeq)) {
-            throw new UpdatesOutOfOrderException(this.updateSeq, updateSeq);
+        if (updateSeq != this.updateSeq) {
+            throw new IndexUpdateConflictException(this.updateSeq, updateSeq);
         }
     }
 
-    protected final void incrementUpdateSeq(final long updateSeq) throws 
IOException {
+    protected final void incrementUpdateSeq(final long ifMatchSeq, final long 
updateSeq) throws IOException {
         assert Thread.holdsLock(this);
-        assertUpdateSeqIsLower(updateSeq);
+        assertUpdateSeqMatches(ifMatchSeq);
         this.updateSeq = updateSeq;
     }
 
-    protected final void assertPurgeSeqIsLower(final long purgeSeq) throws 
UpdatesOutOfOrderException {
+    protected final void assertPurgeSeqMatches(final long purgeSeq) throws 
IndexUpdateConflictException {
         assert Thread.holdsLock(this);
-        if (!(purgeSeq > this.purgeSeq)) {
-            throw new UpdatesOutOfOrderException(this.purgeSeq, purgeSeq);
+        if (purgeSeq != this.purgeSeq) {
+            throw new IndexUpdateConflictException(this.purgeSeq, purgeSeq);
         }
     }
 
-    protected final void incrementPurgeSeq(final long purgeSeq) throws 
IOException {
+    protected final void incrementPurgeSeq(final long ifMatchSeq, final long 
purgeSeq) throws IOException {
         assert Thread.holdsLock(this);
-        assertPurgeSeqIsLower(purgeSeq);
+        assertPurgeSeqMatches(ifMatchSeq);
         this.purgeSeq = purgeSeq;
     }
 
diff --git 
a/nouveau/src/main/java/org/apache/couchdb/nouveau/core/UpdatesOutOfOrderException.java
 
b/nouveau/src/main/java/org/apache/couchdb/nouveau/core/IndexUpdateConflictException.java
similarity index 63%
rename from 
nouveau/src/main/java/org/apache/couchdb/nouveau/core/UpdatesOutOfOrderException.java
rename to 
nouveau/src/main/java/org/apache/couchdb/nouveau/core/IndexUpdateConflictException.java
index acda8beb7..7003551a6 100644
--- 
a/nouveau/src/main/java/org/apache/couchdb/nouveau/core/UpdatesOutOfOrderException.java
+++ 
b/nouveau/src/main/java/org/apache/couchdb/nouveau/core/IndexUpdateConflictException.java
@@ -16,13 +16,9 @@ package org.apache.couchdb.nouveau.core;
 import jakarta.ws.rs.WebApplicationException;
 import jakarta.ws.rs.core.Response.Status;
 
-public final class UpdatesOutOfOrderException extends WebApplicationException {
+public final class IndexUpdateConflictException extends 
WebApplicationException {
 
-    public UpdatesOutOfOrderException(final long currentSeq, final long 
attemptedSeq) {
-        super(
-                String.format(
-                        "Updates applied in the wrong order (current seq: %d, 
attempted seq: %d)",
-                        currentSeq, attemptedSeq),
-                Status.BAD_REQUEST);
+    public IndexUpdateConflictException(final long fromSeq, final long toSeq) {
+        super(String.format("Index update conflict (from seq: %d, to seq: 
%d)", fromSeq, toSeq), Status.CONFLICT);
     }
 }
diff --git 
a/nouveau/src/main/java/org/apache/couchdb/nouveau/health/IndexHealthCheck.java 
b/nouveau/src/main/java/org/apache/couchdb/nouveau/health/IndexHealthCheck.java
index 889ab2726..b66bbcde5 100644
--- 
a/nouveau/src/main/java/org/apache/couchdb/nouveau/health/IndexHealthCheck.java
+++ 
b/nouveau/src/main/java/org/apache/couchdb/nouveau/health/IndexHealthCheck.java
@@ -42,7 +42,7 @@ public final class IndexHealthCheck extends HealthCheck {
         indexResource.createIndex(name, new IndexDefinition("standard", null));
         try {
             final DocumentUpdateRequest documentUpdateRequest =
-                    new DocumentUpdateRequest(1, null, 
Collections.emptyList());
+                    new DocumentUpdateRequest(0, 1, null, 
Collections.emptyList());
             indexResource.updateDoc(name, "foo", documentUpdateRequest);
 
             final SearchRequest searchRequest = new SearchRequest();
diff --git 
a/nouveau/src/main/java/org/apache/couchdb/nouveau/resources/IndexResource.java 
b/nouveau/src/main/java/org/apache/couchdb/nouveau/resources/IndexResource.java
index ce99830bb..5c6614c21 100644
--- 
a/nouveau/src/main/java/org/apache/couchdb/nouveau/resources/IndexResource.java
+++ 
b/nouveau/src/main/java/org/apache/couchdb/nouveau/resources/IndexResource.java
@@ -102,11 +102,16 @@ public final class IndexResource {
     public void setIndexInfo(@PathParam("name") String name, @NotNull @Valid 
IndexInfoRequest request)
             throws Exception {
         indexManager.with(name, indexLoader(), (index) -> {
-            if (request.getUpdateSeq().isPresent()) {
-                index.setUpdateSeq(request.getUpdateSeq().getAsLong());
+            if (request.getFromUpdateSeq().isPresent()
+                    && request.getToUpdateSeq().isPresent()) {
+                index.setUpdateSeq(
+                        request.getFromUpdateSeq().getAsLong(),
+                        request.getToUpdateSeq().getAsLong());
             }
-            if (request.getPurgeSeq().isPresent()) {
-                index.setPurgeSeq(request.getPurgeSeq().getAsLong());
+            if (request.getFromPurgeSeq().isPresent() && 
request.getToPurgeSeq().isPresent()) {
+                index.setPurgeSeq(
+                        request.getFromPurgeSeq().getAsLong(),
+                        request.getToPurgeSeq().getAsLong());
             }
             return null;
         });
diff --git 
a/nouveau/src/test/java/org/apache/couchdb/nouveau/lucene9/Lucene9IndexTest.java
 
b/nouveau/src/test/java/org/apache/couchdb/nouveau/lucene9/Lucene9IndexTest.java
index 5d989804b..f174aad26 100644
--- 
a/nouveau/src/test/java/org/apache/couchdb/nouveau/lucene9/Lucene9IndexTest.java
+++ 
b/nouveau/src/test/java/org/apache/couchdb/nouveau/lucene9/Lucene9IndexTest.java
@@ -34,7 +34,7 @@ import org.apache.couchdb.nouveau.api.SearchResults;
 import org.apache.couchdb.nouveau.api.StringField;
 import org.apache.couchdb.nouveau.core.Index;
 import org.apache.couchdb.nouveau.core.IndexLoader;
-import org.apache.couchdb.nouveau.core.UpdatesOutOfOrderException;
+import org.apache.couchdb.nouveau.core.IndexUpdateConflictException;
 import org.apache.lucene.analysis.Analyzer;
 import org.apache.lucene.index.IndexWriter;
 import org.apache.lucene.index.IndexWriterConfig;
@@ -72,7 +72,7 @@ public class Lucene9IndexTest {
             final int count = 100;
             for (int i = 1; i <= count; i++) {
                 final Collection<Field> fields = List.of(new 
StringField("foo", "bar", false));
-                final DocumentUpdateRequest request = new 
DocumentUpdateRequest(i, null, fields);
+                final DocumentUpdateRequest request = new 
DocumentUpdateRequest(i - 1, i, null, fields);
                 index.update("doc" + i, request);
             }
             final SearchRequest request = new SearchRequest();
@@ -91,7 +91,7 @@ public class Lucene9IndexTest {
             final int count = 100;
             for (int i = 1; i <= count; i++) {
                 final Collection<Field> fields = List.of(new 
StringField("foo", "bar", false));
-                final DocumentUpdateRequest request = new 
DocumentUpdateRequest(i, null, fields);
+                final DocumentUpdateRequest request = new 
DocumentUpdateRequest(i - 1, i, null, fields);
                 index.update("doc" + i, request);
             }
             final SearchRequest request = new SearchRequest();
@@ -111,7 +111,7 @@ public class Lucene9IndexTest {
             final int count = 100;
             for (int i = 1; i <= count; i++) {
                 final Collection<Field> fields = List.of(new 
StringField("bar", "baz", false));
-                final DocumentUpdateRequest request = new 
DocumentUpdateRequest(i, null, fields);
+                final DocumentUpdateRequest request = new 
DocumentUpdateRequest(i - 1, i, null, fields);
                 index.update("doc" + i, request);
             }
             final SearchRequest request = new SearchRequest();
@@ -131,7 +131,7 @@ public class Lucene9IndexTest {
             final int count = 100;
             for (int i = 1; i <= count; i++) {
                 final Collection<Field> fields = List.of(new 
DoubleField("bar", (double) i, false));
-                final DocumentUpdateRequest request = new 
DocumentUpdateRequest(i, null, fields);
+                final DocumentUpdateRequest request = new 
DocumentUpdateRequest(i - 1, i, null, fields);
                 index.update("doc" + i, request);
             }
             final SearchRequest request = new SearchRequest();
@@ -155,12 +155,12 @@ public class Lucene9IndexTest {
             final Collection<Field> fields = Collections.emptyList();
 
             // Go to 2.
-            index.update("foo", new DocumentUpdateRequest(2, null, fields));
+            index.update("foo", new DocumentUpdateRequest(0, 2, null, fields));
 
             // Should be prevented from going down to 1.
             assertThrows(
-                    UpdatesOutOfOrderException.class,
-                    () -> index.update("foo", new DocumentUpdateRequest(1, 
null, fields)));
+                    IndexUpdateConflictException.class,
+                    () -> index.update("foo", new DocumentUpdateRequest(2, 1, 
null, fields)));
         } finally {
             cleanup(index);
         }
@@ -176,7 +176,7 @@ public class Lucene9IndexTest {
             assertThat(info.getUpdateSeq()).isEqualTo(0);
 
             final Collection<Field> fields = List.of(new DoubleField("bar", 
12.0, false));
-            index.update("foo", new DocumentUpdateRequest(2, null, fields));
+            index.update("foo", new DocumentUpdateRequest(0, 2, null, fields));
             index.commit();
 
             info = index.info();
@@ -193,13 +193,13 @@ public class Lucene9IndexTest {
         Index index = setup(path);
         try {
             final Collection<Field> fields = List.of(new DoubleField("bar", 
12.0, false));
-            index.update("foo", new DocumentUpdateRequest(2, null, fields));
+            index.update("foo", new DocumentUpdateRequest(0, 2, null, fields));
             index.commit();
 
             IndexInfo info = index.info();
             assertThat(info.getNumDocs()).isEqualTo(1);
 
-            index.delete("foo", new DocumentDeleteRequest(3, false));
+            index.delete("foo", new DocumentDeleteRequest(2, 3, false));
             index.commit();
 
             info = index.info();
@@ -215,13 +215,13 @@ public class Lucene9IndexTest {
         Index index = setup(path);
         try {
             final Collection<Field> fields = List.of(new DoubleField("bar", 
12.0, false));
-            index.update("foo", new DocumentUpdateRequest(2, null, fields));
+            index.update("foo", new DocumentUpdateRequest(0, 2, null, fields));
             index.commit();
 
             IndexInfo info = index.info();
             assertThat(info.getNumDocs()).isEqualTo(1);
 
-            index.delete("foo", new DocumentDeleteRequest(3, true));
+            index.delete("foo", new DocumentDeleteRequest(2, 3, true));
             index.commit();
 
             info = index.info();
diff --git a/src/nouveau/src/nouveau_api.erl b/src/nouveau/src/nouveau_api.erl
index 6f98cf032..c039a44fd 100644
--- a/src/nouveau/src/nouveau_api.erl
+++ b/src/nouveau/src/nouveau_api.erl
@@ -23,12 +23,12 @@
     create_index/2,
     delete_path/1,
     delete_path/2,
-    delete_doc/3,
-    purge_doc/3,
-    update_doc/5,
+    delete_doc/4,
+    purge_doc/4,
+    update_doc/6,
     search/2,
-    set_purge_seq/2,
-    set_update_seq/2
+    set_purge_seq/3,
+    set_update_seq/3
 ]).
 
 -define(JSON_CONTENT_TYPE, {"Content-Type", "application/json"}).
@@ -98,51 +98,61 @@ delete_path(Path, Exclusions) when
             send_error(Reason)
     end.
 
-delete_doc(#index{} = Index, DocId, UpdateSeq) when
-    is_binary(DocId), is_integer(UpdateSeq)
+delete_doc(#index{} = Index, DocId, FromSeq, ToSeq) when
+    is_binary(DocId), is_integer(FromSeq), is_integer(ToSeq)
 ->
-    delete_doc(Index, DocId, UpdateSeq, false).
+    delete_doc(Index, DocId, FromSeq, ToSeq, false).
 
-purge_doc(#index{} = Index, DocId, PurgeSeq) when
-    is_binary(DocId), is_integer(PurgeSeq)
+purge_doc(#index{} = Index, DocId, FromSeq, ToSeq) when
+    is_binary(DocId), is_integer(FromSeq), is_integer(ToSeq)
 ->
-    delete_doc(Index, DocId, PurgeSeq, true).
+    delete_doc(Index, DocId, FromSeq, ToSeq, true).
 
-delete_doc(#index{} = Index, DocId, Seq, IsPurge) when
-    is_binary(DocId), is_integer(Seq), is_boolean(IsPurge)
+delete_doc(#index{} = Index, DocId, FromSeq, ToSeq, IsPurge) when
+    is_binary(DocId), is_integer(FromSeq), is_integer(ToSeq), 
is_boolean(IsPurge)
 ->
-    ReqBody = #{seq => Seq, purge => IsPurge},
+    ReqBody = #{from_seq => FromSeq, to_seq => ToSeq, purge => IsPurge},
     Resp = send_if_enabled(
-        doc_url(Index, DocId), [?JSON_CONTENT_TYPE], delete, 
jiffy:encode(ReqBody)
+        doc_url(Index, DocId),
+        [?JSON_CONTENT_TYPE],
+        delete,
+        jiffy:encode(ReqBody),
+        [
+            {stream_to, self()}
+        ]
     ),
     case Resp of
-        {ok, "204", _, _} ->
-            ok;
-        {ok, StatusCode, _, RespBody} ->
-            {error, jaxrs_error(StatusCode, RespBody)};
+        {ibrowse_req_id, ReqId} ->
+            {ok, ReqId};
         {error, Reason} ->
             send_error(Reason)
     end.
 
-update_doc(#index{} = Index, DocId, UpdateSeq, Partition, Fields) when
+update_doc(#index{} = Index, DocId, FromSeq, ToSeq, Partition, Fields) when
     is_binary(DocId),
-    is_integer(UpdateSeq),
+    is_integer(FromSeq),
+    is_integer(ToSeq),
     (is_binary(Partition) orelse Partition == null),
     is_list(Fields)
 ->
     ReqBody = #{
-        seq => UpdateSeq,
+        from_seq => FromSeq,
+        to_seq => ToSeq,
         partition => Partition,
         fields => Fields
     },
     Resp = send_if_enabled(
-        doc_url(Index, DocId), [?JSON_CONTENT_TYPE], put, jiffy:encode(ReqBody)
+        doc_url(Index, DocId),
+        [?JSON_CONTENT_TYPE],
+        put,
+        jiffy:encode(ReqBody),
+        [
+            {stream_to, self()}
+        ]
     ),
     case Resp of
-        {ok, "204", _, _} ->
-            ok;
-        {ok, StatusCode, _, RespBody} ->
-            {error, jaxrs_error(StatusCode, RespBody)};
+        {ibrowse_req_id, ReqId} ->
+            {ok, ReqId};
         {error, Reason} ->
             send_error(Reason)
     end.
@@ -160,15 +170,20 @@ search(#index{} = Index, QueryArgs) ->
             send_error(Reason)
     end.
 
-set_update_seq(#index{} = Index, UpdateSeq) ->
-    set_seq(Index, update_seq, UpdateSeq).
-set_purge_seq(#index{} = Index, PurgeSeq) ->
-    set_seq(Index, purge_seq, PurgeSeq).
-
-set_seq(#index{} = Index, Key, Value) when is_atom(Key), is_integer(Value) ->
+set_update_seq(#index{} = Index, FromSeq, ToSeq) ->
     ReqBody = #{
-        Key => Value
+        from_update_seq => FromSeq,
+        to_update_seq => ToSeq
     },
+    set_seq(Index, ReqBody).
+set_purge_seq(#index{} = Index, FromSeq, ToSeq) ->
+    ReqBody = #{
+        from_purge_seq => FromSeq,
+        to_purge_seq => ToSeq
+    },
+    set_seq(Index, ReqBody).
+
+set_seq(#index{} = Index, ReqBody) when is_map(ReqBody) ->
     Resp = send_if_enabled(
         index_url(Index), [?JSON_CONTENT_TYPE], post, jiffy:encode(ReqBody)
     ),
@@ -226,6 +241,8 @@ jaxrs_error("404", Body) ->
     {not_found, message(Body)};
 jaxrs_error("405", Body) ->
     {method_not_allowed, message(Body)};
+jaxrs_error("409", Body) ->
+    {conflict, message(Body)};
 jaxrs_error("417", Body) ->
     {expectation_failed, message(Body)};
 jaxrs_error("422", Body) ->
@@ -249,10 +266,13 @@ errors(Body) ->
 send_if_enabled(Url, Header, Method) ->
     send_if_enabled(Url, Header, Method, []).
 
-send_if_enabled(Url, Header, Method, Body) ->
+send_if_enabled(Url, Headers, Method, Body) ->
+    send_if_enabled(Url, Headers, Method, Body, []).
+
+send_if_enabled(Url, Headers, Method, Body, Options) ->
     case nouveau:enabled() of
         true ->
-            ibrowse:send_req_direct(conn_pid(), Url, Header, Method, Body);
+            ibrowse:send_req_direct(conn_pid(), Url, Headers, Method, Body, 
Options);
         false ->
             {error, nouveau_not_enabled}
     end.
@@ -265,6 +285,11 @@ conn_pid() ->
             erlang:put(?NOUVEAU_CONN_PID, Pid),
             Pid;
         Pid when is_pid(Pid) ->
-            true = is_process_alive(Pid),
-            Pid
+            case is_process_alive(Pid) of
+                true ->
+                    Pid;
+                false ->
+                    erlang:erase(?NOUVEAU_CONN_PID),
+                    conn_pid()
+            end
     end.
diff --git a/src/nouveau/src/nouveau_index_updater.erl 
b/src/nouveau/src/nouveau_index_updater.erl
index 33134bc96..a53db9d7d 100644
--- a/src/nouveau/src/nouveau_index_updater.erl
+++ b/src/nouveau/src/nouveau_index_updater.erl
@@ -26,6 +26,17 @@
 -import(couch_query_servers, [get_os_process/1, ret_os_process/1, 
proc_prompt/2]).
 -import(nouveau_util, [index_path/1]).
 
+-record(acc, {
+    db,
+    index,
+    proc,
+    changes_done,
+    total_changes,
+    exclude_idrevs,
+    req_ids,
+    from_seq
+}).
+
 outdated(#index{} = Index) ->
     case open_or_create_index(Index) of
         {ok, #{} = Info} ->
@@ -66,9 +77,20 @@ update(#index{} = Index) ->
                 Proc = get_os_process(Index#index.def_lang),
                 try
                     true = proc_prompt(Proc, [<<"add_fun">>, Index#index.def, 
<<"nouveau">>]),
-                    Acc0 = {Db, Index, Proc, 0, TotalChanges, ExcludeIdRevs},
-                    {ok, _} = couch_db:fold_changes(Db, IndexUpdateSeq, fun 
load_docs/2, Acc0, []),
-                    ok = nouveau_api:set_update_seq(Index, NewCurSeq)
+                    Acc0 = #acc{
+                        db = Db,
+                        index = Index,
+                        proc = Proc,
+                        changes_done = 0,
+                        total_changes = TotalChanges,
+                        exclude_idrevs = ExcludeIdRevs,
+                        req_ids = [],
+                        from_seq = IndexUpdateSeq
+                    },
+                    {ok, #acc{from_seq = FromSeq, req_ids = ReqIds}} =
+                        couch_db:fold_changes(Db, IndexUpdateSeq, fun 
load_docs/2, Acc0, []),
+                    [] = flush_reqids(ReqIds),
+                    ok = nouveau_api:set_update_seq(Index, FromSeq, NewCurSeq)
                 after
                     ret_os_process(Proc)
                 end
@@ -79,23 +101,70 @@ update(#index{} = Index) ->
 
 load_docs(#full_doc_info{id = <<"_design/", _/binary>>}, Acc) ->
     {ok, Acc};
-load_docs(FDI, {Db, Index, Proc, ChangesDone, TotalChanges, ExcludeIdRevs}) ->
+load_docs(
+    FDI,
+    #acc{
+        db = Db,
+        index = Index,
+        proc = Proc,
+        changes_done = ChangesDone,
+        total_changes = TotalChanges,
+        exclude_idrevs = ExcludeIdRevs,
+        req_ids = ReqIds,
+        from_seq = FromSeq
+    } = Acc
+) when
+    length(ReqIds) < 100
+->
     couch_task_status:update([
         {changes_done, ChangesDone}, {progress, (ChangesDone * 100) div 
TotalChanges}
     ]),
     DI = couch_doc:to_doc_info(FDI),
     #doc_info{id = Id, revs = [#rev_info{rev = Rev} | _]} = DI,
     case lists:member({Id, Rev}, ExcludeIdRevs) of
-        true -> ok;
-        false -> update_or_delete_index(Db, Index, DI, Proc)
-    end,
-    {ok, {Db, Index, Proc, ChangesDone + 1, TotalChanges, ExcludeIdRevs}}.
+        true ->
+            {ok, Acc#acc{changes_done = ChangesDone + 1}};
+        false ->
+            {ReqId, NewFromSeq} = update_or_delete_index(Db, Index, FromSeq, 
DI, Proc),
+            {ok, Acc#acc{
+                changes_done = ChangesDone + 1, from_seq = NewFromSeq, req_ids 
= [ReqId | ReqIds]
+            }}
+    end;
+load_docs(FDI, Acc) ->
+    ReqIds1 = flush_reqids(lists:reverse(Acc#acc.req_ids)),
+    load_docs(
+        FDI, Acc#acc{req_ids = ReqIds1}
+    ).
 
-update_or_delete_index(Db, #index{} = Index, #doc_info{} = DI, Proc) ->
+flush_reqids([]) ->
+    [];
+flush_reqids(ReqIds) ->
+    receive
+        {ibrowse_async_headers, ReqId, Code, _Headers} when Code == "200"; 
Code == "201" ->
+            ok = flush_reqid(ReqId),
+            flush_reqids(lists:delete(ReqId, ReqIds));
+        {ibrowse_async_headers, _ReqId, Code, _Headers} ->
+            exit({error, Code})
+    end.
+
+flush_reqid(ReqId) ->
+    receive
+        {ibrowse_async_response, ReqId, _Body} ->
+            flush_reqid(ReqId);
+        {ibrowse_async_response_end, ReqId} ->
+            ok
+    end.
+
+update_or_delete_index(Db, #index{} = Index, IfMatchSeq, #doc_info{} = DI, 
Proc) ->
     #doc_info{id = Id, high_seq = Seq, revs = [#rev_info{deleted = Del} | _]} 
= DI,
     case Del of
         true ->
-            ok = nouveau_api:delete_doc(Index, Id, Seq);
+            case nouveau_api:delete_doc(Index, Id, IfMatchSeq, Seq) of
+                {ok, ReqId} ->
+                    {ReqId, Seq};
+                {error, Reason} ->
+                    exit({error, Reason})
+            end;
         false ->
             {ok, Doc} = couch_db:open_doc(Db, DI, []),
             Json = couch_doc:to_json_obj(Doc, []),
@@ -109,11 +178,16 @@ update_or_delete_index(Db, #index{} = Index, #doc_info{} 
= DI, Proc) ->
                 end,
             case Fields of
                 [] ->
-                    ok = nouveau_api:delete_doc(Index, Id, Seq);
+                    case nouveau_api:delete_doc(Index, Id, IfMatchSeq, Seq) of
+                        {ok, ReqId} ->
+                            {ReqId, Seq};
+                        {error, Reason} ->
+                            exit({error, Reason})
+                    end;
                 _ ->
-                    case nouveau_api:update_doc(Index, Id, Seq, Partition, 
Fields) of
-                        ok ->
-                            ok;
+                    case nouveau_api:update_doc(Index, Id, IfMatchSeq, Seq, 
Partition, Fields) of
+                        {ok, ReqId} ->
+                            {ReqId, Seq};
                         {error, Reason} ->
                             exit({error, Reason})
                     end
@@ -177,7 +251,8 @@ purge_index(Db, Index, IndexPurgeSeq) ->
                             true ->
                                 Acc;
                             false ->
-                                update_or_delete_index(Db, Index, DI, Proc),
+                                %% TODO
+                                update_or_delete_index(Db, Index, 0, DI, Proc),
                                 [{Id, Rev} | Acc]
                         end
                 end,
@@ -188,7 +263,7 @@ purge_index(Db, Index, IndexPurgeSeq) ->
         {ok, {ExcludeList, NewPurgeSeq}} = couch_db:fold_purge_infos(
             Db, IndexPurgeSeq, FoldFun, {[], 0}, []
         ),
-        nouveau_api:set_purge_seq(Index, NewPurgeSeq),
+        nouveau_api:set_purge_seq(Index, IndexPurgeSeq, NewPurgeSeq),
         update_local_doc(Db, Index, NewPurgeSeq),
         {ok, ExcludeList}
     after


Reply via email to