Repository: incubator-gobblin
Updated Branches:
  refs/heads/master e9ad289c4 -> f3472d30c


[GOBBLIN-314] Validate filesize when copying in writer

Closes #2168 from jack-moseley/filesize_check


Project: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/commit/f3472d30
Tree: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/tree/f3472d30
Diff: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/diff/f3472d30

Branch: refs/heads/master
Commit: f3472d30c716613bbe5e41df508e82b58add6e12
Parents: e9ad289
Author: Jack Moseley <[email protected]>
Authored: Wed Nov 15 09:55:56 2017 -0800
Committer: Issac Buenrostro <[email protected]>
Committed: Wed Nov 15 09:55:56 2017 -0800

----------------------------------------------------------------------
 .../copy/writer/FileAwareInputStreamDataWriter.java    | 13 ++++++++++++-
 1 file changed, 12 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/f3472d30/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/writer/FileAwareInputStreamDataWriter.java
----------------------------------------------------------------------
diff --git a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/writer/FileAwareInputStreamDataWriter.java b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/writer/FileAwareInputStreamDataWriter.java
index 03fc2b6..cda144e 100644
--- a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/writer/FileAwareInputStreamDataWriter.java
+++ b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/writer/FileAwareInputStreamDataWriter.java
@@ -84,6 +84,8 @@ import org.apache.gobblin.writer.DataWriter;
 public class FileAwareInputStreamDataWriter extends 
InstrumentedDataWriter<FileAwareInputStream> implements FinalState, 
SpeculativeAttemptAwareConstruct {
 
   public static final String GOBBLIN_COPY_BYTES_COPIED_METER = 
"gobblin.copy.bytesCopiedMeter";
+  public static final String GOBBLIN_COPY_CHECK_FILESIZE = 
"gobblin.copy.checkFileSize";
+  public static final boolean DEFAULT_GOBBLIN_COPY_CHECK_FILESIZE = false;
 
   protected final AtomicLong bytesWritten = new AtomicLong();
   protected final AtomicLong filesWritten = new AtomicLong();
@@ -96,6 +98,7 @@ public class FileAwareInputStreamDataWriter extends 
InstrumentedDataWriter<FileA
   protected final RecoveryHelper recoveryHelper;
   protected final SharedResourcesBroker<GobblinScopeTypes> taskBroker;
   protected final int bufferSize;
+  private final boolean checkFileSize;
 
   protected final Meter copySpeedMeter;
 
@@ -143,6 +146,8 @@ public class FileAwareInputStreamDataWriter extends 
InstrumentedDataWriter<FileA
     this.bufferSize = state.getPropAsInt(CopyConfiguration.BUFFER_SIZE, 
StreamCopier.DEFAULT_BUFFER_SIZE);
     this.encryptionConfig = EncryptionConfigParser
         .getConfigForBranch(EncryptionConfigParser.EntityType.WRITER, 
this.state, numBranches, branchId);
+
+    this.checkFileSize = state.getPropAsBoolean(GOBBLIN_COPY_CHECK_FILESIZE, 
DEFAULT_GOBBLIN_COPY_CHECK_FILESIZE);
   }
 
   public FileAwareInputStreamDataWriter(State state, int numBranches, int 
branchId)
@@ -231,7 +236,13 @@ public class FileAwareInputStreamDataWriter extends 
InstrumentedDataWriter<FileA
         if (isInstrumentationEnabled()) {
           copier.withCopySpeedMeter(this.copySpeedMeter);
         }
-        this.bytesWritten.addAndGet(copier.copy());
+        long numBytes = copier.copy();
+        long fileSize = copyableFile.getFileStatus().getLen();
+        if (this.checkFileSize && numBytes != fileSize) {
+          throw new IOException(String.format("Number of bytes copied doesn't 
match filesize for file %s.",
+              copyableFile.getOrigin().getPath()));
+        }
+        this.bytesWritten.addAndGet(numBytes);
         if (isInstrumentationEnabled()) {
           log.info("File {}: copied {} bytes, average rate: {} B/s", 
copyableFile.getOrigin().getPath(),
               this.copySpeedMeter.getCount(), 
this.copySpeedMeter.getMeanRate());

Reply via email to