Repository: incubator-gobblin
Updated Branches:
  refs/heads/master b7279f763 -> 807eb46e1


[GOBBLIN-547] Make calculation of bytesWritten in AsyncRequest more efficient

Make DatePartitionedNestedRetriever partially extensible by moving its
file listing into an overridable method

Closes #2408 from autumnust/fixOnRetriever


Project: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/commit/807eb46e
Tree: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/tree/807eb46e
Diff: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/diff/807eb46e

Branch: refs/heads/master
Commit: 807eb46e1c33c37159389ed23c51835e4d5cfaa6
Parents: b7279f7
Author: Lei Sun <[email protected]>
Authored: Tue Jul 31 17:23:37 2018 -0700
Committer: Abhishek Tiwari <[email protected]>
Committed: Tue Jul 31 17:23:37 2018 -0700

----------------------------------------------------------------------
 .../java/org/apache/gobblin/async/AsyncRequest.java   | 14 +++++++-------
 .../source/DatePartitionedNestedRetriever.java        | 13 +++++++++++--
 2 files changed, 18 insertions(+), 9 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/807eb46e/gobblin-core/src/main/java/org/apache/gobblin/async/AsyncRequest.java
----------------------------------------------------------------------
diff --git 
a/gobblin-core/src/main/java/org/apache/gobblin/async/AsyncRequest.java 
b/gobblin-core/src/main/java/org/apache/gobblin/async/AsyncRequest.java
index 965e791..69b218c 100644
--- a/gobblin-core/src/main/java/org/apache/gobblin/async/AsyncRequest.java
+++ b/gobblin-core/src/main/java/org/apache/gobblin/async/AsyncRequest.java
@@ -37,8 +37,9 @@ import org.apache.gobblin.net.Request;
  */
 public class AsyncRequest<D, RQ> implements Request<RQ> {
   @Getter @Setter
-  protected RQ rawRequest;
+  private RQ rawRequest;
   protected final List<Thunk<D>> thunks = new ArrayList<>();
+  private long byteSize = 0;
 
   /**
    * Get the total number of records processed in the request
@@ -51,11 +52,7 @@ public class AsyncRequest<D, RQ> implements Request<RQ> {
    * Get the total bytes processed in the request
    */
   public long getBytesWritten() {
-    long bytesWritten = 0;
-    for (Thunk thunk : thunks) {
-      bytesWritten += thunk.sizeInBytes;
-    }
-    return bytesWritten;
+    return this.byteSize;
   }
 
   /**
@@ -72,7 +69,10 @@ public class AsyncRequest<D, RQ> implements Request<RQ> {
    * @param bytesWritten bytes of the record written into the request
    */
   public void markRecord(BufferedRecord<D> record, int bytesWritten) {
-    thunks.add(new Thunk<>(record, bytesWritten));
+    synchronized (this) {
+      thunks.add(new Thunk<>(record, bytesWritten));
+      byteSize += bytesWritten;
+    }
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/807eb46e/gobblin-core/src/main/java/org/apache/gobblin/source/DatePartitionedNestedRetriever.java
----------------------------------------------------------------------
diff --git 
a/gobblin-core/src/main/java/org/apache/gobblin/source/DatePartitionedNestedRetriever.java
 
b/gobblin-core/src/main/java/org/apache/gobblin/source/DatePartitionedNestedRetriever.java
index 4c33555..0dcad65 100644
--- 
a/gobblin-core/src/main/java/org/apache/gobblin/source/DatePartitionedNestedRetriever.java
+++ 
b/gobblin-core/src/main/java/org/apache/gobblin/source/DatePartitionedNestedRetriever.java
@@ -68,13 +68,14 @@ public class DatePartitionedNestedRetriever implements 
PartitionAwareFileRetriev
   private String sourcePartitionPrefix;
   private String sourcePartitionSuffix;
   private Path sourceDir;
-  private FileSystem fs;
   private HadoopFsHelper helper;
   private final String expectedExtension;
   private Duration leadTimeDuration;
   private boolean schemaInSourceDir;
   private String schemaFile;
 
+  protected FileSystem fs;
+
   public DatePartitionedNestedRetriever(String expectedExtension) {
     this.expectedExtension = expectedExtension;
   }
@@ -120,7 +121,7 @@ public class DatePartitionedNestedRetriever implements 
PartitionAwareFileRetriev
       Path sourcePath = constructSourcePath(date);
 
       if (this.fs.exists(sourcePath)) {
-        for (FileStatus fileStatus : this.fs.listStatus(sourcePath, 
getFileFilter())) {
+        for (FileStatus fileStatus : getFilteredFileStatuses(sourcePath, 
getFileFilter())) {
           LOG.info("Will process file " + fileStatus.getPath());
           filesToProcess.add(new FileInfo(fileStatus.getPath().toString(), 
fileStatus.getLen(), date.getMillis()));
         }
@@ -130,6 +131,14 @@ public class DatePartitionedNestedRetriever implements 
PartitionAwareFileRetriev
     return filesToProcess;
   }
 
+  /**
+   * This method could be overwritten to support more complicated file-loading 
scheme,
+   * e.g. recursively browsing of the source path.
+   */
+  protected FileStatus[] getFilteredFileStatuses(Path sourcePath, PathFilter 
pathFilter) throws IOException {
+    return this.fs.listStatus(sourcePath, pathFilter);
+  }
+
   @Override
   public long getWatermarkFromString(String lowWaterMark) {
     return this.partitionPatternFormatter.parseMillis(lowWaterMark);

Reply via email to