Repository: beam
Updated Branches:
  refs/heads/release-2.0.0 d6bc8b834 -> 235e1cede


[BEAM-2249] Correctly handle partial reads in AvroSource


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/f58ea41d
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/f58ea41d
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/f58ea41d

Branch: refs/heads/release-2.0.0
Commit: f58ea41d728bc989e38b4bb039db32342f759c3a
Parents: d6bc8b8
Author: Dan Halperin <[email protected]>
Authored: Wed May 10 10:11:53 2017 -0700
Committer: Dan Halperin <[email protected]>
Committed: Wed May 10 13:00:19 2017 -0700

----------------------------------------------------------------------
 .../java/org/apache/beam/sdk/io/AvroSource.java | 21 ++++++++++++--------
 1 file changed, 13 insertions(+), 8 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/f58ea41d/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroSource.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroSource.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroSource.java
index 61bc4a4..37bbe46 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroSource.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroSource.java
@@ -59,6 +59,7 @@ import org.apache.commons.compress.compressors.bzip2.BZip2CompressorInputStream;
 import org.apache.commons.compress.compressors.snappy.SnappyCompressorInputStream;
 import org.apache.commons.compress.compressors.xz.XZCompressorInputStream;
 import org.apache.commons.compress.utils.CountingInputStream;
+import org.apache.commons.compress.utils.IOUtils;
 
 // CHECKSTYLE.OFF: JavadocStyle
 /**
@@ -124,7 +125,7 @@ public class AvroSource<T> extends BlockBasedSource<T> {
   // Default minimum bundle size (chosen as two default-size Avro blocks to attempt to
   // ensure that every source has at least one block of records).
   // The default sync interval is 64k.
-  static final long DEFAULT_MIN_BUNDLE_SIZE = 2 * DataFileConstants.DEFAULT_SYNC_INTERVAL;
+  private static final long DEFAULT_MIN_BUNDLE_SIZE = 2 * DataFileConstants.DEFAULT_SYNC_INTERVAL;
 
   // The JSON schema used to encode records.
   private final String readSchemaString;
@@ -663,23 +664,27 @@ public class AvroSource<T> extends BlockBasedSource<T> {
       long headerSize = countStream.getBytesRead() - preHeaderCount;
 
       // Create the current block by reading blockSize bytes. Block sizes permitted by the Avro
-      // specification are [32, 2^30], so this narrowing is ok.
+      // specification are [32, 2^30], so the cast is safe.
       byte[] data = new byte[(int) blockSize];
-      int read = stream.read(data);
-      checkState(blockSize == read, "Only %s/%s bytes in the block were read", read, blockSize);
+      int bytesRead = IOUtils.readFully(stream, data);
+      checkState(
+          blockSize == bytesRead,
+          "Only able to read %s/%s bytes in the block before EOF reached.",
+          bytesRead,
+          blockSize);
       currentBlock = new AvroBlock<>(data, numRecords, getCurrentSource());
 
       // Read the end of this block, which MUST be a sync marker for correctness.
       byte[] syncMarker = getCurrentSource().getSyncMarker();
       byte[] readSyncMarker = new byte[syncMarker.length];
       long syncMarkerOffset = startOfNextBlock + headerSize + blockSize;
-      long bytesRead = stream.read(readSyncMarker);
+      bytesRead = IOUtils.readFully(stream, readSyncMarker);
       checkState(
           bytesRead == syncMarker.length,
-          "When trying to read a sync marker at position %s, only able to read %s/%s bytes",
-          syncMarkerOffset,
+          "Only able to read %s/%s bytes of Avro sync marker at position %s before EOF reached.",
           bytesRead,
-          syncMarker.length);
+          syncMarker.length,
+          syncMarkerOffset);
       if (!Arrays.equals(syncMarker, readSyncMarker)) {
         throw new IllegalStateException(
             String.format(

Reply via email to