CompressedSource: CompressedReader is never splittable. The only way it is safe to split a "compressed" file is if the file is not actually compressed. This can only happen when the source itself is splittable, and that in turn will result in the inner source's reader being returned. A CompressedReader will therefore only be created in the event that the file is NOT splittable. So remove all the logic handling splittable compressed readers, and keep only the logic that applies when we know/assume the file is compressed.
* TextIO: test compression with larger files It is important for correctness that we test with large files because otherwise the compressed file may be larger than the uncompressed file, which could mask bugs * TextIOTest: flesh out more * TextIOTest: add large uncompressed file Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/2ae1a747 Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/2ae1a747 Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/2ae1a747 Branch: refs/heads/master Commit: 2ae1a7478df037cf558a808816216e7002b33b47 Parents: 869ba7d Author: Dan Halperin <[email protected]> Authored: Wed Aug 10 17:58:09 2016 -0700 Committer: Dan Halperin <[email protected]> Committed: Wed Aug 10 22:59:51 2016 -0700 ---------------------------------------------------------------------- .../apache/beam/sdk/io/CompressedSource.java | 80 ++---- .../beam/sdk/io/CompressedSourceTest.java | 2 +- .../java/org/apache/beam/sdk/io/TextIOTest.java | 251 +++++++++++++++---- 3 files changed, 227 insertions(+), 106 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/2ae1a747/sdks/java/core/src/main/java/org/apache/beam/sdk/io/CompressedSource.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/CompressedSource.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/CompressedSource.java index ee4b84b..11ff90f 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/CompressedSource.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/CompressedSource.java @@ -430,7 +430,6 @@ public class CompressedSource<T> extends FileBasedSource<T> { private final FileBasedReader<T> readerDelegate; private final CompressedSource<T> source; - private final boolean splittable; private final 
Object progressLock = new Object(); @GuardedBy("progressLock") private int numRecordsRead; @@ -443,13 +442,6 @@ public class CompressedSource<T> extends FileBasedSource<T> { public CompressedReader(CompressedSource<T> source, FileBasedReader<T> readerDelegate) { super(source); this.source = source; - boolean splittable; - try { - splittable = source.isSplittable(); - } catch (Exception e) { - throw new RuntimeException("Unable to tell whether source " + source + " is splittable", e); - } - this.splittable = splittable; this.readerDelegate = readerDelegate; } @@ -463,27 +455,19 @@ public class CompressedSource<T> extends FileBasedSource<T> { @Override public boolean allowsDynamicSplitting() { - return splittable; + return false; } @Override public final long getSplitPointsConsumed() { - if (splittable) { - return readerDelegate.getSplitPointsConsumed(); - } else { - synchronized (progressLock) { - return (isDone() && numRecordsRead > 0) ? 1 : 0; - } + synchronized (progressLock) { + return (isDone() && numRecordsRead > 0) ? 1 : 0; } } @Override public final long getSplitPointsRemaining() { - if (splittable) { - return readerDelegate.getSplitPointsRemaining(); - } else { - return isDone() ? 0 : 1; - } + return isDone() ? 0 : 1; } /** @@ -491,18 +475,14 @@ public class CompressedSource<T> extends FileBasedSource<T> { */ @Override protected final boolean isAtSplitPoint() { - if (splittable) { - return readerDelegate.isAtSplitPoint(); - } else { - // We have to return true for the first record, but not for the state before reading it, - // and not for the state after reading any other record. Hence == rather than >= or <=. - // This is required because FileBasedReader is intended for readers that can read a range - // of offsets in a file and where the range can be split in parts. CompressedReader, - // however, is a degenerate case because it cannot be split, but it has to satisfy the - // semantics of offsets and split points anyway. 
- synchronized (progressLock) { - return numRecordsRead == 1; - } + // We have to return true for the first record, but not for the state before reading it, + // and not for the state after reading any other record. Hence == rather than >= or <=. + // This is required because FileBasedReader is intended for readers that can read a range + // of offsets in a file and where the range can be split in parts. CompressedReader, + // however, is a degenerate case because it cannot be split, but it has to satisfy the + // semantics of offsets and split points anyway. + synchronized (progressLock) { + return numRecordsRead == 1; } } @@ -546,14 +526,9 @@ public class CompressedSource<T> extends FileBasedSource<T> { */ @Override protected final void startReading(ReadableByteChannel channel) throws IOException { - if (splittable) { - // No-op. We will always delegate to the inner reader, so this.channel and this.progressLock - // will never be used. - } else { - synchronized (progressLock) { - this.channel = new CountingChannel(channel, getCurrentSource().getStartOffset()); - channel = this.channel; - } + synchronized (progressLock) { + this.channel = new CountingChannel(channel, getCurrentSource().getStartOffset()); + channel = this.channel; } if (source.getChannelFactory() instanceof FileNameBasedDecompressingChannelFactory) { @@ -582,30 +557,21 @@ public class CompressedSource<T> extends FileBasedSource<T> { return true; } - // Splittable: simply delegates to the inner reader. - // // Unsplittable: returns the offset in the input stream that has been read by the input. // these positions are likely to be coarse-grained (in the event of buffering) and // over-estimates (because they reflect the number of bytes read to produce an element, not its // start) but both of these provide better data than e.g., reporting the start of the file. 
@Override protected final long getCurrentOffset() throws NoSuchElementException { - if (splittable) { - return readerDelegate.getCurrentOffset(); - } else { - synchronized (progressLock) { - if (numRecordsRead <= 1) { - // Since the first record is at a split point, it should start at the beginning of the - // file. This avoids the bad case where the decompressor read the entire file, which - // would cause the file to be treated as empty when returning channel.getCount() as it - // is outside the valid range. - return 0; - } - if (channel == null) { - throw new NoSuchElementException(); - } - return channel.getCount(); + synchronized (progressLock) { + if (numRecordsRead <= 1) { + // Since the first record is at a split point, it should start at the beginning of the + // file. This avoids the bad case where the decompressor read the entire file, which + // would cause the file to be treated as empty when returning channel.getCount() as it + // is outside the valid range. + return 0; } + return channel.getCount(); } } } http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/2ae1a747/sdks/java/core/src/test/java/org/apache/beam/sdk/io/CompressedSourceTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/CompressedSourceTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/CompressedSourceTest.java index 01e5fe5..4a9f950 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/CompressedSourceTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/CompressedSourceTest.java @@ -563,7 +563,7 @@ public class CompressedSourceTest { if (channel.read(buff) != 1) { return false; } - current = new Byte(buff.get(0)); + current = buff.get(0); offset += 1; return true; } http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/2ae1a747/sdks/java/core/src/test/java/org/apache/beam/sdk/io/TextIOTest.java 
---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/TextIOTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/TextIOTest.java index 6ec3a71..6fd3093 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/TextIOTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/TextIOTest.java @@ -25,6 +25,7 @@ import static org.apache.beam.sdk.transforms.display.DisplayDataMatchers.hasDisp import static org.apache.beam.sdk.transforms.display.DisplayDataMatchers.hasValue; import static org.hamcrest.Matchers.containsInAnyOrder; +import static org.hamcrest.Matchers.greaterThan; import static org.hamcrest.Matchers.hasItem; import static org.hamcrest.Matchers.startsWith; import static org.junit.Assert.assertEquals; @@ -61,6 +62,7 @@ import org.apache.beam.sdk.values.PCollection; import com.google.common.collect.ImmutableList; +import org.apache.commons.compress.compressors.bzip2.BZip2CompressorOutputStream; import org.junit.BeforeClass; import org.junit.Ignore; import org.junit.Rule; @@ -79,6 +81,7 @@ import java.io.File; import java.io.FileOutputStream; import java.io.FileReader; import java.io.IOException; +import java.io.OutputStream; import java.io.PrintStream; import java.nio.channels.FileChannel; import java.nio.channels.SeekableByteChannel; @@ -102,8 +105,11 @@ import javax.annotation.Nullable; @RunWith(JUnit4.class) @SuppressWarnings("unchecked") public class TextIOTest { - @Rule public TemporaryFolder tmpFolder = new TemporaryFolder(); - @Rule public ExpectedException expectedException = ExpectedException.none(); + + @Rule + public TemporaryFolder tmpFolder = new TemporaryFolder(); + @Rule + public ExpectedException expectedException = ExpectedException.none(); @BeforeClass public static void setupClass() { @@ -166,7 +172,7 @@ public class TextIOTest { @Test @Category(NeedsRunner.class) public void testReadNulls() throws Exception { - runTestRead(new 
Void[]{ null, null, null }, VoidCoder.of()); + runTestRead(new Void[]{null, null, null}, VoidCoder.of()); } @Test @@ -419,26 +425,27 @@ public class TextIOTest { assertEquals(CompressionType.GZIP, read.getCompressionType()); } - @Test - @Category(NeedsRunner.class) - public void testCompressedRead() throws Exception { - String[] lines = {"Irritable eagle", "Optimistic jay", "Fanciful hawk"}; - File tmpFile = tmpFolder.newFile(); - String filename = tmpFile.getPath(); - - List<String> expected = new ArrayList<>(); - try (PrintStream writer = - new PrintStream(new GZIPOutputStream(new FileOutputStream(tmpFile)))) { + /** + * Helper that writes the given lines (adding a newline in between) to a stream, then closes the + * stream. + */ + private static void writeToStreamAndClose(String[] lines, OutputStream outputStream) { + try (PrintStream writer = new PrintStream(outputStream)) { for (String line : lines) { writer.println(line); - expected.add(line); } } + } + /** + * Helper method that runs TextIO.Read.from(filename).withCompressionType(compressionType) + * and asserts that the results match the given expected output. + */ + private static void assertReadingCompressedFileMatchesExpected( + String filename, CompressionType compressionType, String[] expected) { Pipeline p = TestPipeline.create(); - TextIO.Read.Bound<String> read = - TextIO.Read.from(filename).withCompressionType(CompressionType.GZIP); + TextIO.Read.from(filename).withCompressionType(compressionType); PCollection<String> output = p.apply(read); PAssert.that(output).containsInAnyOrder(expected); @@ -446,6 +453,178 @@ public class TextIOTest { } /** + * Helper to make an array of compressible strings. Returns ["word"i] for i in range(0,n). + */ + private static String[] makeLines(int n) { + String[] ret = new String[n]; + for (int i = 0; i < n; ++i) { + ret[i] = "word" + i; + } + return ret; + } + + /** + * Tests reading from a small, gzipped file with no .gz extension but GZIP compression set. 
+ */ + @Test + @Category(NeedsRunner.class) + public void testSmallCompressedGzipReadNoExtension() throws Exception { + String[] lines = {"Irritable eagle", "Optimistic jay", "Fanciful hawk"}; + File tmpFile = tmpFolder.newFile(); // no GZ extension + String filename = tmpFile.getPath(); + + writeToStreamAndClose(lines, new GZIPOutputStream(new FileOutputStream(tmpFile))); + assertReadingCompressedFileMatchesExpected(filename, CompressionType.GZIP, lines); + } + + /** + * Tests reading from a small, gzipped file with .gz extension and AUTO or GZIP compression set. + */ + @Test + @Category(NeedsRunner.class) + public void testSmallCompressedGzipRead() throws Exception { + String[] lines = {"Irritable eagle", "Optimistic jay", "Fanciful hawk"}; + File tmpFile = tmpFolder.newFile("small_gzip.gz"); + String filename = tmpFile.getPath(); + + writeToStreamAndClose(lines, new GZIPOutputStream(new FileOutputStream(tmpFile))); + // Should work in AUTO mode. + assertReadingCompressedFileMatchesExpected(filename, CompressionType.AUTO, lines); + // Should work in GZIP mode. + assertReadingCompressedFileMatchesExpected(filename, CompressionType.GZIP, lines); + } + + /** + * Tests reading from a small, uncompressed file with .gz extension. + * This must work in AUTO or GZIP modes. This is needed because some network file systems / HTTP + * clients will transparently decompress gzipped content. + */ + @Test + @Category(NeedsRunner.class) + public void testSmallCompressedGzipReadActuallyUncompressed() throws Exception { + String[] lines = {"Irritable eagle", "Optimistic jay", "Fanciful hawk"}; + File tmpFile = tmpFolder.newFile("not_really_gzipped.gz"); // GZ file extension lies + String filename = tmpFile.getPath(); + + writeToStreamAndClose(lines, new FileOutputStream(tmpFile)); + // Should work with GZIP compression set. + assertReadingCompressedFileMatchesExpected(filename, CompressionType.GZIP, lines); + // Should also work with AUTO mode set. 
+ assertReadingCompressedFileMatchesExpected(filename, CompressionType.AUTO, lines); + } + + /** + * Tests reading from a small, bzip2ed file with no .bz2 extension but BZIP2 compression set. + */ + @Test + @Category(NeedsRunner.class) + public void testSmallCompressedBzip2ReadNoExtension() throws Exception { + String[] lines = {"Irritable eagle", "Optimistic jay", "Fanciful hawk"}; + File tmpFile = tmpFolder.newFile(); // no BZ2 extension + String filename = tmpFile.getPath(); + + writeToStreamAndClose(lines, new BZip2CompressorOutputStream(new FileOutputStream(tmpFile))); + assertReadingCompressedFileMatchesExpected(filename, CompressionType.BZIP2, lines); + } + + /** + * Tests reading from a small, bzip2ed file with .bz2 extension and AUTO or BZIP2 compression set. + */ + @Test + @Category(NeedsRunner.class) + public void testSmallCompressedBzipRead() throws Exception { + String[] lines = {"Irritable eagle", "Optimistic jay", "Fanciful hawk"}; + File tmpFile = tmpFolder.newFile("small_bzip2.bz2"); + String filename = tmpFile.getPath(); + + writeToStreamAndClose(lines, new BZip2CompressorOutputStream(new FileOutputStream(tmpFile))); + // Should work in AUTO mode. + assertReadingCompressedFileMatchesExpected(filename, CompressionType.AUTO, lines); + // Should work in BZIP2 mode. + assertReadingCompressedFileMatchesExpected(filename, CompressionType.BZIP2, lines); + } + + /** + * Tests reading from a large, bzip2ed file with .bz2 extension and AUTO or BZIP2 compression set. + * It is important to test a large compressible file because using only small files may mask bugs + * from range tracking that can only occur if the file compression ratio is high -- small + * compressed files are usually as big as the uncompressed ones or bigger. 
+ */ + @Test + @Category(NeedsRunner.class) + public void testLargeCompressedBzipRead() throws Exception { + String[] lines = makeLines(5000); + File bz2File = tmpFolder.newFile("large_bzip2.bz2"); + String bz2Filename = bz2File.getPath(); + + writeToStreamAndClose(lines, new BZip2CompressorOutputStream(new FileOutputStream(bz2File))); + // Should work in AUTO mode. + assertReadingCompressedFileMatchesExpected(bz2Filename, CompressionType.AUTO, lines); + // Should work in BZIP2 mode. + assertReadingCompressedFileMatchesExpected(bz2Filename, CompressionType.BZIP2, lines); + + // Confirm that the compressed file is smaller than the uncompressed file. + File txtFile = tmpFolder.newFile("large_bzip2.txt"); + writeToStreamAndClose(lines, new FileOutputStream(txtFile)); + assertThat(Files.size(txtFile.toPath()), greaterThan(Files.size(bz2File.toPath()))); + } + + /** + * Tests reading from a large, gzipped file with .gz extension and AUTO or GZIP compression set. + * It is important to test a large compressible file because using only small files may mask bugs + * from range tracking that can only occur if the file compression ratio is high -- small + * compressed files are usually as big as the uncompressed ones or bigger. + */ + @Test + @Category(NeedsRunner.class) + public void testLargeCompressedGzipRead() throws Exception { + String[] lines = makeLines(5000); + File gzFile = tmpFolder.newFile("large_gzip.gz"); + String gzFilename = gzFile.getPath(); + + writeToStreamAndClose(lines, new GZIPOutputStream(new FileOutputStream(gzFile))); + // Should work in AUTO mode. + assertReadingCompressedFileMatchesExpected(gzFilename, CompressionType.AUTO, lines); + // Should work in BZIP2 mode. + assertReadingCompressedFileMatchesExpected(gzFilename, CompressionType.GZIP, lines); + + // Confirm that the compressed file is smaller than the uncompressed file. 
+ File txtFile = tmpFolder.newFile("large_gzip.txt"); + writeToStreamAndClose(lines, new FileOutputStream(txtFile)); + assertThat(Files.size(txtFile.toPath()), greaterThan(Files.size(gzFile.toPath()))); + } + + /** + * Tests reading from a large, uncompressed file. + */ + @Test + @Category(NeedsRunner.class) + public void testLargeUncompressedReadTxt() throws Exception { + String[] lines = makeLines(5000); + File txtFile = tmpFolder.newFile("large_file.txt"); + String txtFilename = txtFile.getPath(); + + writeToStreamAndClose(lines, new FileOutputStream(txtFile)); + // Should work in AUTO mode. + assertReadingCompressedFileMatchesExpected(txtFilename, CompressionType.AUTO, lines); + } + + /** + * Tests reading from a large, uncompressed file with a weird file extension. + */ + @Test + @Category(NeedsRunner.class) + public void testLargeUncompressedReadWeirdExtension() throws Exception { + String[] lines = makeLines(5000); + File txtFile = tmpFolder.newFile("large_file.bin.data.foo"); + String txtFilename = txtFile.getPath(); + + writeToStreamAndClose(lines, new FileOutputStream(txtFile)); + // Should work in AUTO mode. + assertReadingCompressedFileMatchesExpected(txtFilename, CompressionType.AUTO, lines); + } + + /** * Create a zip file with the given lines. * * @param expected A list of expected lines, populated in the zip file. 
@@ -553,7 +732,7 @@ public class TextIOTest { @Test @Category(NeedsRunner.class) public void testZipCompressedReadWithEmptyEntry() throws Exception { - String filename = createZipFile(new ArrayList<String>(), null, new String[]{ }); + String filename = createZipFile(new ArrayList<String>(), null, new String[]{}); Pipeline p = TestPipeline.create(); @@ -571,9 +750,9 @@ public class TextIOTest { @Test @Category(NeedsRunner.class) public void testZipCompressedReadWithMultiEntriesFile() throws Exception { - String[] entry0 = new String[]{ "first", "second", "three" }; - String[] entry1 = new String[]{ "four", "five", "six" }; - String[] entry2 = new String[]{ "seven", "eight", "nine" }; + String[] entry0 = new String[]{"first", "second", "three"}; + String[] entry1 = new String[]{"four", "five", "six"}; + String[] entry2 = new String[]{"seven", "eight", "nine"}; List<String> expected = new ArrayList<>(); @@ -599,10 +778,10 @@ public class TextIOTest { String filename = createZipFile( new ArrayList<String>(), null, - new String[] {"cat"}, - new String[] {}, - new String[] {}, - new String[] {"dog"}); + new String[]{"cat"}, + new String[]{}, + new String[]{}, + new String[]{"dog"}); List<String> expected = ImmutableList.of("cat", "dog"); Pipeline p = TestPipeline.create(); @@ -615,30 +794,6 @@ public class TextIOTest { } @Test - @Category(NeedsRunner.class) - public void testGZIPReadWhenUncompressed() throws Exception { - String[] lines = {"Meritorious condor", "Obnoxious duck"}; - File tmpFile = tmpFolder.newFile(); - String filename = tmpFile.getPath(); - - List<String> expected = new ArrayList<>(); - try (PrintStream writer = new PrintStream(new FileOutputStream(tmpFile))) { - for (String line : lines) { - writer.println(line); - expected.add(line); - } - } - - Pipeline p = TestPipeline.create(); - TextIO.Read.Bound<String> read = - TextIO.Read.from(filename).withCompressionType(CompressionType.GZIP); - PCollection<String> output = p.apply(read); - - 
PAssert.that(output).containsInAnyOrder(expected); - p.run(); - } - - @Test public void testTextIOGetName() { assertEquals("TextIO.Read", TextIO.Read.from("somefile").getName()); assertEquals("TextIO.Write", TextIO.Write.to("somefile").getName());
