CompressedSource: CompressedReader is never splittable

The only time it is safe to split a "compressed" file is when the file is not
actually compressed. That can only happen when the source itself is splittable,
in which case the inner source's reader is returned directly. A CompressedReader
is only created when the file is NOT splittable. So remove all of the logic that
handles splittable compressed readers, and keep only the logic for the case where
we know (or assume) the file is compressed.

* TextIO: test compression with larger files

It is important for correctness that we test with large files:
small inputs often compress to a file that is as large as, or larger
than, the uncompressed file, which could mask range-tracking bugs.

* TextIOTest: flesh out more compressed-read test cases

* TextIOTest: add large uncompressed file


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/2ae1a747
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/2ae1a747
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/2ae1a747

Branch: refs/heads/master
Commit: 2ae1a7478df037cf558a808816216e7002b33b47
Parents: 869ba7d
Author: Dan Halperin <[email protected]>
Authored: Wed Aug 10 17:58:09 2016 -0700
Committer: Dan Halperin <[email protected]>
Committed: Wed Aug 10 22:59:51 2016 -0700

----------------------------------------------------------------------
 .../apache/beam/sdk/io/CompressedSource.java    |  80 ++----
 .../beam/sdk/io/CompressedSourceTest.java       |   2 +-
 .../java/org/apache/beam/sdk/io/TextIOTest.java | 251 +++++++++++++++----
 3 files changed, 227 insertions(+), 106 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/2ae1a747/sdks/java/core/src/main/java/org/apache/beam/sdk/io/CompressedSource.java
