This is an automated email from the ASF dual-hosted git repository.

rnewson pushed a commit to branch nouveau-ibrowse-improvements
in repository https://gitbox.apache.org/repos/asf/couchdb.git

commit f7d7c2611e7a3284c9e0d691e7295350c1bd8933
Author: Robert Newson <[email protected]>
AuthorDate: Mon Sep 4 23:38:45 2023 +0100

    Ensure an update fails if a previous update failed
    
    As we are pipelining requests we must ensure that all requests after a failed
    request also fail so that the index is never in an inconsistent state. To do
    this we pass the expected 'match sequence' with every request. nouveau returns a
    409 Conflict if this doesn't match the index's current sequence.
---
 .../couchdb/nouveau/api/DocumentDeleteRequest.java | 20 +++++++-
 .../couchdb/nouveau/api/DocumentUpdateRequest.java | 13 +++++-
 .../couchdb/nouveau/api/IndexInfoRequest.java      | 19 +++++++-
 .../org/apache/couchdb/nouveau/core/Index.java     | 44 ++++++++++--------
 .../nouveau/core/UpdatesOutOfOrderException.java   |  8 ++--
 .../couchdb/nouveau/health/IndexHealthCheck.java   |  2 +-
 .../couchdb/nouveau/resources/IndexResource.java   | 13 ++++--
 .../couchdb/nouveau/lucene9/Lucene9IndexTest.java  | 27 ++++++-----
 src/nouveau/src/nouveau_api.erl                    | 48 ++++++++++++-------
 src/nouveau/src/nouveau_index_updater.erl          | 54 +++++++++++++---------
 10 files changed, 168 insertions(+), 80 deletions(-)

diff --git 
a/nouveau/src/main/java/org/apache/couchdb/nouveau/api/DocumentDeleteRequest.java
 
b/nouveau/src/main/java/org/apache/couchdb/nouveau/api/DocumentDeleteRequest.java
index 615d60488..82e9b716a 100644
--- 
a/nouveau/src/main/java/org/apache/couchdb/nouveau/api/DocumentDeleteRequest.java
+++ 
b/nouveau/src/main/java/org/apache/couchdb/nouveau/api/DocumentDeleteRequest.java
@@ -15,22 +15,38 @@ package org.apache.couchdb.nouveau.api;
 
 import com.fasterxml.jackson.annotation.JsonProperty;
 import jakarta.validation.constraints.Positive;
