Repository: beam
Updated Branches:
  refs/heads/release-2.0.0 d6bc8b834 -> 235e1cede
[BEAM-2249] Correctly handle partial reads in AvroSource

Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/f58ea41d
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/f58ea41d
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/f58ea41d

Branch: refs/heads/release-2.0.0
Commit: f58ea41d728bc989e38b4bb039db32342f759c3a
Parents: d6bc8b8
Author: Dan Halperin <[email protected]>
Authored: Wed May 10 10:11:53 2017 -0700
Committer: Dan Halperin <[email protected]>
Committed: Wed May 10 13:00:19 2017 -0700

----------------------------------------------------------------------
 .../java/org/apache/beam/sdk/io/AvroSource.java | 21 ++++++++++++--------
 1 file changed, 13 insertions(+), 8 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/beam/blob/f58ea41d/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroSource.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroSource.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroSource.java
index 61bc4a4..37bbe46 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroSource.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroSource.java
@@ -59,6 +59,7 @@ import org.apache.commons.compress.compressors.bzip2.BZip2CompressorInputStream;
 import org.apache.commons.compress.compressors.snappy.SnappyCompressorInputStream;
 import org.apache.commons.compress.compressors.xz.XZCompressorInputStream;
 import org.apache.commons.compress.utils.CountingInputStream;
+import org.apache.commons.compress.utils.IOUtils;
 
 // CHECKSTYLE.OFF: JavadocStyle
 /**
@@ -124,7 +125,7 @@ public class AvroSource<T> extends BlockBasedSource<T> {
   // Default minimum bundle size (chosen as two default-size Avro blocks to attempt to
   // ensure that every source has at least one block of records).
   // The default sync interval is 64k.
-  static final long DEFAULT_MIN_BUNDLE_SIZE = 2 * DataFileConstants.DEFAULT_SYNC_INTERVAL;
+  private static final long DEFAULT_MIN_BUNDLE_SIZE = 2 * DataFileConstants.DEFAULT_SYNC_INTERVAL;
 
   // The JSON schema used to encode records.
   private final String readSchemaString;
@@ -663,23 +664,27 @@ public class AvroSource<T> extends BlockBasedSource<T> {
       long headerSize = countStream.getBytesRead() - preHeaderCount;
 
       // Create the current block by reading blockSize bytes. Block sizes permitted by the Avro
-      // specification are [32, 2^30], so this narrowing is ok.
+      // specification are [32, 2^30], so the cast is safe.
       byte[] data = new byte[(int) blockSize];
-      int read = stream.read(data);
-      checkState(blockSize == read, "Only %s/%s bytes in the block were read", read, blockSize);
+      int bytesRead = IOUtils.readFully(stream, data);
+      checkState(
+          blockSize == bytesRead,
+          "Only able to read %s/%s bytes in the block before EOF reached.",
+          bytesRead,
+          blockSize);
       currentBlock = new AvroBlock<>(data, numRecords, getCurrentSource());
 
       // Read the end of this block, which MUST be a sync marker for correctness.
       byte[] syncMarker = getCurrentSource().getSyncMarker();
       byte[] readSyncMarker = new byte[syncMarker.length];
       long syncMarkerOffset = startOfNextBlock + headerSize + blockSize;
-      long bytesRead = stream.read(readSyncMarker);
+      bytesRead = IOUtils.readFully(stream, readSyncMarker);
       checkState(
           bytesRead == syncMarker.length,
-          "When trying to read a sync marker at position %s, only able to read %s/%s bytes",
-          syncMarkerOffset,
+          "Only able to read %s/%s bytes of Avro sync marker at position %s before EOF reached.",
           bytesRead,
-          syncMarker.length);
+          syncMarker.length,
+          syncMarkerOffset);
       if (!Arrays.equals(syncMarker, readSyncMarker)) {
         throw new IllegalStateException(
             String.format(
