Repository: incubator-gobblin Updated Branches: refs/heads/master 1912eb790 -> ab4fc217d
[GOBBLIN-488] Make AsyncRequest aware of records Closes #2357 from zxcware/async Project: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/commit/ab4fc217 Tree: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/tree/ab4fc217 Diff: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/diff/ab4fc217 Branch: refs/heads/master Commit: ab4fc217d208d453d3db2eb6f22b4356a8ee94d9 Parents: 1912eb7 Author: zhchen <[email protected]> Authored: Mon May 7 14:34:04 2018 -0700 Committer: Abhishek Tiwari <[email protected]> Committed: Mon May 7 14:34:04 2018 -0700 ---------------------------------------------------------------------- .../org/apache/gobblin/async/AsyncRequest.java | 61 +++++++++++++++----- gobblin-docs/sinks/Http.md | 2 + .../gobblin/writer/AsyncHttpWriterTest.java | 7 +++ 3 files changed, 56 insertions(+), 14 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/ab4fc217/gobblin-core/src/main/java/org/apache/gobblin/async/AsyncRequest.java ---------------------------------------------------------------------- diff --git a/gobblin-core/src/main/java/org/apache/gobblin/async/AsyncRequest.java b/gobblin-core/src/main/java/org/apache/gobblin/async/AsyncRequest.java index c0aad92..dbdf66d 100644 --- a/gobblin-core/src/main/java/org/apache/gobblin/async/AsyncRequest.java +++ b/gobblin-core/src/main/java/org/apache/gobblin/async/AsyncRequest.java @@ -20,6 +20,8 @@ package org.apache.gobblin.async; import java.util.ArrayList; import java.util.List; +import com.google.common.collect.ImmutableList; + import lombok.Getter; import lombok.Setter; @@ -34,15 +36,34 @@ import org.apache.gobblin.net.Request; * @param <RQ> type of raw request */ public class AsyncRequest<D, RQ> implements Request<RQ> { - @Getter - private int recordCount = 0; - @Getter - protected long bytesWritten = 0; @Getter @Setter private RQ 
rawRequest; + private final List<Thunk<D>> thunks = new ArrayList<>(); + + /** + * Get the total number of records processed in the request + */ + public int getRecordCount() { + return thunks.size(); + } + + /** + * Get the total bytes processed in the request + */ + public long getBytesWritten() { + long bytesWritten = 0; + for (Thunk thunk : thunks) { + bytesWritten += thunk.sizeInBytes; + } + return bytesWritten; + } - @Getter - private final List<Thunk> thunks = new ArrayList<>(); + /** + * Get all records information in the request + */ + public List<Thunk<D>> getThunks() { + return ImmutableList.copyOf(thunks); + } /** * Mark the record associated with this request @@ -51,24 +72,36 @@ public class AsyncRequest<D, RQ> implements Request<RQ> { * @param bytesWritten bytes of the record written into the request */ public void markRecord(BufferedRecord<D> record, int bytesWritten) { - if (record.getCallback() != null) { - thunks.add(new Thunk(record.getCallback(), bytesWritten)); - } - recordCount++; - this.bytesWritten += bytesWritten; + thunks.add(new Thunk<>(record, bytesWritten)); } /** - * A helper class which wraps the callback - * It may contain more information related to each individual record + * A descriptor that represents a record in the request */ - public static final class Thunk { + public static final class Thunk<D> { + /** + * @deprecated Use {@link #record} + */ + @Deprecated public final Callback callback; + public final int sizeInBytes; + public final BufferedRecord<D> record; + /** + * @deprecated Use {@link #Thunk(BufferedRecord, int)} + */ + @Deprecated Thunk(Callback callback, int sizeInBytes) { this.callback = callback; this.sizeInBytes = sizeInBytes; + this.record = null; + } + + Thunk(BufferedRecord<D> record, int sizeInBytes) { + this.callback = record.getCallback(); + this.sizeInBytes = sizeInBytes; + this.record = record; } } } http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/ab4fc217/gobblin-docs/sinks/Http.md 
---------------------------------------------------------------------- diff --git a/gobblin-docs/sinks/Http.md b/gobblin-docs/sinks/Http.md index e5c3dbc..8253a63 100644 --- a/gobblin-docs/sinks/Http.md +++ b/gobblin-docs/sinks/Http.md @@ -72,6 +72,7 @@ Configurations for the builder are: |---|---|---| | `gobblin.writer.http.urlTemplate` | Required, the url template(schema and port included), together with `keys` and `queryParams`, to be resolved to request url | ```http://www.test.com/profiles/${memberId}``` | | `gobblin.writer.http.verb` | Required, [http verbs](http://www.restapitutorial.com/lessons/httpmethods.html) | get, update, delete, etc | +| `gobblin.writer.http.errorCodeWhitelist` | Optional, http error codes allowed to pass through | 404, 500, etc. No error code is allowed by default | | `gobblin.writer.http.maxAttempts` | Optional, max number of attempts including initial send | Default is 3 | | `gobblin.writer.http.contentType` | Optional, content type of the request body | ```"application/json"```, which is the default value | @@ -92,6 +93,7 @@ rest request. The 3 major components are: | `gobblin.writer.http.urlTemplate` | Required, the url template(schema and port included), together with `keys` and `queryParams`, to be resolved to request url. If the schema is `d2`, d2 is enabled | ```http://www.test.com/profiles/${memberId}``` | | `gobblin.writer.http.verb` | Required, [rest(rest.li) verbs](https://github.com/linkedin/rest.li/wiki/Rest.li-User-Guide#resource-methods) | get, update, put, delete, etc | | `gobblin.writer.http.maxAttempts` | Optional, max number of attempts including initial send | Default is 3 | + | `gobblin.writer.http.errorCodeWhitelist` | Optional, http error codes allowed to pass through | 404, 500, etc. 
No error code is allowed by default | | `gobblin.writer.http.d2.zkHosts`| Required for d2, the zookeeper address | | | `gobblin.writer.http.(d2.)ssl`| Optional, enable ssl | Default is false | | `gobblin.writer.http.(d2.)keyStoreFilePath`| Required for ssl | /tmp/identity.p12 | http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/ab4fc217/gobblin-modules/gobblin-http/src/test/java/org/apache/gobblin/writer/AsyncHttpWriterTest.java ---------------------------------------------------------------------- diff --git a/gobblin-modules/gobblin-http/src/test/java/org/apache/gobblin/writer/AsyncHttpWriterTest.java b/gobblin-modules/gobblin-http/src/test/java/org/apache/gobblin/writer/AsyncHttpWriterTest.java index b8e7826..011c7f7 100644 --- a/gobblin-modules/gobblin-http/src/test/java/org/apache/gobblin/writer/AsyncHttpWriterTest.java +++ b/gobblin-modules/gobblin-http/src/test/java/org/apache/gobblin/writer/AsyncHttpWriterTest.java @@ -94,6 +94,7 @@ public class AsyncHttpWriterTest { } Assert.assertTrue(client.isCloseCalled); + Assert.assertTrue(responseHandler.recordsInLastRequest.size() == 1); } @Test @@ -292,9 +293,15 @@ public class AsyncHttpWriterTest { class MockResponseHandler implements ResponseHandler<HttpUriRequest, CloseableHttpResponse> { volatile StatusType type = StatusType.OK; int attempts = 0; + List<Object> recordsInLastRequest; @Override public ResponseStatus handleResponse(Request<HttpUriRequest> request, CloseableHttpResponse response) { + if (request instanceof AsyncRequest) { + AsyncRequest asyncRequest = (AsyncRequest) request; + recordsInLastRequest = new ArrayList<>(); + asyncRequest.getThunks().forEach( thunk -> recordsInLastRequest.add(thunk)); + } attempts++; switch (type) { case OK:
