This is an automated email from the ASF dual-hosted git repository. rnewson pushed a commit to branch nouveau-ibrowse-improvements in repository https://gitbox.apache.org/repos/asf/couchdb.git
commit 6a5f5db728e9d5dee0886970ab254a602ec96259 Author: Robert Newson <[email protected]> AuthorDate: Sat Sep 2 09:24:22 2023 +0100 pipeline requests Send multiple http requests before retrieving the responses for a significant performance gain This required enhancement to the updates out of order check to ensure that a failure of one update request causes all subsequent requests to fail. --- nouveau/nouveau.yaml | 4 +- .../couchdb/nouveau/api/DocumentDeleteRequest.java | 46 +++++---- .../couchdb/nouveau/api/DocumentUpdateRequest.java | 39 +++++--- .../couchdb/nouveau/api/IndexInfoRequest.java | 40 +++++--- .../org/apache/couchdb/nouveau/core/Index.java | 56 +++++------ ...tion.java => IndexUpdateConflictException.java} | 10 +- .../couchdb/nouveau/health/IndexHealthCheck.java | 2 +- .../couchdb/nouveau/resources/IndexResource.java | 13 ++- .../couchdb/nouveau/lucene9/Lucene9IndexTest.java | 26 ++--- src/nouveau/src/nouveau_api.erl | 101 +++++++++++-------- src/nouveau/src/nouveau_index_updater.erl | 107 ++++++++++++++++++--- 11 files changed, 287 insertions(+), 157 deletions(-) diff --git a/nouveau/nouveau.yaml b/nouveau/nouveau.yaml index 095b06bcf..8391277fc 100644 --- a/nouveau/nouveau.yaml +++ b/nouveau/nouveau.yaml @@ -22,6 +22,4 @@ server: - GET - POST requestLog: - appenders: - - type: console - target: stderr + appenders: [] diff --git a/nouveau/src/main/java/org/apache/couchdb/nouveau/api/DocumentDeleteRequest.java b/nouveau/src/main/java/org/apache/couchdb/nouveau/api/DocumentDeleteRequest.java index 99f6f5d9f..828cc7a66 100644 --- a/nouveau/src/main/java/org/apache/couchdb/nouveau/api/DocumentDeleteRequest.java +++ b/nouveau/src/main/java/org/apache/couchdb/nouveau/api/DocumentDeleteRequest.java @@ -13,43 +13,51 @@ package org.apache.couchdb.nouveau.api; +import com.fasterxml.jackson.annotation.JsonCreator; import com.fasterxml.jackson.annotation.JsonProperty; -import com.fasterxml.jackson.databind.PropertyNamingStrategies; -import com.fasterxml.jackson.databind.annotation.JsonNaming; import jakarta.validation.constraints.Positive; +import jakarta.validation.constraints.PositiveOrZero; -@JsonNaming(PropertyNamingStrategies.SnakeCaseStrategy.class) -public class DocumentDeleteRequest { +public final class DocumentDeleteRequest { - @Positive - private long seq; + @PositiveOrZero + private final long fromSeq; - private boolean purge; + @Positive + private final long toSeq; - public DocumentDeleteRequest() { - // Jackson deserialization - } + private final boolean purge; - public DocumentDeleteRequest(long seq, final boolean purge) { - if (seq < 1) { - throw new IllegalArgumentException("seq must be 1 or greater"); + @JsonCreator + public DocumentDeleteRequest( + @JsonProperty("from_seq") final long fromSeq, + @JsonProperty("to_seq") final long toSeq, + @JsonProperty("purge") final boolean purge) { + if (fromSeq < 0) { + throw new IllegalArgumentException("fromSeq must be 0 or greater"); } - this.seq = seq; + if (toSeq < 1) { + throw new IllegalArgumentException("toSeq must be 1 or greater"); + } + this.fromSeq = fromSeq; + this.toSeq = toSeq; this.purge = purge; } - @JsonProperty - public long getSeq() { - return seq; + public long getFromSeq() { + return fromSeq; + } + + public long getToSeq() { + return toSeq; } - @JsonProperty public boolean isPurge() { return purge; } @Override public String toString() { - return "DocumentDeleteRequest [seq=" + seq + ", purge=" + purge + "]"; + return "DocumentDeleteRequest [fromSeq=" + fromSeq + ", toSeq=" + toSeq + ", purge=" + purge + "]"; } } diff --git a/nouveau/src/main/java/org/apache/couchdb/nouveau/api/DocumentUpdateRequest.java b/nouveau/src/main/java/org/apache/couchdb/nouveau/api/DocumentUpdateRequest.java index 2fbd01e3b..725dc1803 100644 --- a/nouveau/src/main/java/org/apache/couchdb/nouveau/api/DocumentUpdateRequest.java +++ b/nouveau/src/main/java/org/apache/couchdb/nouveau/api/DocumentUpdateRequest.java @@ -13,42 +13,51 @@ package org.apache.couchdb.nouveau.api; +import com.fasterxml.jackson.annotation.JsonCreator; import com.fasterxml.jackson.annotation.JsonProperty; import com.fasterxml.jackson.databind.PropertyNamingStrategies; import com.fasterxml.jackson.databind.annotation.JsonNaming; import jakarta.validation.Valid; import jakarta.validation.constraints.NotEmpty; import jakarta.validation.constraints.Positive; +import jakarta.validation.constraints.PositiveOrZero; import java.util.Collection; @JsonNaming(PropertyNamingStrategies.SnakeCaseStrategy.class) public class DocumentUpdateRequest { + @PositiveOrZero + private final long fromSeq; + @Positive - private long seq; + private final long toSeq; - private String partition; + private final String partition; @NotEmpty @Valid - private Collection<Field> fields; - - public DocumentUpdateRequest() { - // Jackson deserialization - } + private final Collection<Field> fields; - public DocumentUpdateRequest(long seq, String partition, Collection<Field> fields) { - this.seq = seq; + @JsonCreator + public DocumentUpdateRequest( + @JsonProperty("from_seq") final long fromSeq, + @JsonProperty("to_seq") final long toSeq, + @JsonProperty("partition") final String partition, + @JsonProperty("fields") final Collection<Field> fields) { + this.fromSeq = fromSeq; + this.toSeq = toSeq; this.partition = partition; this.fields = fields; } - @JsonProperty - public long getSeq() { - return seq; + public long getFromSeq() { + return fromSeq; + } + + public long getToSeq() { + return toSeq; } - @JsonProperty public String getPartition() { return partition; } @@ -57,13 +66,13 @@ public class DocumentUpdateRequest { return partition != null; } - @JsonProperty public Collection<Field> getFields() { return fields; } @Override public String toString() { - return "DocumentUpdateRequest [seq=" + seq + ", partition=" + partition + ", fields=" + fields + "]"; + return "DocumentUpdateRequest [fromSeq=" + fromSeq + "toSeq=" + toSeq + ", partition=" + partition + ", fields=" + + fields + "]"; } } diff --git a/nouveau/src/main/java/org/apache/couchdb/nouveau/api/IndexInfoRequest.java b/nouveau/src/main/java/org/apache/couchdb/nouveau/api/IndexInfoRequest.java index 67e806018..55bc051da 100644 --- a/nouveau/src/main/java/org/apache/couchdb/nouveau/api/IndexInfoRequest.java +++ b/nouveau/src/main/java/org/apache/couchdb/nouveau/api/IndexInfoRequest.java @@ -15,31 +15,49 @@ package org.apache.couchdb.nouveau.api; import com.fasterxml.jackson.annotation.JsonProperty; import jakarta.validation.constraints.Positive; +import jakarta.validation.constraints.PositiveOrZero; import java.util.OptionalLong; public final class IndexInfoRequest { - private OptionalLong updateSeq; + private OptionalLong fromUpdateSeq; - private OptionalLong purgeSeq; + private OptionalLong toUpdateSeq; + + private OptionalLong fromPurgeSeq; + + private OptionalLong toPurgeSeq; public IndexInfoRequest( - @JsonProperty("update_seq") @Positive OptionalLong updateSeq, - @JsonProperty("purge_seq") @Positive OptionalLong purgeSeq) { - this.updateSeq = updateSeq; - this.purgeSeq = purgeSeq; + @JsonProperty("from_update_seq") @PositiveOrZero OptionalLong fromUpdateSeq, + @JsonProperty("to_update_seq") @Positive OptionalLong toUpdateSeq, + @JsonProperty("from_purge_seq") @PositiveOrZero OptionalLong fromPurgeSeq, + @JsonProperty("to_purge_seq") @Positive OptionalLong toPurgeSeq) { + this.fromUpdateSeq = fromUpdateSeq; + this.toUpdateSeq = toUpdateSeq; + this.fromPurgeSeq = fromPurgeSeq; + this.toPurgeSeq = toPurgeSeq; + } + + public OptionalLong getFromUpdateSeq() { + return fromUpdateSeq; + } + + public OptionalLong getToUpdateSeq() { + return toUpdateSeq; } - public OptionalLong getUpdateSeq() { - return updateSeq; + public OptionalLong getFromPurgeSeq() { + return fromPurgeSeq; } - public OptionalLong getPurgeSeq() { - return purgeSeq; + public OptionalLong getToPurgeSeq() { + return toPurgeSeq; } @Override public String toString() { - return "IndexInfoRequest [updateSeq=" + updateSeq + ", purgeSeq=" + purgeSeq + "]"; + return "IndexInfoRequest [fromUpdateSeq=" + fromUpdateSeq + ", toUpdateSeq=" + toUpdateSeq + ", fromPurgeSeq=" + + fromPurgeSeq + ", toPurgeSeq=" + toPurgeSeq + "]"; } } diff --git a/nouveau/src/main/java/org/apache/couchdb/nouveau/core/Index.java b/nouveau/src/main/java/org/apache/couchdb/nouveau/core/Index.java index ca0cd69ad..f5ccd89b2 100644 --- a/nouveau/src/main/java/org/apache/couchdb/nouveau/core/Index.java +++ b/nouveau/src/main/java/org/apache/couchdb/nouveau/core/Index.java @@ -13,8 +13,6 @@ package org.apache.couchdb.nouveau.core; -import jakarta.ws.rs.WebApplicationException; -import jakarta.ws.rs.core.Response.Status; import java.io.Closeable; import java.io.IOException; import java.util.concurrent.Semaphore; @@ -86,22 +84,22 @@ public abstract class Index implements Closeable { protected abstract long doDiskSize() throws IOException; public final synchronized void update(final String docId, final DocumentUpdateRequest request) throws IOException { - assertUpdateSeqIsLower(request.getSeq()); + assertUpdateSeqMatches(request.getFromSeq()); doUpdate(docId, request); - incrementUpdateSeq(request.getSeq()); + incrementUpdateSeq(request.getFromSeq(), request.getToSeq()); } protected abstract void doUpdate(final String docId, final DocumentUpdateRequest request) throws IOException; public final synchronized void delete(final String docId, final DocumentDeleteRequest request) throws IOException { if (request.isPurge()) { - assertPurgeSeqIsLower(request.getSeq()); + assertPurgeSeqMatches(request.getFromSeq()); doDelete(docId, request); - incrementPurgeSeq(request.getSeq()); + incrementPurgeSeq(request.getFromSeq(), request.getToSeq()); } else { - assertUpdateSeqIsLower(request.getSeq()); + assertUpdateSeqMatches(request.getFromSeq()); doDelete(docId, request); - incrementUpdateSeq(request.getSeq()); + incrementUpdateSeq(request.getFromSeq(), request.getToSeq()); } } @@ -132,23 +130,21 @@ public abstract class Index implements Closeable { protected abstract boolean doCommit(final long updateSeq, final long purgeSeq) throws IOException; - public final synchronized void setUpdateSeq(final long updateSeq) throws IOException { - if (updateSeq < this.updateSeq) { - throw new WebApplicationException( - "update_seq must be equal or greater than current update_seq", Status.BAD_REQUEST); + public final synchronized void setUpdateSeq(final long fromSeq, final long toSeq) throws IOException { + if (toSeq < this.updateSeq) { + throw new IndexUpdateConflictException(fromSeq, toSeq); } - if (updateSeq > this.updateSeq) { - incrementUpdateSeq(updateSeq); + if (toSeq > this.updateSeq) { + incrementUpdateSeq(fromSeq, toSeq); } } - public final synchronized void setPurgeSeq(final long purgeSeq) throws IOException { - if (purgeSeq < this.purgeSeq) { - throw new WebApplicationException( - "purge_seq must be equal or greater than current purge_seq", Status.BAD_REQUEST); + public final synchronized void setPurgeSeq(final long fromSeq, final long toSeq) throws IOException { + if (toSeq < this.purgeSeq) { + throw new IndexUpdateConflictException(fromSeq, toSeq); } - if (purgeSeq > this.purgeSeq) { - incrementPurgeSeq(purgeSeq); + if (toSeq > this.purgeSeq) { + incrementPurgeSeq(fromSeq, toSeq); } } @@ -178,29 +174,29 @@ public abstract class Index implements Closeable { } } - protected final void assertUpdateSeqIsLower(final long updateSeq) throws UpdatesOutOfOrderException { + protected final void assertUpdateSeqMatches(final long updateSeq) throws IndexUpdateConflictException { assert Thread.holdsLock(this); - if (!(updateSeq > this.updateSeq)) { - throw new UpdatesOutOfOrderException(this.updateSeq, updateSeq); + if (updateSeq != this.updateSeq) { + throw new IndexUpdateConflictException(this.updateSeq, updateSeq); } } - protected final void incrementUpdateSeq(final long updateSeq) throws IOException { + protected final void incrementUpdateSeq(final long ifMatchSeq, final long updateSeq) throws IOException { assert Thread.holdsLock(this); - assertUpdateSeqIsLower(updateSeq); + assertUpdateSeqMatches(ifMatchSeq); this.updateSeq = updateSeq; } - protected final void assertPurgeSeqIsLower(final long purgeSeq) throws UpdatesOutOfOrderException { + protected final void assertPurgeSeqMatches(final long purgeSeq) throws IndexUpdateConflictException { assert Thread.holdsLock(this); - if (!(purgeSeq > this.purgeSeq)) { - throw new UpdatesOutOfOrderException(this.purgeSeq, purgeSeq); + if (purgeSeq != this.purgeSeq) { + throw new IndexUpdateConflictException(this.purgeSeq, purgeSeq); } } - protected final void incrementPurgeSeq(final long purgeSeq) throws IOException { + protected final void incrementPurgeSeq(final long ifMatchSeq, final long purgeSeq) throws IOException { assert Thread.holdsLock(this); - assertPurgeSeqIsLower(purgeSeq); + assertPurgeSeqMatches(ifMatchSeq); this.purgeSeq = purgeSeq; } diff --git a/nouveau/src/main/java/org/apache/couchdb/nouveau/core/UpdatesOutOfOrderException.java b/nouveau/src/main/java/org/apache/couchdb/nouveau/core/IndexUpdateConflictException.java similarity index 63% rename from nouveau/src/main/java/org/apache/couchdb/nouveau/core/UpdatesOutOfOrderException.java rename to nouveau/src/main/java/org/apache/couchdb/nouveau/core/IndexUpdateConflictException.java index acda8beb7..7003551a6 100644 --- a/nouveau/src/main/java/org/apache/couchdb/nouveau/core/UpdatesOutOfOrderException.java +++ b/nouveau/src/main/java/org/apache/couchdb/nouveau/core/IndexUpdateConflictException.java @@ -16,13 +16,9 @@ package org.apache.couchdb.nouveau.core; import jakarta.ws.rs.WebApplicationException; import jakarta.ws.rs.core.Response.Status; -public final class UpdatesOutOfOrderException extends WebApplicationException { +public final class IndexUpdateConflictException extends WebApplicationException { - public UpdatesOutOfOrderException(final long currentSeq, final long attemptedSeq) { - super( - String.format( - "Updates applied in the wrong order (current seq: %d, attempted seq: %d)", - currentSeq, attemptedSeq), - Status.BAD_REQUEST); + public IndexUpdateConflictException(final long fromSeq, final long toSeq) { + super(String.format("Index update conflict (from seq: %d, to seq: %d)", fromSeq, toSeq), Status.CONFLICT); } } diff --git a/nouveau/src/main/java/org/apache/couchdb/nouveau/health/IndexHealthCheck.java b/nouveau/src/main/java/org/apache/couchdb/nouveau/health/IndexHealthCheck.java index 889ab2726..b66bbcde5 100644 --- a/nouveau/src/main/java/org/apache/couchdb/nouveau/health/IndexHealthCheck.java +++ b/nouveau/src/main/java/org/apache/couchdb/nouveau/health/IndexHealthCheck.java @@ -42,7 +42,7 @@ public final class IndexHealthCheck extends HealthCheck { indexResource.createIndex(name, new IndexDefinition("standard", null)); try { final DocumentUpdateRequest documentUpdateRequest = - new DocumentUpdateRequest(1, null, Collections.emptyList()); + new DocumentUpdateRequest(0, 1, null, Collections.emptyList()); indexResource.updateDoc(name, "foo", documentUpdateRequest); final SearchRequest searchRequest = new SearchRequest(); diff --git a/nouveau/src/main/java/org/apache/couchdb/nouveau/resources/IndexResource.java b/nouveau/src/main/java/org/apache/couchdb/nouveau/resources/IndexResource.java index ce99830bb..5c6614c21 100644 --- a/nouveau/src/main/java/org/apache/couchdb/nouveau/resources/IndexResource.java +++ b/nouveau/src/main/java/org/apache/couchdb/nouveau/resources/IndexResource.java @@ -102,11 +102,16 @@ public final class IndexResource { public void setIndexInfo(@PathParam("name") String name, @NotNull @Valid IndexInfoRequest request) throws Exception { indexManager.with(name, indexLoader(), (index) -> { - if (request.getUpdateSeq().isPresent()) { - index.setUpdateSeq(request.getUpdateSeq().getAsLong()); + if (request.getFromUpdateSeq().isPresent() + && request.getToUpdateSeq().isPresent()) { + index.setUpdateSeq( + request.getFromUpdateSeq().getAsLong(), + request.getToUpdateSeq().getAsLong()); } - if (request.getPurgeSeq().isPresent()) { - index.setPurgeSeq(request.getPurgeSeq().getAsLong()); + if (request.getFromPurgeSeq().isPresent() && request.getToPurgeSeq().isPresent()) { + index.setPurgeSeq( + request.getFromPurgeSeq().getAsLong(), + request.getToPurgeSeq().getAsLong()); } return null; }); diff --git a/nouveau/src/test/java/org/apache/couchdb/nouveau/lucene9/Lucene9IndexTest.java b/nouveau/src/test/java/org/apache/couchdb/nouveau/lucene9/Lucene9IndexTest.java index 5d989804b..f174aad26 100644 --- a/nouveau/src/test/java/org/apache/couchdb/nouveau/lucene9/Lucene9IndexTest.java +++ b/nouveau/src/test/java/org/apache/couchdb/nouveau/lucene9/Lucene9IndexTest.java @@ -34,7 +34,7 @@ import org.apache.couchdb.nouveau.api.SearchResults; import org.apache.couchdb.nouveau.api.StringField; import org.apache.couchdb.nouveau.core.Index; import org.apache.couchdb.nouveau.core.IndexLoader; -import org.apache.couchdb.nouveau.core.UpdatesOutOfOrderException; +import org.apache.couchdb.nouveau.core.IndexUpdateConflictException; import org.apache.lucene.analysis.Analyzer; import org.apache.lucene.index.IndexWriter; import org.apache.lucene.index.IndexWriterConfig; @@ -72,7 +72,7 @@ public class Lucene9IndexTest { final int count = 100; for (int i = 1; i <= count; i++) { final Collection<Field> fields = List.of(new StringField("foo", "bar", false)); - final DocumentUpdateRequest request = new DocumentUpdateRequest(i, null, fields); + final DocumentUpdateRequest request = new DocumentUpdateRequest(i - 1, i, null, fields); index.update("doc" + i, request); } final SearchRequest request = new SearchRequest(); @@ -91,7 +91,7 @@ public class Lucene9IndexTest { final int count = 100; for (int i = 1; i <= count; i++) { final Collection<Field> fields = List.of(new StringField("foo", "bar", false)); - final DocumentUpdateRequest request = new DocumentUpdateRequest(i, null, fields); + final DocumentUpdateRequest request = new DocumentUpdateRequest(i - 1, i, null, fields); index.update("doc" + i, request); } final SearchRequest request = new SearchRequest(); @@ -111,7 +111,7 @@ public class Lucene9IndexTest { final int count = 100; for (int i = 1; i <= count; i++) { final Collection<Field> fields = List.of(new StringField("bar", "baz", false)); - final DocumentUpdateRequest request = new DocumentUpdateRequest(i, null, fields); + final DocumentUpdateRequest request = new DocumentUpdateRequest(i - 1, i, null, fields); index.update("doc" + i, request); } final SearchRequest request = new SearchRequest(); @@ -131,7 +131,7 @@ public class Lucene9IndexTest { final int count = 100; for (int i = 1; i <= count; i++) { final Collection<Field> fields = List.of(new DoubleField("bar", (double) i, false)); - final DocumentUpdateRequest request = new DocumentUpdateRequest(i, null, fields); + final DocumentUpdateRequest request = new DocumentUpdateRequest(i - 1, i, null, fields); index.update("doc" + i, request); } final SearchRequest request = new SearchRequest(); @@ -155,12 +155,12 @@ public class Lucene9IndexTest { final Collection<Field> fields = Collections.emptyList(); // Go to 2. - index.update("foo", new DocumentUpdateRequest(2, null, fields)); + index.update("foo", new DocumentUpdateRequest(0, 2, null, fields)); // Should be prevented from going down to 1. assertThrows( - UpdatesOutOfOrderException.class, - () -> index.update("foo", new DocumentUpdateRequest(1, null, fields))); + IndexUpdateConflictException.class, + () -> index.update("foo", new DocumentUpdateRequest(2, 1, null, fields))); } finally { cleanup(index); } @@ -176,7 +176,7 @@ public class Lucene9IndexTest { assertThat(info.getUpdateSeq()).isEqualTo(0); final Collection<Field> fields = List.of(new DoubleField("bar", 12.0, false)); - index.update("foo", new DocumentUpdateRequest(2, null, fields)); + index.update("foo", new DocumentUpdateRequest(0, 2, null, fields)); index.commit(); info = index.info(); @@ -193,13 +193,13 @@ public class Lucene9IndexTest { Index index = setup(path); try { final Collection<Field> fields = List.of(new DoubleField("bar", 12.0, false)); - index.update("foo", new DocumentUpdateRequest(2, null, fields)); + index.update("foo", new DocumentUpdateRequest(0, 2, null, fields)); index.commit(); IndexInfo info = index.info(); assertThat(info.getNumDocs()).isEqualTo(1); - index.delete("foo", new DocumentDeleteRequest(3, false)); + index.delete("foo", new DocumentDeleteRequest(2, 3, false)); index.commit(); info = index.info(); @@ -215,13 +215,13 @@ public class Lucene9IndexTest { Index index = setup(path); try { final Collection<Field> fields = List.of(new DoubleField("bar", 12.0, false)); - index.update("foo", new DocumentUpdateRequest(2, null, fields)); + index.update("foo", new DocumentUpdateRequest(0, 2, null, fields)); index.commit(); IndexInfo info = index.info(); assertThat(info.getNumDocs()).isEqualTo(1); - index.delete("foo", new DocumentDeleteRequest(3, true)); + index.delete("foo", new DocumentDeleteRequest(2, 3, true)); index.commit(); info = index.info(); diff --git a/src/nouveau/src/nouveau_api.erl b/src/nouveau/src/nouveau_api.erl index 6f98cf032..c039a44fd 100644 --- a/src/nouveau/src/nouveau_api.erl +++ b/src/nouveau/src/nouveau_api.erl @@ -23,12 +23,12 @@ create_index/2, delete_path/1, delete_path/2, - delete_doc/3, - purge_doc/3, - update_doc/5, + delete_doc/4, + purge_doc/4, + update_doc/6, search/2, - set_purge_seq/2, - set_update_seq/2 + set_purge_seq/3, + set_update_seq/3 ]). -define(JSON_CONTENT_TYPE, {"Content-Type", "application/json"}). @@ -98,51 +98,61 @@ delete_path(Path, Exclusions) when send_error(Reason) end. -delete_doc(#index{} = Index, DocId, UpdateSeq) when - is_binary(DocId), is_integer(UpdateSeq) +delete_doc(#index{} = Index, DocId, FromSeq, ToSeq) when + is_binary(DocId), is_integer(FromSeq), is_integer(ToSeq) -> - delete_doc(Index, DocId, UpdateSeq, false). + delete_doc(Index, DocId, FromSeq, ToSeq, false). -purge_doc(#index{} = Index, DocId, PurgeSeq) when - is_binary(DocId), is_integer(PurgeSeq) +purge_doc(#index{} = Index, DocId, FromSeq, ToSeq) when + is_binary(DocId), is_integer(FromSeq), is_integer(ToSeq) -> - delete_doc(Index, DocId, PurgeSeq, true). + delete_doc(Index, DocId, FromSeq, ToSeq, true). -delete_doc(#index{} = Index, DocId, Seq, IsPurge) when - is_binary(DocId), is_integer(Seq), is_boolean(IsPurge) +delete_doc(#index{} = Index, DocId, FromSeq, ToSeq, IsPurge) when + is_binary(DocId), is_integer(FromSeq), is_integer(ToSeq), is_boolean(IsPurge) -> - ReqBody = #{seq => Seq, purge => IsPurge}, + ReqBody = #{from_seq => FromSeq, to_seq => ToSeq, purge => IsPurge}, Resp = send_if_enabled( - doc_url(Index, DocId), [?JSON_CONTENT_TYPE], delete, jiffy:encode(ReqBody) + doc_url(Index, DocId), + [?JSON_CONTENT_TYPE], + delete, + jiffy:encode(ReqBody), + [ + {stream_to, self()} + ] ), case Resp of - {ok, "204", _, _} -> - ok; - {ok, StatusCode, _, RespBody} -> - {error, jaxrs_error(StatusCode, RespBody)}; + {ibrowse_req_id, ReqId} -> + {ok, ReqId}; {error, Reason} -> send_error(Reason) end. -update_doc(#index{} = Index, DocId, UpdateSeq, Partition, Fields) when +update_doc(#index{} = Index, DocId, FromSeq, ToSeq, Partition, Fields) when is_binary(DocId), - is_integer(UpdateSeq), + is_integer(FromSeq), + is_integer(ToSeq), (is_binary(Partition) orelse Partition == null), is_list(Fields) -> ReqBody = #{ - seq => UpdateSeq, + from_seq => FromSeq, + to_seq => ToSeq, partition => Partition, fields => Fields }, Resp = send_if_enabled( - doc_url(Index, DocId), [?JSON_CONTENT_TYPE], put, jiffy:encode(ReqBody) + doc_url(Index, DocId), + [?JSON_CONTENT_TYPE], + put, + jiffy:encode(ReqBody), + [ + {stream_to, self()} + ] ), case Resp of - {ok, "204", _, _} -> - ok; - {ok, StatusCode, _, RespBody} -> - {error, jaxrs_error(StatusCode, RespBody)}; + {ibrowse_req_id, ReqId} -> + {ok, ReqId}; {error, Reason} -> send_error(Reason) end. @@ -160,15 +170,20 @@ search(#index{} = Index, QueryArgs) -> send_error(Reason) end. -set_update_seq(#index{} = Index, UpdateSeq) -> - set_seq(Index, update_seq, UpdateSeq). -set_purge_seq(#index{} = Index, PurgeSeq) -> - set_seq(Index, purge_seq, PurgeSeq). - -set_seq(#index{} = Index, Key, Value) when is_atom(Key), is_integer(Value) -> +set_update_seq(#index{} = Index, FromSeq, ToSeq) -> ReqBody = #{ - Key => Value + from_update_seq => FromSeq, + to_update_seq => ToSeq }, + set_seq(Index, ReqBody). +set_purge_seq(#index{} = Index, FromSeq, ToSeq) -> + ReqBody = #{ + from_purge_seq => FromSeq, + to_purge_seq => ToSeq + }, + set_seq(Index, ReqBody). + +set_seq(#index{} = Index, ReqBody) when is_map(ReqBody) -> Resp = send_if_enabled( index_url(Index), [?JSON_CONTENT_TYPE], post, jiffy:encode(ReqBody) ), @@ -226,6 +241,8 @@ jaxrs_error("404", Body) -> {not_found, message(Body)}; jaxrs_error("405", Body) -> {method_not_allowed, message(Body)}; +jaxrs_error("409", Body) -> + {conflict, message(Body)}; jaxrs_error("417", Body) -> {expectation_failed, message(Body)}; jaxrs_error("422", Body) -> @@ -249,10 +266,13 @@ errors(Body) -> send_if_enabled(Url, Header, Method) -> send_if_enabled(Url, Header, Method, []). -send_if_enabled(Url, Header, Method, Body) -> +send_if_enabled(Url, Headers, Method, Body) -> + send_if_enabled(Url, Headers, Method, Body, []). + +send_if_enabled(Url, Headers, Method, Body, Options) -> case nouveau:enabled() of true -> - ibrowse:send_req_direct(conn_pid(), Url, Header, Method, Body); + ibrowse:send_req_direct(conn_pid(), Url, Headers, Method, Body, Options); false -> {error, nouveau_not_enabled} end. @@ -265,6 +285,11 @@ conn_pid() -> erlang:put(?NOUVEAU_CONN_PID, Pid), Pid; Pid when is_pid(Pid) -> - true = is_process_alive(Pid), - Pid + case is_process_alive(Pid) of + true -> + Pid; + false -> + erlang:erase(?NOUVEAU_CONN_PID), + conn_pid() + end end. diff --git a/src/nouveau/src/nouveau_index_updater.erl b/src/nouveau/src/nouveau_index_updater.erl index 33134bc96..a53db9d7d 100644 --- a/src/nouveau/src/nouveau_index_updater.erl +++ b/src/nouveau/src/nouveau_index_updater.erl @@ -26,6 +26,17 @@ -import(couch_query_servers, [get_os_process/1, ret_os_process/1, proc_prompt/2]). -import(nouveau_util, [index_path/1]). +-record(acc, { + db, + index, + proc, + changes_done, + total_changes, + exclude_idrevs, + req_ids, + from_seq +}). + outdated(#index{} = Index) -> case open_or_create_index(Index) of {ok, #{} = Info} -> @@ -66,9 +77,20 @@ update(#index{} = Index) -> Proc = get_os_process(Index#index.def_lang), try true = proc_prompt(Proc, [<<"add_fun">>, Index#index.def, <<"nouveau">>]), - Acc0 = {Db, Index, Proc, 0, TotalChanges, ExcludeIdRevs}, - {ok, _} = couch_db:fold_changes(Db, IndexUpdateSeq, fun load_docs/2, Acc0, []), - ok = nouveau_api:set_update_seq(Index, NewCurSeq) + Acc0 = #acc{ + db = Db, + index = Index, + proc = Proc, + changes_done = 0, + total_changes = TotalChanges, + exclude_idrevs = ExcludeIdRevs, + req_ids = [], + from_seq = IndexUpdateSeq + }, + {ok, #acc{from_seq = FromSeq, req_ids = ReqIds}} = + couch_db:fold_changes(Db, IndexUpdateSeq, fun load_docs/2, Acc0, []), + [] = flush_reqids(ReqIds), + ok = nouveau_api:set_update_seq(Index, FromSeq, NewCurSeq) after ret_os_process(Proc) end @@ -79,23 +101,70 @@ update(#index{} = Index) -> load_docs(#full_doc_info{id = <<"_design/", _/binary>>}, Acc) -> {ok, Acc}; -load_docs(FDI, {Db, Index, Proc, ChangesDone, TotalChanges, ExcludeIdRevs}) -> +load_docs( + FDI, + #acc{ + db = Db, + index = Index, + proc = Proc, + changes_done = ChangesDone, + total_changes = TotalChanges, + exclude_idrevs = ExcludeIdRevs, + req_ids = ReqIds, + from_seq = FromSeq + } = Acc +) when + length(ReqIds) < 100 +-> couch_task_status:update([ {changes_done, ChangesDone}, {progress, (ChangesDone * 100) div TotalChanges} ]), DI = couch_doc:to_doc_info(FDI), #doc_info{id = Id, revs = [#rev_info{rev = Rev} | _]} = DI, case lists:member({Id, Rev}, ExcludeIdRevs) of - true -> ok; - false -> update_or_delete_index(Db, Index, DI, Proc) - end, - {ok, {Db, Index, Proc, ChangesDone + 1, TotalChanges, ExcludeIdRevs}}. + true -> + {ok, Acc#acc{changes_done = ChangesDone + 1}}; + false -> + {ReqId, NewFromSeq} = update_or_delete_index(Db, Index, FromSeq, DI, Proc), + {ok, Acc#acc{ + changes_done = ChangesDone + 1, from_seq = NewFromSeq, req_ids = [ReqId | ReqIds] + }} + end; +load_docs(FDI, Acc) -> + ReqIds1 = flush_reqids(lists:reverse(Acc#acc.req_ids)), + load_docs( + FDI, Acc#acc{req_ids = ReqIds1} + ). -update_or_delete_index(Db, #index{} = Index, #doc_info{} = DI, Proc) -> +flush_reqids([]) -> + []; +flush_reqids(ReqIds) -> + receive + {ibrowse_async_headers, ReqId, Code, _Headers} when Code == "200"; Code == "201" -> + ok = flush_reqid(ReqId), + flush_reqids(lists:delete(ReqId, ReqIds)); + {ibrowse_async_headers, _ReqId, Code, _Headers} -> + exit({error, Code}) + end. + +flush_reqid(ReqId) -> + receive + {ibrowse_async_response, ReqId, _Body} -> + flush_reqid(ReqId); + {ibrowse_async_response_end, ReqId} -> + ok + end. + +update_or_delete_index(Db, #index{} = Index, IfMatchSeq, #doc_info{} = DI, Proc) -> #doc_info{id = Id, high_seq = Seq, revs = [#rev_info{deleted = Del} | _]} = DI, case Del of true -> - ok = nouveau_api:delete_doc(Index, Id, Seq); + case nouveau_api:delete_doc(Index, Id, IfMatchSeq, Seq) of + {ok, ReqId} -> + {ReqId, Seq}; + {error, Reason} -> + exit({error, Reason}) + end; false -> {ok, Doc} = couch_db:open_doc(Db, DI, []), Json = couch_doc:to_json_obj(Doc, []), @@ -109,11 +178,16 @@ update_or_delete_index(Db, #index{} = Index, #doc_info{} = DI, Proc) -> end, case Fields of [] -> - ok = nouveau_api:delete_doc(Index, Id, Seq); + case nouveau_api:delete_doc(Index, Id, IfMatchSeq, Seq) of + {ok, ReqId} -> + {ReqId, Seq}; + {error, Reason} -> + exit({error, Reason}) + end; _ -> - case nouveau_api:update_doc(Index, Id, Seq, Partition, Fields) of - ok -> - ok; + case nouveau_api:update_doc(Index, Id, IfMatchSeq, Seq, Partition, Fields) of + {ok, ReqId} -> + {ReqId, Seq}; {error, Reason} -> exit({error, Reason}) end @@ -177,7 +251,8 @@ purge_index(Db, Index, IndexPurgeSeq) -> true -> Acc; false -> - update_or_delete_index(Db, Index, DI, Proc), + %% TODO + update_or_delete_index(Db, Index, 0, DI, Proc), [{Id, Rev} | Acc] end end, @@ -188,7 +263,7 @@ purge_index(Db, Index, IndexPurgeSeq) -> {ok, {ExcludeList, NewPurgeSeq}} = couch_db:fold_purge_infos( Db, IndexPurgeSeq, FoldFun, {[], 0}, [] ), - nouveau_api:set_purge_seq(Index, NewPurgeSeq), + nouveau_api:set_purge_seq(Index, IndexPurgeSeq, NewPurgeSeq), update_local_doc(Db, Index, NewPurgeSeq), {ok, ExcludeList} after
