Repository: incubator-gobblin
Updated Branches:
  refs/heads/master e9ad289c4 -> f3472d30c

[GOBBLIN-314] Validate filesize when copying in writer

Closes #2168 from jack-moseley/filesize_check

Project: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/commit/f3472d30
Tree: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/tree/f3472d30
Diff: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/diff/f3472d30

Branch: refs/heads/master
Commit: f3472d30c716613bbe5e41df508e82b58add6e12
Parents: e9ad289
Author: Jack Moseley <[email protected]>
Authored: Wed Nov 15 09:55:56 2017 -0800
Committer: Issac Buenrostro <[email protected]>
Committed: Wed Nov 15 09:55:56 2017 -0800

----------------------------------------------------------------------
 .../copy/writer/FileAwareInputStreamDataWriter.java | 13 ++++++++++++-
 1 file changed, 12 insertions(+), 1 deletion(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/f3472d30/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/writer/FileAwareInputStreamDataWriter.java
----------------------------------------------------------------------
diff --git a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/writer/FileAwareInputStreamDataWriter.java b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/writer/FileAwareInputStreamDataWriter.java
index 03fc2b6..cda144e 100644
--- a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/writer/FileAwareInputStreamDataWriter.java
+++ b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/writer/FileAwareInputStreamDataWriter.java
@@ -84,6 +84,8 @@ import org.apache.gobblin.writer.DataWriter;
 public class FileAwareInputStreamDataWriter extends InstrumentedDataWriter<FileAwareInputStream> implements FinalState, SpeculativeAttemptAwareConstruct {
 
   public static final String GOBBLIN_COPY_BYTES_COPIED_METER = "gobblin.copy.bytesCopiedMeter";
+  public static final String GOBBLIN_COPY_CHECK_FILESIZE = "gobblin.copy.checkFileSize";
+  public static final boolean DEFAULT_GOBBLIN_COPY_CHECK_FILESIZE = false;
 
   protected final AtomicLong bytesWritten = new AtomicLong();
   protected final AtomicLong filesWritten = new AtomicLong();
@@ -96,6 +98,7 @@ public class FileAwareInputStreamDataWriter extends InstrumentedDataWriter<FileA
   protected final RecoveryHelper recoveryHelper;
   protected final SharedResourcesBroker<GobblinScopeTypes> taskBroker;
   protected final int bufferSize;
+  private final boolean checkFileSize;
 
   protected final Meter copySpeedMeter;
 
@@ -143,6 +146,8 @@ public class FileAwareInputStreamDataWriter extends InstrumentedDataWriter<FileA
     this.bufferSize = state.getPropAsInt(CopyConfiguration.BUFFER_SIZE, StreamCopier.DEFAULT_BUFFER_SIZE);
     this.encryptionConfig = EncryptionConfigParser
         .getConfigForBranch(EncryptionConfigParser.EntityType.WRITER, this.state, numBranches, branchId);
+
+    this.checkFileSize = state.getPropAsBoolean(GOBBLIN_COPY_CHECK_FILESIZE, DEFAULT_GOBBLIN_COPY_CHECK_FILESIZE);
   }
 
   public FileAwareInputStreamDataWriter(State state, int numBranches, int branchId)
@@ -231,7 +236,13 @@ public class FileAwareInputStreamDataWriter extends InstrumentedDataWriter<FileA
         if (isInstrumentationEnabled()) {
           copier.withCopySpeedMeter(this.copySpeedMeter);
         }
-        this.bytesWritten.addAndGet(copier.copy());
+        long numBytes = copier.copy();
+        long fileSize = copyableFile.getFileStatus().getLen();
+        if (this.checkFileSize && numBytes != fileSize) {
+          throw new IOException(String.format("Number of bytes copied doesn't match filesize for file %s.",
+              copyableFile.getOrigin().getPath()));
+        }
+        this.bytesWritten.addAndGet(numBytes);
         if (isInstrumentationEnabled()) {
           log.info("File {}: copied {} bytes, average rate: {} B/s", copyableFile.getOrigin().getPath(),
               this.copySpeedMeter.getCount(), this.copySpeedMeter.getMeanRate());