+import jakarta.validation.constraints.PositiveOrZero;
 
 public final class DocumentDeleteRequest {
 
+    @PositiveOrZero
+    private final long matchSeq;
+
     @Positive
     private final long seq;
 
     private final boolean purge;
 
-    public DocumentDeleteRequest(@JsonProperty("seq") final long seq, 
@JsonProperty("purge") final boolean purge) {
+    public DocumentDeleteRequest(
+            @JsonProperty("match_seq") final long matchSeq,
+            @JsonProperty("seq") final long seq,
+            @JsonProperty("purge") final boolean purge) {
+        if (matchSeq < 0) {
+            throw new IllegalArgumentException("matchSeq must be 0 or 
greater");
+        }
+
         if (seq < 1) {
             throw new IllegalArgumentException("seq must be 1 or greater");
         }
+        this.matchSeq = matchSeq;
         this.seq = seq;
         this.purge = purge;
     }
 
+    public long getMatchSeq() {
+        return matchSeq;
+    }
+
     public long getSeq() {
         return seq;
     }
@@ -41,6 +57,6 @@ public final class DocumentDeleteRequest {
 
     @Override
     public String toString() {
-        return "DocumentDeleteRequest [seq=" + seq + ", purge=" + purge + "]";
+        return "DocumentDeleteRequest [matchSeq=" + matchSeq + ", seq=" + seq 
+ ", purge=" + purge + "]";
     }
 }
diff --git 
a/nouveau/src/main/java/org/apache/couchdb/nouveau/api/DocumentUpdateRequest.java
 
b/nouveau/src/main/java/org/apache/couchdb/nouveau/api/DocumentUpdateRequest.java
index 8b07e7880..82c196602 100644
--- 
a/nouveau/src/main/java/org/apache/couchdb/nouveau/api/DocumentUpdateRequest.java
+++ 
b/nouveau/src/main/java/org/apache/couchdb/nouveau/api/DocumentUpdateRequest.java
@@ -17,10 +17,14 @@ import com.fasterxml.jackson.annotation.JsonProperty;
 import jakarta.validation.Valid;
 import jakarta.validation.constraints.NotEmpty;
 import jakarta.validation.constraints.Positive;
+import jakarta.validation.constraints.PositiveOrZero;
 import java.util.Collection;
 
 public final class DocumentUpdateRequest {
 
+    @PositiveOrZero
+    private final long matchSeq;
+
     @Positive
     private final long seq;
 
@@ -31,14 +35,20 @@ public final class DocumentUpdateRequest {
     private final Collection<Field> fields;
 
     public DocumentUpdateRequest(
+            @JsonProperty("match_seq") final long matchSeq,
             @JsonProperty("seq") final long seq,
             @JsonProperty("partition") final String partition,
             @JsonProperty("fields") final Collection<Field> fields) {
+        this.matchSeq = matchSeq;
         this.seq = seq;
         this.partition = partition;
         this.fields = fields;
     }
 
+    public long getMatchSeq() {
+        return matchSeq;
+    }
+
     public long getSeq() {
         return seq;
     }
@@ -57,6 +67,7 @@ public final class DocumentUpdateRequest {
 
     @Override
     public String toString() {
-        return "DocumentUpdateRequest [seq=" + seq + ", partition=" + 
partition + ", fields=" + fields + "]";
+        return "DocumentUpdateRequest [matchSeq=" + matchSeq + ", seq=" + seq 
+ ", partition=" + partition + ", fields="
+                + fields + "]";
     }
 }
diff --git 
a/nouveau/src/main/java/org/apache/couchdb/nouveau/api/IndexInfoRequest.java 
b/nouveau/src/main/java/org/apache/couchdb/nouveau/api/IndexInfoRequest.java
index 949528259..cc008231c 100644
--- a/nouveau/src/main/java/org/apache/couchdb/nouveau/api/IndexInfoRequest.java
+++ b/nouveau/src/main/java/org/apache/couchdb/nouveau/api/IndexInfoRequest.java
@@ -19,27 +19,44 @@ import java.util.OptionalLong;
 
 public final class IndexInfoRequest {
 
+    private final OptionalLong matchUpdateSeq;
+
     private final OptionalLong updateSeq;
 
+    private final OptionalLong matchPurgeSeq;
+
     private final OptionalLong purgeSeq;
 
     public IndexInfoRequest(
+            @JsonProperty("match_update_seq") @Positive final OptionalLong 
matchUpdateSeq,
             @JsonProperty("update_seq") @Positive final OptionalLong updateSeq,
+            @JsonProperty("match_purge_seq") @Positive final OptionalLong 
matchPurgeSeq,
             @JsonProperty("purge_seq") @Positive final OptionalLong purgeSeq) {
+        this.matchUpdateSeq = matchUpdateSeq;
         this.updateSeq = updateSeq;
+        this.matchPurgeSeq = matchPurgeSeq;
         this.purgeSeq = purgeSeq;
     }
 
+    public OptionalLong getMatchUpdateSeq() {
+        return matchUpdateSeq;
+    }
+
     public OptionalLong getUpdateSeq() {
         return updateSeq;
     }
 
+    public OptionalLong getMatchPurgeSeq() {
+        return matchPurgeSeq;
+    }
+
     public OptionalLong getPurgeSeq() {
         return purgeSeq;
     }
 
     @Override
     public String toString() {
-        return "IndexInfoRequest [updateSeq=" + updateSeq + ", purgeSeq=" + 
purgeSeq + "]";
+        return "IndexInfoRequest [matchUpdateSeq=" + matchUpdateSeq + ", 
updateSeq=" + updateSeq + ", matchPurgeSeq="
+                + matchPurgeSeq + ", purgeSeq=" + purgeSeq + "]";
     }
 }
diff --git a/nouveau/src/main/java/org/apache/couchdb/nouveau/core/Index.java 
b/nouveau/src/main/java/org/apache/couchdb/nouveau/core/Index.java
index ca0cd69ad..6f0f45504 100644
--- a/nouveau/src/main/java/org/apache/couchdb/nouveau/core/Index.java
+++ b/nouveau/src/main/java/org/apache/couchdb/nouveau/core/Index.java
@@ -86,22 +86,22 @@ public abstract class Index implements Closeable {
     protected abstract long doDiskSize() throws IOException;
 
     public final synchronized void update(final String docId, final 
DocumentUpdateRequest request) throws IOException {
-        assertUpdateSeqIsLower(request.getSeq());
+        assertUpdateSeqProgress(request.getMatchSeq(), request.getSeq());
         doUpdate(docId, request);
-        incrementUpdateSeq(request.getSeq());
+        incrementUpdateSeq(request.getMatchSeq(), request.getSeq());
     }
 
     protected abstract void doUpdate(final String docId, final 
DocumentUpdateRequest request) throws IOException;
 
     public final synchronized void delete(final String docId, final 
DocumentDeleteRequest request) throws IOException {
         if (request.isPurge()) {
-            assertPurgeSeqIsLower(request.getSeq());
+            assertPurgeSeqProgress(request.getMatchSeq(), request.getSeq());
             doDelete(docId, request);
-            incrementPurgeSeq(request.getSeq());
+            incrementPurgeSeq(request.getMatchSeq(), request.getSeq());
         } else {
-            assertUpdateSeqIsLower(request.getSeq());
+            assertUpdateSeqProgress(request.getMatchSeq(), request.getSeq());
             doDelete(docId, request);
-            incrementUpdateSeq(request.getSeq());
+            incrementUpdateSeq(request.getMatchSeq(), request.getSeq());
         }
     }
 
@@ -132,23 +132,23 @@ public abstract class Index implements Closeable {
 
     protected abstract boolean doCommit(final long updateSeq, final long 
purgeSeq) throws IOException;
 
-    public final synchronized void setUpdateSeq(final long updateSeq) throws 
IOException {
+    public final synchronized void setUpdateSeq(final long matchSeq, final 
long updateSeq) throws IOException {
         if (updateSeq < this.updateSeq) {
             throw new WebApplicationException(
                     "update_seq must be equal or greater than current 
update_seq", Status.BAD_REQUEST);
         }
         if (updateSeq > this.updateSeq) {
-            incrementUpdateSeq(updateSeq);
+            incrementUpdateSeq(matchSeq, updateSeq);
         }
     }
 
-    public final synchronized void setPurgeSeq(final long purgeSeq) throws 
IOException {
+    public final synchronized void setPurgeSeq(final long matchSeq, final long 
purgeSeq) throws IOException {
         if (purgeSeq < this.purgeSeq) {
             throw new WebApplicationException(
                     "purge_seq must be equal or greater than current 
purge_seq", Status.BAD_REQUEST);
         }
         if (purgeSeq > this.purgeSeq) {
-            incrementPurgeSeq(purgeSeq);
+            incrementPurgeSeq(matchSeq, purgeSeq);
         }
     }
 
@@ -178,29 +178,37 @@ public abstract class Index implements Closeable {
         }
     }
 
-    protected final void assertUpdateSeqIsLower(final long updateSeq) throws 
UpdatesOutOfOrderException {
+    protected final void assertUpdateSeqProgress(final long matchSeq, final 
long updateSeq)
+            throws UpdatesOutOfOrderException {
         assert Thread.holdsLock(this);
+        if (matchSeq != this.updateSeq) {
+            throw new UpdatesOutOfOrderException(this.updateSeq, matchSeq, 
updateSeq);
+        }
         if (!(updateSeq > this.updateSeq)) {
-            throw new UpdatesOutOfOrderException(this.updateSeq, updateSeq);
+            throw new UpdatesOutOfOrderException(this.updateSeq, matchSeq, 
updateSeq);
         }
     }
 
-    protected final void incrementUpdateSeq(final long updateSeq) throws 
IOException {
+    protected final void incrementUpdateSeq(final long matchSeq, final long 
updateSeq) throws IOException {
         assert Thread.holdsLock(this);
-        assertUpdateSeqIsLower(updateSeq);
+        assertUpdateSeqProgress(matchSeq, updateSeq);
         this.updateSeq = updateSeq;
     }
 
-    protected final void assertPurgeSeqIsLower(final long purgeSeq) throws 
UpdatesOutOfOrderException {
+    protected final void assertPurgeSeqProgress(final long matchSeq, final 
long purgeSeq)
+            throws UpdatesOutOfOrderException {
         assert Thread.holdsLock(this);
+        if (matchSeq != this.purgeSeq) {
+            throw new UpdatesOutOfOrderException(this.purgeSeq, matchSeq, 
purgeSeq);
+        }
         if (!(purgeSeq > this.purgeSeq)) {
-            throw new UpdatesOutOfOrderException(this.purgeSeq, purgeSeq);
+            throw new UpdatesOutOfOrderException(this.purgeSeq, matchSeq, 
purgeSeq);
         }
     }
 
-    protected final void incrementPurgeSeq(final long purgeSeq) throws 
IOException {
+    protected final void incrementPurgeSeq(final long matchSeq, final long 
purgeSeq) throws IOException {
         assert Thread.holdsLock(this);
-        assertPurgeSeqIsLower(purgeSeq);
+        assertPurgeSeqProgress(matchSeq, purgeSeq);
         this.purgeSeq = purgeSeq;
     }
 
diff --git 
a/nouveau/src/main/java/org/apache/couchdb/nouveau/core/UpdatesOutOfOrderException.java
 
b/nouveau/src/main/java/org/apache/couchdb/nouveau/core/UpdatesOutOfOrderException.java
index acda8beb7..f059f9587 100644
--- 
a/nouveau/src/main/java/org/apache/couchdb/nouveau/core/UpdatesOutOfOrderException.java
+++ 
b/nouveau/src/main/java/org/apache/couchdb/nouveau/core/UpdatesOutOfOrderException.java
@@ -18,11 +18,11 @@ import jakarta.ws.rs.core.Response.Status;
 
 public final class UpdatesOutOfOrderException extends WebApplicationException {
 
-    public UpdatesOutOfOrderException(final long currentSeq, final long 
attemptedSeq) {
+    public UpdatesOutOfOrderException(final long currentSeq, final long 
matchSeq, final long attemptedSeq) {
         super(
                 String.format(
-                        "Updates applied in the wrong order (current seq: %d, 
attempted seq: %d)",
-                        currentSeq, attemptedSeq),
-                Status.BAD_REQUEST);
+                        "Updates applied in the wrong order (current seq: %d, 
match seq: %d, attempted seq: %d)",
+                        currentSeq, matchSeq, attemptedSeq),
+                Status.CONFLICT);
     }
 }
diff --git 
a/nouveau/src/main/java/org/apache/couchdb/nouveau/health/IndexHealthCheck.java 
b/nouveau/src/main/java/org/apache/couchdb/nouveau/health/IndexHealthCheck.java
index 889ab2726..b66bbcde5 100644
--- 
a/nouveau/src/main/java/org/apache/couchdb/nouveau/health/IndexHealthCheck.java
+++ 
b/nouveau/src/main/java/org/apache/couchdb/nouveau/health/IndexHealthCheck.java
@@ -42,7 +42,7 @@ public final class IndexHealthCheck extends HealthCheck {
         indexResource.createIndex(name, new IndexDefinition("standard", null));
         try {
             final DocumentUpdateRequest documentUpdateRequest =
-                    new DocumentUpdateRequest(1, null, 
Collections.emptyList());
+                    new DocumentUpdateRequest(0, 1, null, 
Collections.emptyList());
             indexResource.updateDoc(name, "foo", documentUpdateRequest);
 
             final SearchRequest searchRequest = new SearchRequest();
diff --git 
a/nouveau/src/main/java/org/apache/couchdb/nouveau/resources/IndexResource.java 
b/nouveau/src/main/java/org/apache/couchdb/nouveau/resources/IndexResource.java
index ce99830bb..9c7a100e3 100644
--- 
a/nouveau/src/main/java/org/apache/couchdb/nouveau/resources/IndexResource.java
+++ 
b/nouveau/src/main/java/org/apache/couchdb/nouveau/resources/IndexResource.java
@@ -102,11 +102,16 @@ public final class IndexResource {
     public void setIndexInfo(@PathParam("name") String name, @NotNull @Valid 
IndexInfoRequest request)
             throws Exception {
         indexManager.with(name, indexLoader(), (index) -> {
-            if (request.getUpdateSeq().isPresent()) {
-                index.setUpdateSeq(request.getUpdateSeq().getAsLong());
+            if (request.getMatchUpdateSeq().isPresent()
+                    && request.getUpdateSeq().isPresent()) {
+                index.setUpdateSeq(
+                        request.getMatchUpdateSeq().getAsLong(),
+                        request.getUpdateSeq().getAsLong());
             }
-            if (request.getPurgeSeq().isPresent()) {
-                index.setPurgeSeq(request.getPurgeSeq().getAsLong());
+            if (request.getMatchPurgeSeq().isPresent() && 
request.getPurgeSeq().isPresent()) {
+                index.setPurgeSeq(
+                        request.getMatchPurgeSeq().getAsLong(),
+                        request.getPurgeSeq().getAsLong());
             }
             return null;
         });
diff --git 
a/nouveau/src/test/java/org/apache/couchdb/nouveau/lucene9/Lucene9IndexTest.java
 
b/nouveau/src/test/java/org/apache/couchdb/nouveau/lucene9/Lucene9IndexTest.java
index 5d989804b..eaaad1780 100644
--- 
a/nouveau/src/test/java/org/apache/couchdb/nouveau/lucene9/Lucene9IndexTest.java
+++ 
b/nouveau/src/test/java/org/apache/couchdb/nouveau/lucene9/Lucene9IndexTest.java
@@ -72,7 +72,7 @@ public class Lucene9IndexTest {
             final int count = 100;
             for (int i = 1; i <= count; i++) {
                 final Collection<Field> fields = List.of(new 
StringField("foo", "bar", false));
-                final DocumentUpdateRequest request = new 
DocumentUpdateRequest(i, null, fields);
+                final DocumentUpdateRequest request = new 
DocumentUpdateRequest(i - 1, i, null, fields);
                 index.update("doc" + i, request);
             }
             final SearchRequest request = new SearchRequest();
@@ -91,7 +91,7 @@ public class Lucene9IndexTest {
             final int count = 100;
             for (int i = 1; i <= count; i++) {
                 final Collection<Field> fields = List.of(new 
StringField("foo", "bar", false));
-                final DocumentUpdateRequest request = new 
DocumentUpdateRequest(i, null, fields);
+                final DocumentUpdateRequest request = new 
DocumentUpdateRequest(i - 1, i, null, fields);
                 index.update("doc" + i, request);
             }
             final SearchRequest request = new SearchRequest();
@@ -111,7 +111,7 @@ public class Lucene9IndexTest {
             final int count = 100;
             for (int i = 1; i <= count; i++) {
                 final Collection<Field> fields = List.of(new 
StringField("bar", "baz", false));
-                final DocumentUpdateRequest request = new 
DocumentUpdateRequest(i, null, fields);
+                final DocumentUpdateRequest request = new 
DocumentUpdateRequest(i - 1, i, null, fields);
                 index.update("doc" + i, request);
             }
             final SearchRequest request = new SearchRequest();
@@ -131,7 +131,7 @@ public class Lucene9IndexTest {
             final int count = 100;
             for (int i = 1; i <= count; i++) {
                 final Collection<Field> fields = List.of(new 
DoubleField("bar", (double) i, false));
-                final DocumentUpdateRequest request = new 
DocumentUpdateRequest(i, null, fields);
+                final DocumentUpdateRequest request = new 
DocumentUpdateRequest(i - 1, i, null, fields);
                 index.update("doc" + i, request);
             }
             final SearchRequest request = new SearchRequest();
@@ -154,13 +154,18 @@ public class Lucene9IndexTest {
         try {
             final Collection<Field> fields = Collections.emptyList();
 
+            // get match seq wrong
+            assertThrows(
+                    UpdatesOutOfOrderException.class,
+                    () -> index.update("foo", new DocumentUpdateRequest(1, 2, 
null, fields)));
+
             // Go to 2.
-            index.update("foo", new DocumentUpdateRequest(2, null, fields));
+            index.update("foo", new DocumentUpdateRequest(0, 2, null, fields));
 
             // Should be prevented from going down to 1.
             assertThrows(
                     UpdatesOutOfOrderException.class,
-                    () -> index.update("foo", new DocumentUpdateRequest(1, 
null, fields)));
+                    () -> index.update("foo", new DocumentUpdateRequest(2, 1, 
null, fields)));
         } finally {
             cleanup(index);
         }
@@ -176,7 +181,7 @@ public class Lucene9IndexTest {
             assertThat(info.getUpdateSeq()).isEqualTo(0);
 
             final Collection<Field> fields = List.of(new DoubleField("bar", 
12.0, false));
-            index.update("foo", new DocumentUpdateRequest(2, null, fields));
+            index.update("foo", new DocumentUpdateRequest(0, 2, null, fields));
             index.commit();
 
             info = index.info();
@@ -193,13 +198,13 @@ public class Lucene9IndexTest {
         Index index = setup(path);
         try {
             final Collection<Field> fields = List.of(new DoubleField("bar", 
12.0, false));
-            index.update("foo", new DocumentUpdateRequest(2, null, fields));
+            index.update("foo", new DocumentUpdateRequest(0, 2, null, fields));
             index.commit();
 
             IndexInfo info = index.info();
             assertThat(info.getNumDocs()).isEqualTo(1);
 
-            index.delete("foo", new DocumentDeleteRequest(3, false));
+            index.delete("foo", new DocumentDeleteRequest(2, 3, false));
             index.commit();
 
             info = index.info();
@@ -215,13 +220,13 @@ public class Lucene9IndexTest {
         Index index = setup(path);
         try {
             final Collection<Field> fields = List.of(new DoubleField("bar", 
12.0, false));
-            index.update("foo", new DocumentUpdateRequest(2, null, fields));
+            index.update("foo", new DocumentUpdateRequest(0, 2, null, fields));
             index.commit();
 
             IndexInfo info = index.info();
             assertThat(info.getNumDocs()).isEqualTo(1);
 
-            index.delete("foo", new DocumentDeleteRequest(3, true));
+            index.delete("foo", new DocumentDeleteRequest(0, 3, true));
             index.commit();
 
             info = index.info();
diff --git a/src/nouveau/src/nouveau_api.erl b/src/nouveau/src/nouveau_api.erl
index f272bafec..0e929a12a 100644
--- a/src/nouveau/src/nouveau_api.erl
+++ b/src/nouveau/src/nouveau_api.erl
@@ -23,12 +23,12 @@
     create_index/2,
     delete_path/1,
     delete_path/2,
-    delete_doc_async/4,
-    purge_doc/3,
-    update_doc_async/6,
+    delete_doc_async/5,
+    purge_doc/4,
+    update_doc_async/7,
     search/2,
-    set_purge_seq/2,
-    set_update_seq/2,
+    set_purge_seq/3,
+    set_update_seq/3,
     drain_async_responses/1,
     jaxrs_error/2
 ]).
@@ -99,10 +99,15 @@ delete_path(Path, Exclusions) when
             send_error(Reason)
     end.
 
-delete_doc_async(ConnPid, #index{} = Index, DocId, UpdateSeq) when
-    is_pid(ConnPid), is_binary(DocId), is_integer(UpdateSeq)
+delete_doc_async(ConnPid, #index{} = Index, DocId, MatchSeq, UpdateSeq) when
+    is_pid(ConnPid),
+    is_binary(DocId),
+    is_integer(MatchSeq),
+    MatchSeq >= 0,
+    is_integer(UpdateSeq),
+    UpdateSeq > 0
 ->
-    ReqBody = #{seq => UpdateSeq, purge => false},
+    ReqBody = #{match_seq => MatchSeq, seq => UpdateSeq, purge => false},
     send_direct_if_enabled(
         ConnPid,
         doc_url(Index, DocId),
@@ -114,10 +119,10 @@ delete_doc_async(ConnPid, #index{} = Index, DocId, 
UpdateSeq) when
         ]
     ).
 
-purge_doc(#index{} = Index, DocId, PurgeSeq) when
-    is_binary(DocId), is_integer(PurgeSeq)
+purge_doc(#index{} = Index, DocId, MatchSeq, PurgeSeq) when
+    is_binary(DocId), is_integer(MatchSeq), MatchSeq >= 0, 
is_integer(PurgeSeq), PurgeSeq > 0
 ->
-    ReqBody = #{seq => PurgeSeq, purge => true},
+    ReqBody = #{match_seq => MatchSeq, seq => PurgeSeq, purge => true},
     Resp = send_if_enabled(
         doc_url(Index, DocId), [?JSON_CONTENT_TYPE], delete, 
jiffy:encode(ReqBody)
     ),
@@ -130,14 +135,18 @@ purge_doc(#index{} = Index, DocId, PurgeSeq) when
             send_error(Reason)
     end.
 
-update_doc_async(ConnPid, #index{} = Index, DocId, UpdateSeq, Partition, 
Fields) when
+update_doc_async(ConnPid, #index{} = Index, DocId, MatchSeq, UpdateSeq, 
Partition, Fields) when
     is_pid(ConnPid),
     is_binary(DocId),
+    is_integer(MatchSeq),
+    MatchSeq >= 0,
     is_integer(UpdateSeq),
+    UpdateSeq > 0,
     (is_binary(Partition) orelse Partition == null),
     is_list(Fields)
 ->
     ReqBody = #{
+        match_seq => MatchSeq,
         seq => UpdateSeq,
         partition => Partition,
         fields => Fields
@@ -166,13 +175,16 @@ search(#index{} = Index, QueryArgs) ->
             send_error(Reason)
     end.
 
-set_update_seq(#index{} = Index, UpdateSeq) ->
-    set_seq(Index, update_seq, UpdateSeq).
-set_purge_seq(#index{} = Index, PurgeSeq) ->
-    set_seq(Index, purge_seq, PurgeSeq).
+set_update_seq(#index{} = Index, MatchSeq, UpdateSeq) ->
+    set_seq(Index, update_seq, MatchSeq, UpdateSeq).
+set_purge_seq(#index{} = Index, MatchSeq, PurgeSeq) ->
+    set_seq(Index, purge_seq, MatchSeq, PurgeSeq).
 
-set_seq(#index{} = Index, Key, Value) when is_atom(Key), is_integer(Value) ->
+set_seq(#index{} = Index, Key, MatchSeq, Value) when
+    is_atom(Key), is_integer(MatchSeq), is_integer(Value)
+->
     ReqBody = #{
+        match_seq => MatchSeq,
         Key => Value
     },
     Resp = send_if_enabled(
@@ -271,6 +283,8 @@ jaxrs_error("404", Body) ->
     {not_found, message(Body)};
 jaxrs_error("405", Body) ->
     {method_not_allowed, message(Body)};
+jaxrs_error("409", Body) ->
+    {conflict, message(Body)};
 jaxrs_error("417", Body) ->
     {expectation_failed, message(Body)};
 jaxrs_error("422", Body) ->
diff --git a/src/nouveau/src/nouveau_index_updater.erl 
b/src/nouveau/src/nouveau_index_updater.erl
index d8d75364c..1a046d825 100644
--- a/src/nouveau/src/nouveau_index_updater.erl
+++ b/src/nouveau/src/nouveau_index_updater.erl
@@ -34,7 +34,8 @@
     total_changes,
     exclude_idrevs,
     reqids = [],
-    conn_pid
+    conn_pid,
+    match_update_seq
 }).
 
 outdated(#index{} = Index) ->
@@ -86,14 +87,15 @@ update(#index{} = Index) ->
                         changes_done = 0,
                         total_changes = TotalChanges,
                         exclude_idrevs = ExcludeIdRevs,
-                        conn_pid = ConnPid
+                        conn_pid = ConnPid,
+                        match_update_seq = IndexUpdateSeq
                     },
                     {ok, Acc1} = couch_db:fold_changes(
                         Db, IndexUpdateSeq, fun load_docs/2, Acc0, []
                     ),
                     
nouveau_api:drain_async_responses(lists:reverse(Acc1#acc.reqids)),
                     ibrowse:stop_worker_process(ConnPid),
-                    ok = nouveau_api:set_update_seq(Index, NewCurSeq)
+                    ok = nouveau_api:set_update_seq(Index, 
Acc1#acc.match_update_seq, NewCurSeq)
                 after
                     ret_os_process(Proc)
                 end
@@ -123,22 +125,30 @@ load_docs(FDI, #acc{} = Acc0) ->
             false ->
                 case
                     update_or_delete_index(
-                        Acc1#acc.conn_pid, Acc1#acc.db, Acc1#acc.index, DI, 
Acc1#acc.proc
+                        Acc1#acc.conn_pid,
+                        Acc1#acc.db,
+                        Acc1#acc.index,
+                        Acc1#acc.match_update_seq,
+                        DI,
+                        Acc1#acc.proc
                     )
                 of
                     {ibrowse_req_id, ReqId} ->
-                        Acc1#acc{reqids = [ReqId | Acc1#acc.reqids]};
+                        Acc1#acc{
+                            match_update_seq = DI#doc_info.high_seq,
+                            reqids = [ReqId | Acc1#acc.reqids]
+                        };
                     {error, Reason} ->
                         exit({error, Reason})
                 end
         end,
     {ok, Acc2#acc{changes_done = Acc2#acc.changes_done + 1}}.
 
-update_or_delete_index(ConnPid, Db, #index{} = Index, #doc_info{} = DI, Proc) 
->
+update_or_delete_index(ConnPid, Db, #index{} = Index, MatchSeq, #doc_info{} = 
DI, Proc) ->
     #doc_info{id = Id, high_seq = Seq, revs = [#rev_info{deleted = Del} | _]} 
= DI,
     case Del of
         true ->
-            nouveau_api:delete_doc_async(ConnPid, Index, Id, Seq);
+            nouveau_api:delete_doc_async(ConnPid, Index, Id, MatchSeq, Seq);
         false ->
             {ok, Doc} = couch_db:open_doc(Db, DI, []),
             Json = couch_doc:to_json_obj(Doc, []),
@@ -152,9 +162,11 @@ update_or_delete_index(ConnPid, Db, #index{} = Index, 
#doc_info{} = DI, Proc) ->
                 end,
             case Fields of
                 [] ->
-                    nouveau_api:delete_doc_async(ConnPid, Index, Id, Seq);
+                    nouveau_api:delete_doc_async(ConnPid, Index, Id, MatchSeq, 
Seq);
                 _ ->
-                    nouveau_api:update_doc_async(ConnPid, Index, Id, Seq, 
Partition, Fields)
+                    nouveau_api:update_doc_async(
+                        ConnPid, Index, Id, MatchSeq, Seq, Partition, Fields
+                    )
             end
     end.
 
@@ -202,31 +214,31 @@ purge_index(ConnPid, Db, Index, IndexPurgeSeq) ->
     Proc = get_os_process(Index#index.def_lang),
     try
         true = proc_prompt(Proc, [<<"add_fun">>, Index#index.def, 
<<"nouveau">>]),
-        FoldFun = fun({PurgeSeq, _UUID, Id, _Revs}, {Acc, _}) ->
-            Acc0 =
+        FoldFun = fun({PurgeSeq, _UUID, Id, _Revs}, {ExcludeList0, MatchSeq, 
_}) ->
+            ExcludeList1 =
                 case couch_db:get_full_doc_info(Db, Id) of
                     not_found ->
-                        ok = nouveau_api:purge_doc(Index, Id, PurgeSeq),
-                        Acc;
+                        ok = nouveau_api:purge_doc(Index, Id, MatchSeq, 
PurgeSeq),
+                        ExcludeList0;
                     FDI ->
                         DI = couch_doc:to_doc_info(FDI),
                         #doc_info{id = Id, revs = [#rev_info{rev = Rev} | _]} 
= DI,
-                        case lists:member({Id, Rev}, Acc) of
+                        case lists:member({Id, Rev}, ExcludeList0) of
                             true ->
-                                Acc;
+                                ExcludeList0;
                             false ->
-                                update_or_delete_index(ConnPid, Db, Index, DI, 
Proc),
-                                [{Id, Rev} | Acc]
+                                update_or_delete_index(ConnPid, Db, Index, 
MatchSeq, DI, Proc),
+                                [{Id, Rev} | ExcludeList0]
                         end
                 end,
             update_task(1),
-            {ok, {Acc0, PurgeSeq}}
+            {ok, {ExcludeList1, MatchSeq, PurgeSeq}}
         end,
 
-        {ok, {ExcludeList, NewPurgeSeq}} = couch_db:fold_purge_infos(
-            Db, IndexPurgeSeq, FoldFun, {[], 0}, []
+        {ok, {ExcludeList, MatchSeq, NewPurgeSeq}} = couch_db:fold_purge_infos(
+            Db, IndexPurgeSeq, FoldFun, {[], IndexPurgeSeq, 0}, []
         ),
-        nouveau_api:set_purge_seq(Index, NewPurgeSeq),
+        nouveau_api:set_purge_seq(Index, MatchSeq, NewPurgeSeq),
         update_local_doc(Db, Index, NewPurgeSeq),
         {ok, ExcludeList}
     after

Reply via email to