Repository: incubator-gobblin Updated Branches: refs/heads/master b7279f763 -> 807eb46e1
[GOBBLIN-547] Make calculation of bytesWritten in AsyncRequest more efficient. Make DatePartitionedNestedRetriever partially extensible by exposing an overridable file-listing method. Closes #2408 from autumnust/fixOnRetriever Project: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/commit/807eb46e Tree: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/tree/807eb46e Diff: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/diff/807eb46e Branch: refs/heads/master Commit: 807eb46e1c33c37159389ed23c51835e4d5cfaa6 Parents: b7279f7 Author: Lei Sun <[email protected]> Authored: Tue Jul 31 17:23:37 2018 -0700 Committer: Abhishek Tiwari <[email protected]> Committed: Tue Jul 31 17:23:37 2018 -0700 ---------------------------------------------------------------------- .../java/org/apache/gobblin/async/AsyncRequest.java | 14 +++++++------- .../source/DatePartitionedNestedRetriever.java | 13 +++++++++++-- 2 files changed, 18 insertions(+), 9 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/807eb46e/gobblin-core/src/main/java/org/apache/gobblin/async/AsyncRequest.java ---------------------------------------------------------------------- diff --git a/gobblin-core/src/main/java/org/apache/gobblin/async/AsyncRequest.java b/gobblin-core/src/main/java/org/apache/gobblin/async/AsyncRequest.java index 965e791..69b218c 100644 --- a/gobblin-core/src/main/java/org/apache/gobblin/async/AsyncRequest.java +++ b/gobblin-core/src/main/java/org/apache/gobblin/async/AsyncRequest.java @@ -37,8 +37,9 @@ import org.apache.gobblin.net.Request; */ public class AsyncRequest<D, RQ> implements Request<RQ> { @Getter @Setter - protected RQ rawRequest; + private RQ rawRequest; protected final List<Thunk<D>> thunks = new ArrayList<>(); + private long byteSize = 0; /** * Get the total number of records processed in the request @@ -51,11 +52,7 @@ public class 
AsyncRequest<D, RQ> implements Request<RQ> { * Get the total bytes processed in the request */ public long getBytesWritten() { - long bytesWritten = 0; - for (Thunk thunk : thunks) { - bytesWritten += thunk.sizeInBytes; - } - return bytesWritten; + return this.byteSize; } /** @@ -72,7 +69,10 @@ public class AsyncRequest<D, RQ> implements Request<RQ> { * @param bytesWritten bytes of the record written into the request */ public void markRecord(BufferedRecord<D> record, int bytesWritten) { - thunks.add(new Thunk<>(record, bytesWritten)); + synchronized (this) { + thunks.add(new Thunk<>(record, bytesWritten)); + byteSize += bytesWritten; + } } /** http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/807eb46e/gobblin-core/src/main/java/org/apache/gobblin/source/DatePartitionedNestedRetriever.java ---------------------------------------------------------------------- diff --git a/gobblin-core/src/main/java/org/apache/gobblin/source/DatePartitionedNestedRetriever.java b/gobblin-core/src/main/java/org/apache/gobblin/source/DatePartitionedNestedRetriever.java index 4c33555..0dcad65 100644 --- a/gobblin-core/src/main/java/org/apache/gobblin/source/DatePartitionedNestedRetriever.java +++ b/gobblin-core/src/main/java/org/apache/gobblin/source/DatePartitionedNestedRetriever.java @@ -68,13 +68,14 @@ public class DatePartitionedNestedRetriever implements PartitionAwareFileRetriev private String sourcePartitionPrefix; private String sourcePartitionSuffix; private Path sourceDir; - private FileSystem fs; private HadoopFsHelper helper; private final String expectedExtension; private Duration leadTimeDuration; private boolean schemaInSourceDir; private String schemaFile; + protected FileSystem fs; + public DatePartitionedNestedRetriever(String expectedExtension) { this.expectedExtension = expectedExtension; } @@ -120,7 +121,7 @@ public class DatePartitionedNestedRetriever implements PartitionAwareFileRetriev Path sourcePath = constructSourcePath(date); if 
(this.fs.exists(sourcePath)) { - for (FileStatus fileStatus : this.fs.listStatus(sourcePath, getFileFilter())) { + for (FileStatus fileStatus : getFilteredFileStatuses(sourcePath, getFileFilter())) { LOG.info("Will process file " + fileStatus.getPath()); filesToProcess.add(new FileInfo(fileStatus.getPath().toString(), fileStatus.getLen(), date.getMillis())); } @@ -130,6 +131,14 @@ public class DatePartitionedNestedRetriever implements PartitionAwareFileRetriev return filesToProcess; } + /** + * This method could be overwritten to support more complicated file-loading scheme, + * e.g. recursively browsing of the source path. + */ + protected FileStatus[] getFilteredFileStatuses(Path sourcePath, PathFilter pathFilter) throws IOException { + return this.fs.listStatus(sourcePath, pathFilter); + } + @Override public long getWatermarkFromString(String lowWaterMark) { return this.partitionPatternFormatter.parseMillis(lowWaterMark);
