Repository: incubator-gobblin
Updated Branches:
  refs/heads/master 1912eb790 -> ab4fc217d


[GOBBLIN-488] Make aware of records

Closes #2357 from zxcware/async


Project: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/commit/ab4fc217
Tree: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/tree/ab4fc217
Diff: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/diff/ab4fc217

Branch: refs/heads/master
Commit: ab4fc217d208d453d3db2eb6f22b4356a8ee94d9
Parents: 1912eb7
Author: zhchen <[email protected]>
Authored: Mon May 7 14:34:04 2018 -0700
Committer: Abhishek Tiwari <[email protected]>
Committed: Mon May 7 14:34:04 2018 -0700

----------------------------------------------------------------------
 .../org/apache/gobblin/async/AsyncRequest.java  | 61 +++++++++++++++-----
 gobblin-docs/sinks/Http.md                      |  2 +
 .../gobblin/writer/AsyncHttpWriterTest.java     |  7 +++
 3 files changed, 56 insertions(+), 14 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/ab4fc217/gobblin-core/src/main/java/org/apache/gobblin/async/AsyncRequest.java
----------------------------------------------------------------------
diff --git 
a/gobblin-core/src/main/java/org/apache/gobblin/async/AsyncRequest.java 
b/gobblin-core/src/main/java/org/apache/gobblin/async/AsyncRequest.java
index c0aad92..dbdf66d 100644
--- a/gobblin-core/src/main/java/org/apache/gobblin/async/AsyncRequest.java
+++ b/gobblin-core/src/main/java/org/apache/gobblin/async/AsyncRequest.java
@@ -20,6 +20,8 @@ package org.apache.gobblin.async;
 import java.util.ArrayList;
 import java.util.List;
 
+import com.google.common.collect.ImmutableList;
+
 import lombok.Getter;
 import lombok.Setter;
 