----------------------------------------------------------------------
diff --git 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/CompressedSource.java 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/CompressedSource.java
index ee4b84b..11ff90f 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/CompressedSource.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/CompressedSource.java
@@ -430,7 +430,6 @@ public class CompressedSource<T> extends FileBasedSource<T> 
{
 
     private final FileBasedReader<T> readerDelegate;
     private final CompressedSource<T> source;
-    private final boolean splittable;
     private final Object progressLock = new Object();
     @GuardedBy("progressLock")
     private int numRecordsRead;
@@ -443,13 +442,6 @@ public class CompressedSource<T> extends 
FileBasedSource<T> {
     public CompressedReader(CompressedSource<T> source, FileBasedReader<T> 
readerDelegate) {
       super(source);
       this.source = source;
-      boolean splittable;
-      try {
-        splittable = source.isSplittable();
-      } catch (Exception e) {
-        throw new RuntimeException("Unable to tell whether source " + source + 
" is splittable", e);
-      }
-      this.splittable = splittable;
       this.readerDelegate = readerDelegate;
     }
 
@@ -463,27 +455,19 @@ public class CompressedSource<T> extends 
FileBasedSource<T> {
 
     @Override
     public boolean allowsDynamicSplitting() {
-      return splittable;
+      return false;
     }
 
     @Override
     public final long getSplitPointsConsumed() {
-      if (splittable) {
-        return readerDelegate.getSplitPointsConsumed();
-      } else {
-        synchronized (progressLock) {
-          return (isDone() && numRecordsRead > 0) ? 1 : 0;
-        }
+      synchronized (progressLock) {
+        return (isDone() && numRecordsRead > 0) ? 1 : 0;
       }
     }
 
     @Override
     public final long getSplitPointsRemaining() {
-      if (splittable) {
-        return readerDelegate.getSplitPointsRemaining();
-      } else {
-        return isDone() ? 0 : 1;
-      }
+      return isDone() ? 0 : 1;
     }
 
     /**
@@ -491,18 +475,14 @@ public class CompressedSource<T> extends 
FileBasedSource<T> {
      */
     @Override
     protected final boolean isAtSplitPoint() {
-      if (splittable) {
-        return readerDelegate.isAtSplitPoint();
-      } else {
-        // We have to return true for the first record, but not for the state 
before reading it,
-        // and not for the state after reading any other record. Hence == 
rather than >= or <=.
-        // This is required because FileBasedReader is intended for readers 
that can read a range
-        // of offsets in a file and where the range can be split in parts. 
CompressedReader,
-        // however, is a degenerate case because it cannot be split, but it 
has to satisfy the
-        // semantics of offsets and split points anyway.
-        synchronized (progressLock) {
-          return numRecordsRead == 1;
-        }
+      // We have to return true for the first record, but not for the state 
before reading it,
+      // and not for the state after reading any other record. Hence == rather 
than >= or <=.
+      // This is required because FileBasedReader is intended for readers that 
can read a range
+      // of offsets in a file and where the range can be split in parts. 
CompressedReader,
+      // however, is a degenerate case because it cannot be split, but it has 
to satisfy the
+      // semantics of offsets and split points anyway.
+      synchronized (progressLock) {
+        return numRecordsRead == 1;
       }
     }
 
@@ -546,14 +526,9 @@ public class CompressedSource<T> extends 
FileBasedSource<T> {
      */
     @Override
     protected final void startReading(ReadableByteChannel channel) throws 
IOException {
-      if (splittable) {
-        // No-op. We will always delegate to the inner reader, so this.channel 
and this.progressLock
-        // will never be used.
-      } else {
-        synchronized (progressLock) {
-          this.channel = new CountingChannel(channel, 
getCurrentSource().getStartOffset());
-          channel = this.channel;
-        }
+      synchronized (progressLock) {
+        this.channel = new CountingChannel(channel, 
getCurrentSource().getStartOffset());
+        channel = this.channel;
       }
 
       if (source.getChannelFactory() instanceof 
FileNameBasedDecompressingChannelFactory) {
@@ -582,30 +557,21 @@ public class CompressedSource<T> extends 
FileBasedSource<T> {
       return true;
     }
 
-    // Splittable: simply delegates to the inner reader.
-    //
     // Unsplittable: returns the offset in the input stream that has been read 
by the input.
     // these positions are likely to be coarse-grained (in the event of 
buffering) and
     // over-estimates (because they reflect the number of bytes read to 
produce an element, not its
     // start) but both of these provide better data than e.g., reporting the 
start of the file.
     @Override
     protected final long getCurrentOffset() throws NoSuchElementException {
-      if (splittable) {
-        return readerDelegate.getCurrentOffset();
-      } else {
-        synchronized (progressLock) {
-          if (numRecordsRead <= 1) {
-            // Since the first record is at a split point, it should start at 
the beginning of the
-            // file. This avoids the bad case where the decompressor read the 
entire file, which
-            // would cause the file to be treated as empty when returning 
channel.getCount() as it
-            // is outside the valid range.
-            return 0;
-          }
-          if (channel == null) {
-            throw new NoSuchElementException();
-          }
-          return channel.getCount();
+      synchronized (progressLock) {
+        if (numRecordsRead <= 1) {
+          // Since the first record is at a split point, it should start at 
the beginning of the
+          // file. This avoids the bad case where the decompressor read the 
entire file, which
+          // would cause the file to be treated as empty when returning 
channel.getCount() as it
+          // is outside the valid range.
+          return 0;
         }
+        return channel.getCount();
       }
     }
   }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/2ae1a747/sdks/java/core/src/test/java/org/apache/beam/sdk/io/CompressedSourceTest.java
----------------------------------------------------------------------
diff --git 
a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/CompressedSourceTest.java 
b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/CompressedSourceTest.java
index 01e5fe5..4a9f950 100644
--- 
a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/CompressedSourceTest.java
+++ 
b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/CompressedSourceTest.java
@@ -563,7 +563,7 @@ public class CompressedSourceTest {
         if (channel.read(buff) != 1) {
           return false;
         }
-        current = new Byte(buff.get(0));
+        current = buff.get(0);
         offset += 1;
         return true;
       }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/2ae1a747/sdks/java/core/src/test/java/org/apache/beam/sdk/io/TextIOTest.java
----------------------------------------------------------------------
diff --git 
a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/TextIOTest.java 
b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/TextIOTest.java
index 6ec3a71..6fd3093 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/TextIOTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/TextIOTest.java
@@ -25,6 +25,7 @@ import static 
org.apache.beam.sdk.transforms.display.DisplayDataMatchers.hasDisp
 import static 
org.apache.beam.sdk.transforms.display.DisplayDataMatchers.hasValue;
 
 import static org.hamcrest.Matchers.containsInAnyOrder;
+import static org.hamcrest.Matchers.greaterThan;
 import static org.hamcrest.Matchers.hasItem;
 import static org.hamcrest.Matchers.startsWith;
 import static org.junit.Assert.assertEquals;
@@ -61,6 +62,7 @@ import org.apache.beam.sdk.values.PCollection;
 
 import com.google.common.collect.ImmutableList;
 
+import 
org.apache.commons.compress.compressors.bzip2.BZip2CompressorOutputStream;
 import org.junit.BeforeClass;
 import org.junit.Ignore;
 import org.junit.Rule;
@@ -79,6 +81,7 @@ import java.io.File;
 import java.io.FileOutputStream;
 import java.io.FileReader;
 import java.io.IOException;
+import java.io.OutputStream;
 import java.io.PrintStream;
 import java.nio.channels.FileChannel;
 import java.nio.channels.SeekableByteChannel;
@@ -102,8 +105,11 @@ import javax.annotation.Nullable;
 @RunWith(JUnit4.class)
 @SuppressWarnings("unchecked")
 public class TextIOTest {
-  @Rule public TemporaryFolder tmpFolder = new TemporaryFolder();
-  @Rule public ExpectedException expectedException = ExpectedException.none();
+
+  @Rule
+  public TemporaryFolder tmpFolder = new TemporaryFolder();
+  @Rule
+  public ExpectedException expectedException = ExpectedException.none();
 
   @BeforeClass
   public static void setupClass() {
@@ -166,7 +172,7 @@ public class TextIOTest {
   @Test
   @Category(NeedsRunner.class)
   public void testReadNulls() throws Exception {
-    runTestRead(new Void[]{ null, null, null }, VoidCoder.of());
+    runTestRead(new Void[]{null, null, null}, VoidCoder.of());
   }
 
   @Test
@@ -419,26 +425,27 @@ public class TextIOTest {
     assertEquals(CompressionType.GZIP, read.getCompressionType());
   }
 
-  @Test
-  @Category(NeedsRunner.class)
-  public void testCompressedRead() throws Exception {
-    String[] lines = {"Irritable eagle", "Optimistic jay", "Fanciful hawk"};
-    File tmpFile = tmpFolder.newFile();
-    String filename = tmpFile.getPath();
-
-    List<String> expected = new ArrayList<>();
-    try (PrintStream writer =
-        new PrintStream(new GZIPOutputStream(new FileOutputStream(tmpFile)))) {
+  /**
+   * Helper that writes the given lines (each followed by a newline) to a stream, then closes the
+   * stream.
+  private static void writeToStreamAndClose(String[] lines, OutputStream 
outputStream) {
+    try (PrintStream writer = new PrintStream(outputStream)) {
       for (String line : lines) {
         writer.println(line);
-        expected.add(line);
       }
     }
+  }
 
+  /**
+   * Helper method that runs 
TextIO.Read.from(filename).withCompressionType(compressionType)
+   * and asserts that the results match the given expected output.
+   */
+  private static void assertReadingCompressedFileMatchesExpected(
+      String filename, CompressionType compressionType, String[] expected) {
     Pipeline p = TestPipeline.create();
-
     TextIO.Read.Bound<String> read =
-        TextIO.Read.from(filename).withCompressionType(CompressionType.GZIP);
+        TextIO.Read.from(filename).withCompressionType(compressionType);
     PCollection<String> output = p.apply(read);
 
     PAssert.that(output).containsInAnyOrder(expected);
@@ -446,6 +453,178 @@ public class TextIOTest {
   }
 
   /**
+   * Helper to make an array of compressible strings. Returns ["word"i] for i 
in range(0,n).
+   */
+  private static String[] makeLines(int n) {
+    String[] ret = new String[n];
+    for (int i = 0; i < n; ++i) {
+      ret[i] = "word" + i;
+    }
+    return ret;
+  }
+
+  /**
+   * Tests reading from a small, gzipped file with no .gz extension but GZIP 
compression set.
+   */
+  @Test
+  @Category(NeedsRunner.class)
+  public void testSmallCompressedGzipReadNoExtension() throws Exception {
+    String[] lines = {"Irritable eagle", "Optimistic jay", "Fanciful hawk"};
+    File tmpFile = tmpFolder.newFile(); // no GZ extension
+    String filename = tmpFile.getPath();
+
+    writeToStreamAndClose(lines, new GZIPOutputStream(new 
FileOutputStream(tmpFile)));
+    assertReadingCompressedFileMatchesExpected(filename, CompressionType.GZIP, 
lines);
+  }
+
+  /**
+   * Tests reading from a small, gzipped file with .gz extension and AUTO or 
GZIP compression set.
+   */
+  @Test
+  @Category(NeedsRunner.class)
+  public void testSmallCompressedGzipRead() throws Exception {
+    String[] lines = {"Irritable eagle", "Optimistic jay", "Fanciful hawk"};
+    File tmpFile = tmpFolder.newFile("small_gzip.gz");
+    String filename = tmpFile.getPath();
+
+    writeToStreamAndClose(lines, new GZIPOutputStream(new 
FileOutputStream(tmpFile)));
+    // Should work in AUTO mode.
+    assertReadingCompressedFileMatchesExpected(filename, CompressionType.AUTO, 
lines);
+    // Should work in GZIP mode.
+    assertReadingCompressedFileMatchesExpected(filename, CompressionType.GZIP, 
lines);
+  }
+
+  /**
+   * Tests reading from a small, uncompressed file with .gz extension.
+   * This must work in AUTO or GZIP modes. This is needed because some network 
file systems / HTTP
+   * clients will transparently decompress gzipped content.
+   */
+  @Test
+  @Category(NeedsRunner.class)
+  public void testSmallCompressedGzipReadActuallyUncompressed() throws 
Exception {
+    String[] lines = {"Irritable eagle", "Optimistic jay", "Fanciful hawk"};
+    File tmpFile = tmpFolder.newFile("not_really_gzipped.gz"); // GZ file 
extension lies
+    String filename = tmpFile.getPath();
+
+    writeToStreamAndClose(lines, new FileOutputStream(tmpFile));
+    // Should work with GZIP compression set.
+    assertReadingCompressedFileMatchesExpected(filename, CompressionType.GZIP, 
lines);
+    // Should also work with AUTO mode set.
+    assertReadingCompressedFileMatchesExpected(filename, CompressionType.AUTO, 
lines);
+  }
+
+  /**
+   * Tests reading from a small, bzip2ed file with no .bz2 extension but BZIP2 
compression set.
+   */
+  @Test
+  @Category(NeedsRunner.class)
+  public void testSmallCompressedBzip2ReadNoExtension() throws Exception {
+    String[] lines = {"Irritable eagle", "Optimistic jay", "Fanciful hawk"};
+    File tmpFile = tmpFolder.newFile(); // no BZ2 extension
+    String filename = tmpFile.getPath();
+
+    writeToStreamAndClose(lines, new BZip2CompressorOutputStream(new 
FileOutputStream(tmpFile)));
+    assertReadingCompressedFileMatchesExpected(filename, 
CompressionType.BZIP2, lines);
+  }
+
+  /**
+   * Tests reading from a small, bzip2ed file with .bz2 extension and AUTO or 
BZIP2 compression set.
+   */
+  @Test
+  @Category(NeedsRunner.class)
+  public void testSmallCompressedBzipRead() throws Exception {
+    String[] lines = {"Irritable eagle", "Optimistic jay", "Fanciful hawk"};
+    File tmpFile = tmpFolder.newFile("small_bzip2.bz2");
+    String filename = tmpFile.getPath();
+
+    writeToStreamAndClose(lines, new BZip2CompressorOutputStream(new 
FileOutputStream(tmpFile)));
+    // Should work in AUTO mode.
+    assertReadingCompressedFileMatchesExpected(filename, CompressionType.AUTO, 
lines);
+    // Should work in BZIP2 mode.
+    assertReadingCompressedFileMatchesExpected(filename, 
CompressionType.BZIP2, lines);
+  }
+
+  /**
+   * Tests reading from a large, bzip2ed file with .bz2 extension and AUTO or 
BZIP2 compression set.
+   * It is important to test a large compressible file because using only 
small files may mask bugs
+   * from range tracking that can only occur if the file compression ratio is 
high -- small
+   * compressed files are usually as big as the uncompressed ones or bigger.
+   */
+  @Test
+  @Category(NeedsRunner.class)
+  public void testLargeCompressedBzipRead() throws Exception {
+    String[] lines = makeLines(5000);
+    File bz2File = tmpFolder.newFile("large_bzip2.bz2");
+    String bz2Filename = bz2File.getPath();
+
+    writeToStreamAndClose(lines, new BZip2CompressorOutputStream(new 
FileOutputStream(bz2File)));
+    // Should work in AUTO mode.
+    assertReadingCompressedFileMatchesExpected(bz2Filename, 
CompressionType.AUTO, lines);
+    // Should work in BZIP2 mode.
+    assertReadingCompressedFileMatchesExpected(bz2Filename, 
CompressionType.BZIP2, lines);
+
+    // Confirm that the compressed file is smaller than the uncompressed file.
+    File txtFile = tmpFolder.newFile("large_bzip2.txt");
+    writeToStreamAndClose(lines, new FileOutputStream(txtFile));
+    assertThat(Files.size(txtFile.toPath()), 
greaterThan(Files.size(bz2File.toPath())));
+  }
+
+  /**
+   * Tests reading from a large, gzipped file with .gz extension and AUTO or 
GZIP compression set.
+   * It is important to test a large compressible file because using only 
small files may mask bugs
+   * from range tracking that can only occur if the file compression ratio is 
high -- small
+   * compressed files are usually as big as the uncompressed ones or bigger.
+   */
+  @Test
+  @Category(NeedsRunner.class)
+  public void testLargeCompressedGzipRead() throws Exception {
+    String[] lines = makeLines(5000);
+    File gzFile = tmpFolder.newFile("large_gzip.gz");
+    String gzFilename = gzFile.getPath();
+
+    writeToStreamAndClose(lines, new GZIPOutputStream(new 
FileOutputStream(gzFile)));
+    // Should work in AUTO mode.
+    assertReadingCompressedFileMatchesExpected(gzFilename, 
CompressionType.AUTO, lines);
+    // Should work in GZIP mode.
+    assertReadingCompressedFileMatchesExpected(gzFilename, 
CompressionType.GZIP, lines);
+
+    // Confirm that the compressed file is smaller than the uncompressed file.
+    File txtFile = tmpFolder.newFile("large_gzip.txt");
+    writeToStreamAndClose(lines, new FileOutputStream(txtFile));
+    assertThat(Files.size(txtFile.toPath()), 
greaterThan(Files.size(gzFile.toPath())));
+  }
+
+  /**
+   * Tests reading from a large, uncompressed file.
+   */
+  @Test
+  @Category(NeedsRunner.class)
+  public void testLargeUncompressedReadTxt() throws Exception {
+    String[] lines = makeLines(5000);
+    File txtFile = tmpFolder.newFile("large_file.txt");
+    String txtFilename = txtFile.getPath();
+
+    writeToStreamAndClose(lines, new FileOutputStream(txtFile));
+    // Should work in AUTO mode.
+    assertReadingCompressedFileMatchesExpected(txtFilename, 
CompressionType.AUTO, lines);
+  }
+
+  /**
+   * Tests reading from a large, uncompressed file with a weird file extension.
+   */
+  @Test
+  @Category(NeedsRunner.class)
+  public void testLargeUncompressedReadWeirdExtension() throws Exception {
+    String[] lines = makeLines(5000);
+    File txtFile = tmpFolder.newFile("large_file.bin.data.foo");
+    String txtFilename = txtFile.getPath();
+
+    writeToStreamAndClose(lines, new FileOutputStream(txtFile));
+    // Should work in AUTO mode.
+    assertReadingCompressedFileMatchesExpected(txtFilename, 
CompressionType.AUTO, lines);
+  }
+
+  /**
    * Create a zip file with the given lines.
    *
    * @param expected A list of expected lines, populated in the zip file.
@@ -553,7 +732,7 @@ public class TextIOTest {
   @Test
   @Category(NeedsRunner.class)
   public void testZipCompressedReadWithEmptyEntry() throws Exception {
-    String filename = createZipFile(new ArrayList<String>(), null, new 
String[]{ });
+    String filename = createZipFile(new ArrayList<String>(), null, new 
String[]{});
 
     Pipeline p = TestPipeline.create();
 
@@ -571,9 +750,9 @@ public class TextIOTest {
   @Test
   @Category(NeedsRunner.class)
   public void testZipCompressedReadWithMultiEntriesFile() throws Exception {
-    String[] entry0 = new String[]{ "first", "second", "three" };
-    String[] entry1 = new String[]{ "four", "five", "six" };
-    String[] entry2 = new String[]{ "seven", "eight", "nine" };
+    String[] entry0 = new String[]{"first", "second", "three"};
+    String[] entry1 = new String[]{"four", "five", "six"};
+    String[] entry2 = new String[]{"seven", "eight", "nine"};
 
     List<String> expected = new ArrayList<>();
 
@@ -599,10 +778,10 @@ public class TextIOTest {
     String filename = createZipFile(
         new ArrayList<String>(),
         null,
-        new String[] {"cat"},
-        new String[] {},
-        new String[] {},
-        new String[] {"dog"});
+        new String[]{"cat"},
+        new String[]{},
+        new String[]{},
+        new String[]{"dog"});
     List<String> expected = ImmutableList.of("cat", "dog");
 
     Pipeline p = TestPipeline.create();
@@ -615,30 +794,6 @@ public class TextIOTest {
   }
 
   @Test
-  @Category(NeedsRunner.class)
-  public void testGZIPReadWhenUncompressed() throws Exception {
-    String[] lines = {"Meritorious condor", "Obnoxious duck"};
-    File tmpFile = tmpFolder.newFile();
-    String filename = tmpFile.getPath();
-
-    List<String> expected = new ArrayList<>();
-    try (PrintStream writer = new PrintStream(new FileOutputStream(tmpFile))) {
-      for (String line : lines) {
-        writer.println(line);
-        expected.add(line);
-      }
-    }
-
-    Pipeline p = TestPipeline.create();
-    TextIO.Read.Bound<String> read =
-        TextIO.Read.from(filename).withCompressionType(CompressionType.GZIP);
-    PCollection<String> output = p.apply(read);
-
-    PAssert.that(output).containsInAnyOrder(expected);
-    p.run();
-  }
-
-  @Test
   public void testTextIOGetName() {
     assertEquals("TextIO.Read", TextIO.Read.from("somefile").getName());
     assertEquals("TextIO.Write", TextIO.Write.to("somefile").getName());

Reply via email to