This is an automated email from the ASF dual-hosted git repository. rnewson pushed a commit to branch nouveau-ibrowse-improvements in repository https://gitbox.apache.org/repos/asf/couchdb.git
commit f7d7c2611e7a3284c9e0d691e7295350c1bd8933 Author: Robert Newson <[email protected]> AuthorDate: Mon Sep 4 23:38:45 2023 +0100 Ensure an update fails if a previous update failed As we are pipelining requests we must ensure that all requests after a failed request also fail so that the index is never in an inconsistent state. To do this we pass the expected 'match sequence' with every request. nouveau returns a 409 Conflict if this doesn't match the index's current sequence. --- .../couchdb/nouveau/api/DocumentDeleteRequest.java | 20 +++++++- .../couchdb/nouveau/api/DocumentUpdateRequest.java | 13 +++++- .../couchdb/nouveau/api/IndexInfoRequest.java | 19 +++++++- .../org/apache/couchdb/nouveau/core/Index.java | 44 ++++++++++-------- .../nouveau/core/UpdatesOutOfOrderException.java | 8 ++-- .../couchdb/nouveau/health/IndexHealthCheck.java | 2 +- .../couchdb/nouveau/resources/IndexResource.java | 13 ++++-- .../couchdb/nouveau/lucene9/Lucene9IndexTest.java | 27 ++++++----- src/nouveau/src/nouveau_api.erl | 48 ++++++++++++------- src/nouveau/src/nouveau_index_updater.erl | 54 +++++++++++++--------- 10 files changed, 168 insertions(+), 80 deletions(-) diff --git a/nouveau/src/main/java/org/apache/couchdb/nouveau/api/DocumentDeleteRequest.java b/nouveau/src/main/java/org/apache/couchdb/nouveau/api/DocumentDeleteRequest.java index 615d60488..82e9b716a 100644 --- a/nouveau/src/main/java/org/apache/couchdb/nouveau/api/DocumentDeleteRequest.java +++ b/nouveau/src/main/java/org/apache/couchdb/nouveau/api/DocumentDeleteRequest.java @@ -15,22 +15,38 @@ package org.apache.couchdb.nouveau.api; import com.fasterxml.jackson.annotation.JsonProperty; import jakarta.validation.constraints.Positive; +import jakarta.validation.constraints.PositiveOrZero; public final class DocumentDeleteRequest { + @PositiveOrZero + private final long matchSeq; + @Positive private final long seq; private final boolean purge; - public DocumentDeleteRequest(@JsonProperty("seq") final long seq, @JsonProperty("purge") final boolean purge) { + public DocumentDeleteRequest( + @JsonProperty("match_seq") final long matchSeq, + @JsonProperty("seq") final long seq, + @JsonProperty("purge") final boolean purge) { + if (matchSeq < 0) { + throw new IllegalArgumentException("matchSeq must be 0 or greater"); + } + if (seq < 1) { throw new IllegalArgumentException("seq must be 1 or greater"); } + this.matchSeq = matchSeq; this.seq = seq; this.purge = purge; } + public long getMatchSeq() { + return matchSeq; + } + public long getSeq() { return seq; } @@ -41,6 +57,6 @@ public final class DocumentDeleteRequest { @Override public String toString() { - return "DocumentDeleteRequest [seq=" + seq + ", purge=" + purge + "]"; + return "DocumentDeleteRequest [matchSeq=" + matchSeq + ", seq=" + seq + ", purge=" + purge + "]"; } } diff --git a/nouveau/src/main/java/org/apache/couchdb/nouveau/api/DocumentUpdateRequest.java b/nouveau/src/main/java/org/apache/couchdb/nouveau/api/DocumentUpdateRequest.java index 8b07e7880..82c196602 100644 --- a/nouveau/src/main/java/org/apache/couchdb/nouveau/api/DocumentUpdateRequest.java +++ b/nouveau/src/main/java/org/apache/couchdb/nouveau/api/DocumentUpdateRequest.java @@ -17,10 +17,14 @@ import com.fasterxml.jackson.annotation.JsonProperty; import jakarta.validation.Valid; import jakarta.validation.constraints.NotEmpty; import jakarta.validation.constraints.Positive; +import jakarta.validation.constraints.PositiveOrZero; import java.util.Collection; public final class DocumentUpdateRequest { + @PositiveOrZero + private final long matchSeq; + @Positive private final long seq; @@ -31,14 +35,20 @@ public final class DocumentUpdateRequest { private final Collection<Field> fields; public DocumentUpdateRequest( + @JsonProperty("match_seq") final long matchSeq, @JsonProperty("seq") final long seq, @JsonProperty("partition") final String partition, @JsonProperty("fields") final Collection<Field> fields) { + this.matchSeq = matchSeq; this.seq = seq; this.partition = partition; this.fields = fields; } + public long getMatchSeq() { + return matchSeq; + } + public long getSeq() { return seq; } @@ -57,6 +67,7 @@ public final class DocumentUpdateRequest { @Override public String toString() { - return "DocumentUpdateRequest [seq=" + seq + ", partition=" + partition + ", fields=" + fields + "]"; + return "DocumentUpdateRequest [matchSeq=" + matchSeq + ", seq=" + seq + ", partition=" + partition + ", fields=" + + fields + "]"; } } diff --git a/nouveau/src/main/java/org/apache/couchdb/nouveau/api/IndexInfoRequest.java b/nouveau/src/main/java/org/apache/couchdb/nouveau/api/IndexInfoRequest.java index 949528259..cc008231c 100644 --- a/nouveau/src/main/java/org/apache/couchdb/nouveau/api/IndexInfoRequest.java +++ b/nouveau/src/main/java/org/apache/couchdb/nouveau/api/IndexInfoRequest.java @@ -19,27 +19,44 @@ import java.util.OptionalLong; public final class IndexInfoRequest { + private final OptionalLong matchUpdateSeq; + private final OptionalLong updateSeq; + private final OptionalLong matchPurgeSeq; + private final OptionalLong purgeSeq; public IndexInfoRequest( + @JsonProperty("match_update_seq") @Positive final OptionalLong matchUpdateSeq, @JsonProperty("update_seq") @Positive final OptionalLong updateSeq, + @JsonProperty("match_purge_seq") @Positive final OptionalLong matchPurgeSeq, @JsonProperty("purge_seq") @Positive final OptionalLong purgeSeq) { + this.matchUpdateSeq = matchUpdateSeq; this.updateSeq = updateSeq; + this.matchPurgeSeq = matchPurgeSeq; this.purgeSeq = purgeSeq; } + public OptionalLong getMatchUpdateSeq() { + return matchUpdateSeq; + } + public OptionalLong getUpdateSeq() { return updateSeq; } + public OptionalLong getMatchPurgeSeq() { + return matchPurgeSeq; + } + public OptionalLong getPurgeSeq() { return purgeSeq; } @Override public String toString() { - return "IndexInfoRequest [updateSeq=" + updateSeq + ", purgeSeq=" + purgeSeq + "]"; + return "IndexInfoRequest [matchUpdateSeq=" + matchUpdateSeq + ", updateSeq=" + updateSeq + ", matchPurgeSeq=" + + matchPurgeSeq + ", purgeSeq=" + purgeSeq + "]"; } } diff --git a/nouveau/src/main/java/org/apache/couchdb/nouveau/core/Index.java b/nouveau/src/main/java/org/apache/couchdb/nouveau/core/Index.java index ca0cd69ad..6f0f45504 100644 --- a/nouveau/src/main/java/org/apache/couchdb/nouveau/core/Index.java +++ b/nouveau/src/main/java/org/apache/couchdb/nouveau/core/Index.java @@ -86,22 +86,22 @@ public abstract class Index implements Closeable { protected abstract long doDiskSize() throws IOException; public final synchronized void update(final String docId, final DocumentUpdateRequest request) throws IOException { - assertUpdateSeqIsLower(request.getSeq()); + assertUpdateSeqProgress(request.getMatchSeq(), request.getSeq()); doUpdate(docId, request); - incrementUpdateSeq(request.getSeq()); + incrementUpdateSeq(request.getMatchSeq(), request.getSeq()); } protected abstract void doUpdate(final String docId, final DocumentUpdateRequest request) throws IOException; public final synchronized void delete(final String docId, final DocumentDeleteRequest request) throws IOException { if (request.isPurge()) { - assertPurgeSeqIsLower(request.getSeq()); + assertPurgeSeqProgress(request.getMatchSeq(), request.getSeq()); doDelete(docId, request); - incrementPurgeSeq(request.getSeq()); + incrementPurgeSeq(request.getMatchSeq(), request.getSeq()); } else { - assertUpdateSeqIsLower(request.getSeq()); + assertUpdateSeqProgress(request.getMatchSeq(), request.getSeq()); doDelete(docId, request); - incrementUpdateSeq(request.getSeq()); + incrementUpdateSeq(request.getMatchSeq(), request.getSeq()); } } @@ -132,23 +132,23 @@ public abstract class Index implements Closeable { protected abstract boolean doCommit(final long updateSeq, final long purgeSeq) throws IOException; - public final synchronized void setUpdateSeq(final long updateSeq) throws IOException { + public final synchronized void setUpdateSeq(final long matchSeq, final long updateSeq) throws IOException { if (updateSeq < this.updateSeq) { throw new WebApplicationException( "update_seq must be equal or greater than current update_seq", Status.BAD_REQUEST); } if (updateSeq > this.updateSeq) { - incrementUpdateSeq(updateSeq); + incrementUpdateSeq(matchSeq, updateSeq); } } - public final synchronized void setPurgeSeq(final long purgeSeq) throws IOException { + public final synchronized void setPurgeSeq(final long matchSeq, final long purgeSeq) throws IOException { if (purgeSeq < this.purgeSeq) { throw new WebApplicationException( "purge_seq must be equal or greater than current purge_seq", Status.BAD_REQUEST); } if (purgeSeq > this.purgeSeq) { - incrementPurgeSeq(purgeSeq); + incrementPurgeSeq(matchSeq, purgeSeq); } } @@ -178,29 +178,37 @@ public abstract class Index implements Closeable { } } - protected final void assertUpdateSeqIsLower(final long updateSeq) throws UpdatesOutOfOrderException { + protected final void assertUpdateSeqProgress(final long matchSeq, final long updateSeq) + throws UpdatesOutOfOrderException { assert Thread.holdsLock(this); + if (matchSeq != this.updateSeq) { + throw new UpdatesOutOfOrderException(this.updateSeq, matchSeq, updateSeq); + } if (!(updateSeq > this.updateSeq)) { - throw new UpdatesOutOfOrderException(this.updateSeq, updateSeq); + throw new UpdatesOutOfOrderException(this.updateSeq, matchSeq, updateSeq); } } - protected final void incrementUpdateSeq(final long updateSeq) throws IOException { + protected final void incrementUpdateSeq(final long matchSeq, final long updateSeq) throws IOException { assert Thread.holdsLock(this); - assertUpdateSeqIsLower(updateSeq); + assertUpdateSeqProgress(matchSeq, updateSeq); this.updateSeq = updateSeq; } - protected final void assertPurgeSeqIsLower(final long purgeSeq) throws UpdatesOutOfOrderException { + protected final void assertPurgeSeqProgress(final long matchSeq, final long purgeSeq) + throws UpdatesOutOfOrderException { assert Thread.holdsLock(this); + if (matchSeq != this.purgeSeq) { + throw new UpdatesOutOfOrderException(this.purgeSeq, matchSeq, purgeSeq); + } if (!(purgeSeq > this.purgeSeq)) { - throw new UpdatesOutOfOrderException(this.purgeSeq, purgeSeq); + throw new UpdatesOutOfOrderException(this.purgeSeq, matchSeq, purgeSeq); } } - protected final void incrementPurgeSeq(final long purgeSeq) throws IOException { + protected final void incrementPurgeSeq(final long matchSeq, final long purgeSeq) throws IOException { assert Thread.holdsLock(this); - assertPurgeSeqIsLower(purgeSeq); + assertPurgeSeqProgress(matchSeq, purgeSeq); this.purgeSeq = purgeSeq; } diff --git a/nouveau/src/main/java/org/apache/couchdb/nouveau/core/UpdatesOutOfOrderException.java b/nouveau/src/main/java/org/apache/couchdb/nouveau/core/UpdatesOutOfOrderException.java index acda8beb7..f059f9587 100644 --- a/nouveau/src/main/java/org/apache/couchdb/nouveau/core/UpdatesOutOfOrderException.java +++ b/nouveau/src/main/java/org/apache/couchdb/nouveau/core/UpdatesOutOfOrderException.java @@ -18,11 +18,11 @@ import jakarta.ws.rs.core.Response.Status; public final class UpdatesOutOfOrderException extends WebApplicationException { - public UpdatesOutOfOrderException(final long currentSeq, final long attemptedSeq) { + public UpdatesOutOfOrderException(final long currentSeq, final long matchSeq, final long attemptedSeq) { super( String.format( - "Updates applied in the wrong order (current seq: %d, attempted seq: %d)", - currentSeq, attemptedSeq), - Status.BAD_REQUEST); + "Updates applied in the wrong order (current seq: %d, match seq: %d, attempted seq: %d)", + currentSeq, matchSeq, attemptedSeq), + Status.CONFLICT); } } diff --git a/nouveau/src/main/java/org/apache/couchdb/nouveau/health/IndexHealthCheck.java b/nouveau/src/main/java/org/apache/couchdb/nouveau/health/IndexHealthCheck.java index 889ab2726..b66bbcde5 100644 --- a/nouveau/src/main/java/org/apache/couchdb/nouveau/health/IndexHealthCheck.java +++ b/nouveau/src/main/java/org/apache/couchdb/nouveau/health/IndexHealthCheck.java @@ -42,7 +42,7 @@ public final class IndexHealthCheck extends HealthCheck { indexResource.createIndex(name, new IndexDefinition("standard", null)); try { final DocumentUpdateRequest documentUpdateRequest = - new DocumentUpdateRequest(1, null, Collections.emptyList()); + new DocumentUpdateRequest(0, 1, null, Collections.emptyList()); indexResource.updateDoc(name, "foo", documentUpdateRequest); final SearchRequest searchRequest = new SearchRequest(); diff --git a/nouveau/src/main/java/org/apache/couchdb/nouveau/resources/IndexResource.java b/nouveau/src/main/java/org/apache/couchdb/nouveau/resources/IndexResource.java index ce99830bb..9c7a100e3 100644 --- a/nouveau/src/main/java/org/apache/couchdb/nouveau/resources/IndexResource.java +++ b/nouveau/src/main/java/org/apache/couchdb/nouveau/resources/IndexResource.java @@ -102,11 +102,16 @@ public final class IndexResource { public void setIndexInfo(@PathParam("name") String name, @NotNull @Valid IndexInfoRequest request) throws Exception { indexManager.with(name, indexLoader(), (index) -> { - if (request.getUpdateSeq().isPresent()) { - index.setUpdateSeq(request.getUpdateSeq().getAsLong()); + if (request.getMatchUpdateSeq().isPresent() + && request.getUpdateSeq().isPresent()) { + index.setUpdateSeq( + request.getMatchUpdateSeq().getAsLong(), + request.getUpdateSeq().getAsLong()); } - if (request.getPurgeSeq().isPresent()) { - index.setPurgeSeq(request.getPurgeSeq().getAsLong()); + if (request.getMatchPurgeSeq().isPresent() && request.getPurgeSeq().isPresent()) { + index.setPurgeSeq( + request.getMatchPurgeSeq().getAsLong(), + request.getPurgeSeq().getAsLong()); } return null; }); diff --git a/nouveau/src/test/java/org/apache/couchdb/nouveau/lucene9/Lucene9IndexTest.java b/nouveau/src/test/java/org/apache/couchdb/nouveau/lucene9/Lucene9IndexTest.java index 5d989804b..eaaad1780 100644 --- a/nouveau/src/test/java/org/apache/couchdb/nouveau/lucene9/Lucene9IndexTest.java +++ b/nouveau/src/test/java/org/apache/couchdb/nouveau/lucene9/Lucene9IndexTest.java @@ -72,7 +72,7 @@ public class Lucene9IndexTest { final int count = 100; for (int i = 1; i <= count; i++) { final Collection<Field> fields = List.of(new StringField("foo", "bar", false)); - final DocumentUpdateRequest request = new DocumentUpdateRequest(i, null, fields); + final DocumentUpdateRequest request = new DocumentUpdateRequest(i - 1, i, null, fields); index.update("doc" + i, request); } final SearchRequest request = new SearchRequest(); @@ -91,7 +91,7 @@ public class Lucene9IndexTest { final int count = 100; for (int i = 1; i <= count; i++) { final Collection<Field> fields = List.of(new StringField("foo", "bar", false)); - final DocumentUpdateRequest request = new DocumentUpdateRequest(i, null, fields); + final DocumentUpdateRequest request = new DocumentUpdateRequest(i - 1, i, null, fields); index.update("doc" + i, request); } final SearchRequest request = new SearchRequest(); @@ -111,7 +111,7 @@ public class Lucene9IndexTest { final int count = 100; for (int i = 1; i <= count; i++) { final Collection<Field> fields = List.of(new StringField("bar", "baz", false)); - final DocumentUpdateRequest request = new DocumentUpdateRequest(i, null, fields); + final DocumentUpdateRequest request = new DocumentUpdateRequest(i - 1, i, null, fields); index.update("doc" + i, request); } final SearchRequest request = new SearchRequest(); @@ -131,7 +131,7 @@ public class Lucene9IndexTest { final int count = 100; for (int i = 1; i <= count; i++) { final Collection<Field> fields = List.of(new DoubleField("bar", (double) i, false)); - final DocumentUpdateRequest request = new DocumentUpdateRequest(i, null, fields); + final DocumentUpdateRequest request = new DocumentUpdateRequest(i - 1, i, null, fields); index.update("doc" + i, request); } final SearchRequest request = new SearchRequest(); @@ -154,13 +154,18 @@ public class Lucene9IndexTest { try { final Collection<Field> fields = Collections.emptyList(); + // get match seq wrong + assertThrows( + UpdatesOutOfOrderException.class, + () -> index.update("foo", new DocumentUpdateRequest(1, 2, null, fields))); + // Go to 2. - index.update("foo", new DocumentUpdateRequest(2, null, fields)); + index.update("foo", new DocumentUpdateRequest(0, 2, null, fields)); // Should be prevented from going down to 1. assertThrows( UpdatesOutOfOrderException.class, - () -> index.update("foo", new DocumentUpdateRequest(1, null, fields))); + () -> index.update("foo", new DocumentUpdateRequest(2, 1, null, fields))); } finally { cleanup(index); } @@ -176,7 +181,7 @@ public class Lucene9IndexTest { assertThat(info.getUpdateSeq()).isEqualTo(0); final Collection<Field> fields = List.of(new DoubleField("bar", 12.0, false)); - index.update("foo", new DocumentUpdateRequest(2, null, fields)); + index.update("foo", new DocumentUpdateRequest(0, 2, null, fields)); index.commit(); info = index.info(); @@ -193,13 +198,13 @@ public class Lucene9IndexTest { Index index = setup(path); try { final Collection<Field> fields = List.of(new DoubleField("bar", 12.0, false)); - index.update("foo", new DocumentUpdateRequest(2, null, fields)); + index.update("foo", new DocumentUpdateRequest(0, 2, null, fields)); index.commit(); IndexInfo info = index.info(); assertThat(info.getNumDocs()).isEqualTo(1); - index.delete("foo", new DocumentDeleteRequest(3, false)); + index.delete("foo", new DocumentDeleteRequest(2, 3, false)); index.commit(); info = index.info(); @@ -215,13 +220,13 @@ public class Lucene9IndexTest { Index index = setup(path); try { final Collection<Field> fields = List.of(new DoubleField("bar", 12.0, false)); - index.update("foo", new DocumentUpdateRequest(2, null, fields)); + index.update("foo", new DocumentUpdateRequest(0, 2, null, fields)); index.commit(); IndexInfo info = index.info(); assertThat(info.getNumDocs()).isEqualTo(1); - index.delete("foo", new DocumentDeleteRequest(3, true)); + index.delete("foo", new DocumentDeleteRequest(0, 3, true)); index.commit(); info = index.info(); diff --git a/src/nouveau/src/nouveau_api.erl b/src/nouveau/src/nouveau_api.erl index f272bafec..0e929a12a 100644 --- a/src/nouveau/src/nouveau_api.erl +++ b/src/nouveau/src/nouveau_api.erl @@ -23,12 +23,12 @@ create_index/2, delete_path/1, delete_path/2, - delete_doc_async/4, - purge_doc/3, - update_doc_async/6, + delete_doc_async/5, + purge_doc/4, + update_doc_async/7, search/2, - set_purge_seq/2, - set_update_seq/2, + set_purge_seq/3, + set_update_seq/3, drain_async_responses/1, jaxrs_error/2 ]). @@ -99,10 +99,15 @@ delete_path(Path, Exclusions) when send_error(Reason) end. -delete_doc_async(ConnPid, #index{} = Index, DocId, UpdateSeq) when - is_pid(ConnPid), is_binary(DocId), is_integer(UpdateSeq) +delete_doc_async(ConnPid, #index{} = Index, DocId, MatchSeq, UpdateSeq) when + is_pid(ConnPid), + is_binary(DocId), + is_integer(MatchSeq), + MatchSeq >= 0, + is_integer(UpdateSeq), + UpdateSeq > 0 -> - ReqBody = #{seq => UpdateSeq, purge => false}, + ReqBody = #{match_seq => MatchSeq, seq => UpdateSeq, purge => false}, send_direct_if_enabled( ConnPid, doc_url(Index, DocId), @@ -114,10 +119,10 @@ delete_doc_async(ConnPid, #index{} = Index, DocId, UpdateSeq) when ] ). -purge_doc(#index{} = Index, DocId, PurgeSeq) when - is_binary(DocId), is_integer(PurgeSeq) +purge_doc(#index{} = Index, DocId, MatchSeq, PurgeSeq) when + is_binary(DocId), is_integer(MatchSeq), MatchSeq >= 0, is_integer(PurgeSeq), PurgeSeq > 0 -> - ReqBody = #{seq => PurgeSeq, purge => true}, + ReqBody = #{match_seq => MatchSeq, seq => PurgeSeq, purge => true}, Resp = send_if_enabled( doc_url(Index, DocId), [?JSON_CONTENT_TYPE], delete, jiffy:encode(ReqBody) ), @@ -130,14 +135,18 @@ purge_doc(#index{} = Index, DocId, PurgeSeq) when send_error(Reason) end. -update_doc_async(ConnPid, #index{} = Index, DocId, UpdateSeq, Partition, Fields) when +update_doc_async(ConnPid, #index{} = Index, DocId, MatchSeq, UpdateSeq, Partition, Fields) when is_pid(ConnPid), is_binary(DocId), + is_integer(MatchSeq), + MatchSeq >= 0, is_integer(UpdateSeq), + UpdateSeq > 0, (is_binary(Partition) orelse Partition == null), is_list(Fields) -> ReqBody = #{ + match_seq => MatchSeq, seq => UpdateSeq, partition => Partition, fields => Fields @@ -166,13 +175,16 @@ search(#index{} = Index, QueryArgs) -> send_error(Reason) end. -set_update_seq(#index{} = Index, UpdateSeq) -> - set_seq(Index, update_seq, UpdateSeq). -set_purge_seq(#index{} = Index, PurgeSeq) -> - set_seq(Index, purge_seq, PurgeSeq). +set_update_seq(#index{} = Index, MatchSeq, UpdateSeq) -> + set_seq(Index, update_seq, MatchSeq, UpdateSeq). +set_purge_seq(#index{} = Index, MatchSeq, PurgeSeq) -> + set_seq(Index, purge_seq, MatchSeq, PurgeSeq). -set_seq(#index{} = Index, Key, Value) when is_atom(Key), is_integer(Value) -> +set_seq(#index{} = Index, Key, MatchSeq, Value) when + is_atom(Key), is_integer(MatchSeq), is_integer(Value) +-> ReqBody = #{ + match_seq => MatchSeq, Key => Value }, Resp = send_if_enabled( @@ -271,6 +283,8 @@ jaxrs_error("404", Body) -> {not_found, message(Body)}; jaxrs_error("405", Body) -> {method_not_allowed, message(Body)}; +jaxrs_error("409", Body) -> + {conflict, message(Body)}; jaxrs_error("417", Body) -> {expectation_failed, message(Body)}; jaxrs_error("422", Body) -> diff --git a/src/nouveau/src/nouveau_index_updater.erl b/src/nouveau/src/nouveau_index_updater.erl index d8d75364c..1a046d825 100644 --- a/src/nouveau/src/nouveau_index_updater.erl +++ b/src/nouveau/src/nouveau_index_updater.erl @@ -34,7 +34,8 @@ total_changes, exclude_idrevs, reqids = [], - conn_pid + conn_pid, + match_update_seq }). outdated(#index{} = Index) -> @@ -86,14 +87,15 @@ update(#index{} = Index) -> changes_done = 0, total_changes = TotalChanges, exclude_idrevs = ExcludeIdRevs, - conn_pid = ConnPid + conn_pid = ConnPid, + match_update_seq = IndexUpdateSeq }, {ok, Acc1} = couch_db:fold_changes( Db, IndexUpdateSeq, fun load_docs/2, Acc0, [] ), nouveau_api:drain_async_responses(lists:reverse(Acc1#acc.reqids)), ibrowse:stop_worker_process(ConnPid), - ok = nouveau_api:set_update_seq(Index, NewCurSeq) + ok = nouveau_api:set_update_seq(Index, Acc1#acc.match_update_seq, NewCurSeq) after ret_os_process(Proc) end @@ -123,22 +125,30 @@ load_docs(FDI, #acc{} = Acc0) -> false -> case update_or_delete_index( - Acc1#acc.conn_pid, Acc1#acc.db, Acc1#acc.index, DI, Acc1#acc.proc + Acc1#acc.conn_pid, + Acc1#acc.db, + Acc1#acc.index, + Acc1#acc.match_update_seq, + DI, + Acc1#acc.proc ) of {ibrowse_req_id, ReqId} -> - Acc1#acc{reqids = [ReqId | Acc1#acc.reqids]}; + Acc1#acc{ + match_update_seq = DI#doc_info.high_seq, + reqids = [ReqId | Acc1#acc.reqids] + }; {error, Reason} -> exit({error, Reason}) end end, {ok, Acc2#acc{changes_done = Acc2#acc.changes_done + 1}}. -update_or_delete_index(ConnPid, Db, #index{} = Index, #doc_info{} = DI, Proc) -> +update_or_delete_index(ConnPid, Db, #index{} = Index, MatchSeq, #doc_info{} = DI, Proc) -> #doc_info{id = Id, high_seq = Seq, revs = [#rev_info{deleted = Del} | _]} = DI, case Del of true -> - nouveau_api:delete_doc_async(ConnPid, Index, Id, Seq); + nouveau_api:delete_doc_async(ConnPid, Index, Id, MatchSeq, Seq); false -> {ok, Doc} = couch_db:open_doc(Db, DI, []), Json = couch_doc:to_json_obj(Doc, []), @@ -152,9 +162,11 @@ update_or_delete_index(ConnPid, Db, #index{} = Index, #doc_info{} = DI, Proc) -> end, case Fields of [] -> - nouveau_api:delete_doc_async(ConnPid, Index, Id, Seq); + nouveau_api:delete_doc_async(ConnPid, Index, Id, MatchSeq, Seq); _ -> - nouveau_api:update_doc_async(ConnPid, Index, Id, Seq, Partition, Fields) + nouveau_api:update_doc_async( + ConnPid, Index, Id, MatchSeq, Seq, Partition, Fields + ) end end. @@ -202,31 +214,31 @@ purge_index(ConnPid, Db, Index, IndexPurgeSeq) -> Proc = get_os_process(Index#index.def_lang), try true = proc_prompt(Proc, [<<"add_fun">>, Index#index.def, <<"nouveau">>]), - FoldFun = fun({PurgeSeq, _UUID, Id, _Revs}, {Acc, _}) -> - Acc0 = + FoldFun = fun({PurgeSeq, _UUID, Id, _Revs}, {ExcludeList0, MatchSeq, _}) -> + ExcludeList1 = case couch_db:get_full_doc_info(Db, Id) of not_found -> - ok = nouveau_api:purge_doc(Index, Id, PurgeSeq), - Acc; + ok = nouveau_api:purge_doc(Index, Id, MatchSeq, PurgeSeq), + ExcludeList0; FDI -> DI = couch_doc:to_doc_info(FDI), #doc_info{id = Id, revs = [#rev_info{rev = Rev} | _]} = DI, - case lists:member({Id, Rev}, Acc) of + case lists:member({Id, Rev}, ExcludeList0) of true -> - Acc; + ExcludeList0; false -> - update_or_delete_index(ConnPid, Db, Index, DI, Proc), - [{Id, Rev} | Acc] + update_or_delete_index(ConnPid, Db, Index, MatchSeq, DI, Proc), + [{Id, Rev} | ExcludeList0] end end, update_task(1), - {ok, {Acc0, PurgeSeq}} + {ok, {ExcludeList1, MatchSeq, PurgeSeq}} end, - {ok, {ExcludeList, NewPurgeSeq}} = couch_db:fold_purge_infos( - Db, IndexPurgeSeq, FoldFun, {[], 0}, [] + {ok, {ExcludeList, MatchSeq, NewPurgeSeq}} = couch_db:fold_purge_infos( + Db, IndexPurgeSeq, FoldFun, {[], IndexPurgeSeq, 0}, [] ), - nouveau_api:set_purge_seq(Index, NewPurgeSeq), + nouveau_api:set_purge_seq(Index, MatchSeq, NewPurgeSeq), update_local_doc(Db, Index, NewPurgeSeq), {ok, ExcludeList} after