@@ -34,15 +36,34 @@ import org.apache.gobblin.net.Request;
  * @param <RQ> type of raw request
  */
 public class AsyncRequest<D, RQ> implements Request<RQ> {
-  @Getter
-  private int recordCount = 0;
-  @Getter
-  protected long bytesWritten = 0;
   @Getter @Setter
   private RQ rawRequest;
+  private final List<Thunk<D>> thunks = new ArrayList<>();
+
+  /**
+   * Get the total number of records processed in the request
+   */
+  public int getRecordCount() {
+    return thunks.size();
+  }
+
+  /**
+   * Get the total bytes processed in the request
+   */
+  public long getBytesWritten() {
+    long bytesWritten = 0;
+    for (Thunk thunk : thunks) {
+      bytesWritten += thunk.sizeInBytes;
+    }
+    return bytesWritten;
+  }
 
-  @Getter
-  private final List<Thunk> thunks = new ArrayList<>();
+  /**
+   * Get all records information in the request
+   */
+  public List<Thunk<D>> getThunks() {
+    return ImmutableList.copyOf(thunks);
+  }
 
   /**
    * Mark the record associated with this request
@@ -51,24 +72,36 @@ public class AsyncRequest<D, RQ> implements Request<RQ> {
    * @param bytesWritten bytes of the record written into the request
    */
   public void markRecord(BufferedRecord<D> record, int bytesWritten) {
-    if (record.getCallback() != null) {
-      thunks.add(new Thunk(record.getCallback(), bytesWritten));
-    }
-    recordCount++;
-    this.bytesWritten += bytesWritten;
+    thunks.add(new Thunk<>(record, bytesWritten));
   }
 
   /**
-   * A helper class which wraps the callback
-   * It may contain more information related to each individual record
+   * A descriptor that represents a record in the request
    */
-  public static final class Thunk {
+  public static final class Thunk<D> {
+    /**
+     * @deprecated Use {@link #record}
+     */
+    @Deprecated
     public final Callback callback;
+
     public final int sizeInBytes;
+    public final BufferedRecord<D> record;
 
+    /**
+     * @deprecated Use {@link #Thunk(BufferedRecord, int)}
+     */
+    @Deprecated
     Thunk(Callback callback, int sizeInBytes) {
       this.callback = callback;
       this.sizeInBytes = sizeInBytes;
+      this.record = null;
+    }
+
+    Thunk(BufferedRecord<D> record, int sizeInBytes) {
+      this.callback = record.getCallback();
+      this.sizeInBytes = sizeInBytes;
+      this.record = record;
     }
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/ab4fc217/gobblin-docs/sinks/Http.md
----------------------------------------------------------------------
diff --git a/gobblin-docs/sinks/Http.md b/gobblin-docs/sinks/Http.md
index e5c3dbc..8253a63 100644
--- a/gobblin-docs/sinks/Http.md
+++ b/gobblin-docs/sinks/Http.md
@@ -72,6 +72,7 @@ Configurations for the builder are:
 |---|---|---|
 | `gobblin.writer.http.urlTemplate` | Required, the url template(schema and 
port included), together with `keys` and `queryParams`, to be resolved to 
request url | ```http://www.test.com/profiles/${memberId}``` |
 | `gobblin.writer.http.verb` | Required, [http 
verbs](http://www.restapitutorial.com/lessons/httpmethods.html) | get, update, 
delete, etc |
+| `gobblin.writer.http.errorCodeWhitelist` | Optional, http error codes 
allowed to pass through | 404, 500, etc. No error code is allowed by default |
 | `gobblin.writer.http.maxAttempts` | Optional, max number of attempts 
including initial send | Default is 3 |
 | `gobblin.writer.http.contentType` | Optional, content type of the request 
body | ```"application/json"```, which is the default value |
 
@@ -92,6 +93,7 @@ rest request. The 3 major components are:
  | `gobblin.writer.http.urlTemplate` | Required, the url template(schema and 
port included), together with `keys` and `queryParams`, to be resolved to 
request url. If the schema is `d2`, d2 is enabled | 
```http://www.test.com/profiles/${memberId}``` |
  | `gobblin.writer.http.verb` | Required, [rest(rest.li) 
verbs](https://github.com/linkedin/rest.li/wiki/Rest.li-User-Guide#resource-methods)
 | get, update, put, delete, etc |
  | `gobblin.writer.http.maxAttempts` | Optional, max number of attempts 
including initial send | Default is 3 |
+ | `gobblin.writer.http.errorCodeWhitelist` | Optional, http error codes 
allowed to pass through | 404, 500, etc. No error code is allowed by default |
  | `gobblin.writer.http.d2.zkHosts`| Required for d2, the zookeeper address | |
  | `gobblin.writer.http.(d2.)ssl`| Optional, enable ssl | Default is false |
  | `gobblin.writer.http.(d2.)keyStoreFilePath`| Required for ssl | 
/tmp/identity.p12 |

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/ab4fc217/gobblin-modules/gobblin-http/src/test/java/org/apache/gobblin/writer/AsyncHttpWriterTest.java
----------------------------------------------------------------------
diff --git 
a/gobblin-modules/gobblin-http/src/test/java/org/apache/gobblin/writer/AsyncHttpWriterTest.java
 
b/gobblin-modules/gobblin-http/src/test/java/org/apache/gobblin/writer/AsyncHttpWriterTest.java
index b8e7826..011c7f7 100644
--- 
a/gobblin-modules/gobblin-http/src/test/java/org/apache/gobblin/writer/AsyncHttpWriterTest.java
+++ 
b/gobblin-modules/gobblin-http/src/test/java/org/apache/gobblin/writer/AsyncHttpWriterTest.java
@@ -94,6 +94,7 @@ public class AsyncHttpWriterTest {
     }
 
     Assert.assertTrue(client.isCloseCalled);
+    Assert.assertTrue(responseHandler.recordsInLastRequest.size() == 1);
   }
 
   @Test
@@ -292,9 +293,15 @@ public class AsyncHttpWriterTest {
   class MockResponseHandler implements ResponseHandler<HttpUriRequest, 
CloseableHttpResponse> {
     volatile StatusType type = StatusType.OK;
     int attempts = 0;
+    List<Object> recordsInLastRequest;
 
     @Override
     public ResponseStatus handleResponse(Request<HttpUriRequest> request, 
CloseableHttpResponse response) {
+      if (request instanceof AsyncRequest) {
+        AsyncRequest asyncRequest = (AsyncRequest) request;
+        recordsInLastRequest = new ArrayList<>();
+        asyncRequest.getThunks().forEach( thunk -> 
recordsInLastRequest.add(thunk));
+      }
       attempts++;
       switch (type) {
         case OK:

Reply via email to