Repository: beam
Updated Branches:
  refs/heads/master 0bc375634 -> 8d71ebf82


[BEAM-2632] Use JUnit Parameterized test suites in TextIOReadTest


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/09b6b8fc
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/09b6b8fc
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/09b6b8fc

Branch: refs/heads/master
Commit: 09b6b8fc18053b2ccb3163c6bdf58dd6705d6eba
Parents: c3bcd4b
Author: huafengw <[email protected]>
Authored: Wed Aug 30 14:18:21 2017 +0800
Committer: manuzhang <[email protected]>
Committed: Wed Sep 13 10:45:11 2017 +0800

----------------------------------------------------------------------
 .../org/apache/beam/sdk/io/TextIOReadTest.java  | 1330 ++++++++----------
 1 file changed, 599 insertions(+), 731 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/09b6b8fc/sdks/java/core/src/test/java/org/apache/beam/sdk/io/TextIOReadTest.java
----------------------------------------------------------------------
diff --git 
a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/TextIOReadTest.java 
b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/TextIOReadTest.java
index 3a8757e..f7bb12c 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/TextIOReadTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/TextIOReadTest.java
@@ -50,12 +50,8 @@ import java.io.FileWriter;
 import java.io.IOException;
 import java.io.OutputStream;
 import java.io.PrintStream;
-import java.nio.charset.StandardCharsets;
-import java.nio.file.FileVisitResult;
 import java.nio.file.Files;
 import java.nio.file.Path;
-import java.nio.file.SimpleFileVisitor;
-import java.nio.file.attribute.BasicFileAttributes;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
@@ -64,8 +60,9 @@ import java.util.Set;
 import java.util.zip.GZIPOutputStream;
 import java.util.zip.ZipEntry;
 import java.util.zip.ZipOutputStream;
+
+import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.coders.StringUtf8Coder;
-import org.apache.beam.sdk.io.BoundedSource.BoundedReader;
 import org.apache.beam.sdk.io.fs.EmptyMatchTreatment;
 import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.options.PipelineOptionsFactory;
@@ -85,48 +82,29 @@ import org.apache.beam.sdk.values.PCollection;
 import 
org.apache.commons.compress.compressors.bzip2.BZip2CompressorOutputStream;
 import 
org.apache.commons.compress.compressors.deflate.DeflateCompressorOutputStream;
 import org.joda.time.Duration;
-import org.junit.AfterClass;
-import org.junit.BeforeClass;
 import org.junit.Rule;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
-import org.junit.rules.ExpectedException;
+import org.junit.experimental.runners.Enclosed;
+import org.junit.rules.TemporaryFolder;
 import org.junit.runner.RunWith;
 import org.junit.runners.JUnit4;
+import org.junit.runners.Parameterized;
 
 /** Tests for {@link TextIO.Read}. */
-@RunWith(JUnit4.class)
+@RunWith(Enclosed.class)
 public class TextIOReadTest {
+  private static final int LINES_NUMBER_FOR_LARGE = 1000;
   private static final List<String> EMPTY = Collections.emptyList();
   private static final List<String> TINY =
       Arrays.asList("Irritable eagle", "Optimistic jay", "Fanciful hawk");
-  private static final List<String> LARGE = makeLines(1000);
-  private static int uniquifier = 0;
-
-  private static Path tempFolder;
-  private static File emptyTxt;
-  private static File tinyTxt;
-  private static File largeTxt;
-  private static File emptyGz;
-  private static File tinyGz;
-  private static File largeGz;
-  private static File emptyBzip2;
-  private static File tinyBzip2;
-  private static File largeBzip2;
-  private static File emptyZip;
-  private static File tinyZip;
-  private static File largeZip;
-  private static File emptyDeflate;
-  private static File tinyDeflate;
-  private static File largeDeflate;
-
-  @Rule public TestPipeline p = TestPipeline.create();
-
-  @Rule public ExpectedException expectedException = ExpectedException.none();
-
-  private static File writeToFile(List<String> lines, String filename, 
Compression compression)
+
+  private static final List<String> LARGE = makeLines(LINES_NUMBER_FOR_LARGE);
+
+  private static File writeToFile(
+      List<String> lines, TemporaryFolder folder, String fileName, Compression 
compression)
       throws IOException {
-    File file = tempFolder.resolve(filename).toFile();
+    File file = folder.getRoot().toPath().resolve(fileName).toFile();
     OutputStream output = new FileOutputStream(file);
     switch (compression) {
       case UNCOMPRESSED:
@@ -152,192 +130,6 @@ public class TextIOReadTest {
     return file;
   }
 
-  @BeforeClass
-  public static void setupClass() throws IOException {
-    tempFolder = Files.createTempDirectory("TextIOTest");
-    // empty files
-    emptyTxt = writeToFile(EMPTY, "empty.txt", UNCOMPRESSED);
-    emptyGz = writeToFile(EMPTY, "empty.gz", GZIP);
-    emptyBzip2 = writeToFile(EMPTY, "empty.bz2", BZIP2);
-    emptyZip = writeToFile(EMPTY, "empty.zip", ZIP);
-    emptyDeflate = writeToFile(EMPTY, "empty.deflate", DEFLATE);
-    // tiny files
-    tinyTxt = writeToFile(TINY, "tiny.txt", UNCOMPRESSED);
-    tinyGz = writeToFile(TINY, "tiny.gz", GZIP);
-    tinyBzip2 = writeToFile(TINY, "tiny.bz2", BZIP2);
-    tinyZip = writeToFile(TINY, "tiny.zip", ZIP);
-    tinyDeflate = writeToFile(TINY, "tiny.deflate", DEFLATE);
-    // large files
-    largeTxt = writeToFile(LARGE, "large.txt", UNCOMPRESSED);
-    largeGz = writeToFile(LARGE, "large.gz", GZIP);
-    largeBzip2 = writeToFile(LARGE, "large.bz2", BZIP2);
-    largeZip = writeToFile(LARGE, "large.zip", ZIP);
-    largeDeflate = writeToFile(LARGE, "large.deflate", DEFLATE);
-  }
-
-  @AfterClass
-  public static void teardownClass() throws IOException {
-    Files.walkFileTree(
-        tempFolder,
-        new SimpleFileVisitor<Path>() {
-          @Override
-          public FileVisitResult visitFile(Path file, BasicFileAttributes 
attrs)
-              throws IOException {
-            Files.delete(file);
-            return FileVisitResult.CONTINUE;
-          }
-
-          @Override
-          public FileVisitResult postVisitDirectory(Path dir, IOException exc) 
throws IOException {
-            Files.delete(dir);
-            return FileVisitResult.CONTINUE;
-          }
-        });
-  }
-
-  private void runTestRead(String[] expected) throws Exception {
-    File tmpFile = Files.createTempFile(tempFolder, "file", "txt").toFile();
-    String filename = tmpFile.getPath();
-
-    try (PrintStream writer = new PrintStream(new FileOutputStream(tmpFile))) {
-      for (String elem : expected) {
-        byte[] encodedElem = 
CoderUtils.encodeToByteArray(StringUtf8Coder.of(), elem);
-        String line = new String(encodedElem);
-        writer.println(line);
-      }
-    }
-
-    TextIO.Read read = TextIO.read().from(filename);
-
-    PCollection<String> output = p.apply(read);
-
-    PAssert.that(output).containsInAnyOrder(expected);
-    p.run();
-  }
-
-  @Test
-  public void testDelimiterSelfOverlaps(){
-    assertFalse(TextIO.Read.isSelfOverlapping(new byte[]{'a', 'b', 'c'}));
-    assertFalse(TextIO.Read.isSelfOverlapping(new byte[]{'c', 'a', 'b', 'd', 
'a', 'b'}));
-    assertFalse(TextIO.Read.isSelfOverlapping(new byte[]{'a', 'b', 'c', 'a', 
'b', 'd'}));
-    assertTrue(TextIO.Read.isSelfOverlapping(new byte[]{'a', 'b', 'a'}));
-    assertTrue(TextIO.Read.isSelfOverlapping(new byte[]{'a', 'b', 'c', 'a', 
'b'}));
-  }
-
-  @Test
-  @Category(NeedsRunner.class)
-  public void testReadStringsWithCustomDelimiter() throws Exception {
-    final String[] inputStrings = new String[] {
-        // incomplete delimiter
-        "To be, or not to be: that |is the question: ",
-        // incomplete delimiter
-        "To be, or not to be: that *is the question: ",
-        // complete delimiter
-        "Whether 'tis nobler in the mind to suffer |*",
-        // truncated delimiter
-        "The slings and arrows of outrageous fortune,|" };
-
-    File tmpFile = Files.createTempFile(tempFolder, "file", "txt").toFile();
-    String filename = tmpFile.getPath();
-
-    try (FileWriter writer = new FileWriter(tmpFile)) {
-      writer.write(Joiner.on("").join(inputStrings));
-    }
-
-    PAssert.that(p.apply(TextIO.read().from(filename).withDelimiter(new byte[] 
{'|', '*'})))
-        .containsInAnyOrder(
-            "To be, or not to be: that |is the question: To be, or not to be: "
-                + "that *is the question: Whether 'tis nobler in the mind to 
suffer ",
-            "The slings and arrows of outrageous fortune,|");
-    p.run();
-  }
-
-  @Test
-  public void testSplittingSourceWithCustomDelimiter() throws Exception {
-    List<String> testCases = Lists.newArrayList();
-    String infix = "first|*second|*|*third";
-    String[] affixes = new String[] {"", "|", "*", "|*"};
-    for (String prefix : affixes) {
-      for (String suffix : affixes) {
-        testCases.add(prefix + infix + suffix);
-      }
-    }
-    for (String testCase : testCases) {
-      SourceTestUtils.assertSplitAtFractionExhaustive(
-          prepareSource(testCase.getBytes(StandardCharsets.UTF_8), new byte[] 
{'|', '*'}),
-          PipelineOptionsFactory.create());
-    }
-  }
-
-  @Test
-  @Category(NeedsRunner.class)
-  public void testReadStrings() throws Exception {
-    runTestRead(LINES_ARRAY);
-  }
-
-  @Test
-  @Category(NeedsRunner.class)
-  public void testReadEmptyStrings() throws Exception {
-    runTestRead(NO_LINES_ARRAY);
-  }
-
-  @Test
-  public void testReadNamed() throws Exception {
-    p.enableAbandonedNodeEnforcement(false);
-
-    assertEquals("TextIO.Read/Read.out", 
p.apply(TextIO.read().from("somefile")).getName());
-    assertEquals(
-        "MyRead/Read.out", p.apply("MyRead", 
TextIO.read().from(emptyTxt.getPath())).getName());
-  }
-
-  @Test
-  public void testReadDisplayData() {
-    TextIO.Read read = TextIO.read().from("foo.*").withCompression(BZIP2);
-
-    DisplayData displayData = DisplayData.from(read);
-
-    assertThat(displayData, hasDisplayItem("filePattern", "foo.*"));
-    assertThat(displayData, hasDisplayItem("compressionType", 
BZIP2.toString()));
-  }
-
-  @Test
-  @Category(ValidatesRunner.class)
-  public void testPrimitiveReadDisplayData() {
-    DisplayDataEvaluator evaluator = DisplayDataEvaluator.create();
-
-    TextIO.Read read = TextIO.read().from("foobar");
-
-    Set<DisplayData> displayData = 
evaluator.displayDataForPrimitiveSourceTransforms(read);
-    assertThat(
-        "TextIO.Read should include the file prefix in its primitive display 
data",
-        displayData,
-        hasItem(hasDisplayItem(hasValue(startsWith("foobar")))));
-  }
-
-  /** Options for testing. */
-  public interface RuntimeTestOptions extends PipelineOptions {
-    ValueProvider<String> getInput();
-    void setInput(ValueProvider<String> value);
-  }
-
-  @Test
-  public void testRuntimeOptionsNotCalledInApply() throws Exception {
-    p.enableAbandonedNodeEnforcement(false);
-
-    RuntimeTestOptions options =
-        PipelineOptionsFactory.as(RuntimeTestOptions.class);
-
-    p.apply(TextIO.read().from(options.getInput()));
-  }
-
-  @Test
-  public void testCompressionIsSet() throws Exception {
-    TextIO.Read read = TextIO.read().from("/tmp/test");
-    assertEquals(AUTO, read.getCompression());
-    read = TextIO.read().from("/tmp/test").withCompression(GZIP);
-    assertEquals(GZIP, read.getCompression());
-  }
-
   /**
    * Helper that writes the given lines (adding a newline in between) to a 
stream, then closes the
    * stream.
@@ -350,100 +142,63 @@ public class TextIOReadTest {
     }
   }
 
+  /** Helper to make an array of compressible strings. Returns ["word"i] for i 
in range(0,n). */
+  private static List<String> makeLines(int n) {
+    List<String> ret = new ArrayList<>();
+    for (int i = 0; i < n; ++i) {
+      ret.add("word" + i);
+    }
+    return ret;
+  }
+
   /**
-   * Helper method that runs a variety of ways to read a single file using 
TextIO
-   * and checks that they all match the given expected output.
+   * Helper method that runs a variety of ways to read a single file using 
TextIO and checks that
+   * they all match the given expected output.
    *
    * <p>The transforms being verified are:
    * <ul>
    *   <li>TextIO.read().from(filename).withCompression(compressionType)
    *   <li>TextIO.read().from(filename).withCompression(compressionType)
-   *     .withHintMatchesManyFiles()
+   *       .withHintMatchesManyFiles()
    *   <li>TextIO.readAll().withCompression(compressionType)
-   * </ul> and
+   * </ul>
    */
-  private void assertReadingCompressedFileMatchesExpected(
-      File file, Compression compression, List<String> expected) {
-
-    int thisUniquifier = ++uniquifier;
+  private static void assertReadingCompressedFileMatchesExpected(
+      File file, Compression compression, List<String> expected, Pipeline p) {
 
     TextIO.Read read = 
TextIO.read().from(file.getPath()).withCompression(compression);
 
-    PAssert.that(
-            p.apply("Read_" + file + "_" + compression.toString() + "_" + 
thisUniquifier, read))
+    PAssert.that(p.apply("Read_" + file + "_" + compression.toString(), read))
         .containsInAnyOrder(expected);
 
     PAssert.that(
             p.apply(
-                "Read_" + file + "_" + compression.toString() + "_many" + "_" 
+ thisUniquifier,
+                "Read_" + file + "_" + compression.toString() + "_many",
                 read.withHintMatchesManyFiles()))
         .containsInAnyOrder(expected);
 
     TextIO.ReadAll readAll =
         TextIO.readAll().withCompression(compression);
     PAssert.that(
-            p.apply("Create_" + file + "_" + thisUniquifier, 
Create.of(file.getPath()))
-                .apply("Read_" + compression.toString() + "_" + 
thisUniquifier, readAll))
+            p.apply("Create_" + file, Create.of(file.getPath()))
+                .apply("Read_" + compression.toString(), readAll))
         .containsInAnyOrder(expected);
   }
 
-  /** Helper to make an array of compressible strings. Returns ["word"i] for i 
in range(0,n). */
-  private static List<String> makeLines(int n) {
-    List<String> ret = new ArrayList<>();
-    for (int i = 0; i < n; ++i) {
-      ret.add("word" + i);
-    }
-    return ret;
-  }
-
-  /** Tests reading from a small, gzipped file with no .gz extension but GZIP 
compression set. */
-  @Test
-  @Category(NeedsRunner.class)
-  public void testSmallCompressedGzipReadNoExtension() throws Exception {
-    File smallGzNoExtension = writeToFile(TINY, "tiny_gz_no_extension", GZIP);
-    assertReadingCompressedFileMatchesExpected(smallGzNoExtension, GZIP, TINY);
-    p.run();
-  }
-
-  /**
-   * Tests reading from a small, uncompressed file with .gz extension. This 
must work in AUTO or
-   * GZIP modes. This is needed because some network file systems / HTTP 
clients will transparently
-   * decompress gzipped content.
-   */
-  @Test
-  @Category(NeedsRunner.class)
-  public void testSmallCompressedGzipReadActuallyUncompressed() throws 
Exception {
-    File smallGzNotCompressed =
-        writeToFile(TINY, "tiny_uncompressed.gz", UNCOMPRESSED);
-    // Should work with GZIP compression set.
-    assertReadingCompressedFileMatchesExpected(smallGzNotCompressed, GZIP, 
TINY);
-    // Should also work with AUTO mode set.
-    assertReadingCompressedFileMatchesExpected(smallGzNotCompressed, AUTO, 
TINY);
-    p.run();
-  }
-
-  /** Tests reading from a small, bzip2ed file with no .bz2 extension but 
BZIP2 compression set. */
-  @Test
-  @Category(NeedsRunner.class)
-  public void testSmallCompressedBzip2ReadNoExtension() throws Exception {
-    File smallBz2NoExtension = writeToFile(TINY, "tiny_bz2_no_extension", 
BZIP2);
-    assertReadingCompressedFileMatchesExpected(smallBz2NoExtension, BZIP2, 
TINY);
-    p.run();
-  }
-
   /**
    * Create a zip file with the given lines.
    *
    * @param expected A list of expected lines, populated in the zip file.
+   * @param folder A temporary folder used to create files.
    * @param filename Optionally zip file name (can be null).
    * @param fieldsEntries Fields to write in zip entries.
    * @return The zip filename.
    * @throws Exception In case of a failure during zip file creation.
    */
-  private String createZipFile(List<String> expected, String filename, 
String[]... fieldsEntries)
+  private static File createZipFile(
+      List<String> expected, TemporaryFolder folder, String filename, 
String[]... fieldsEntries)
       throws Exception {
-    File tmpFile = tempFolder.resolve(filename).toFile();
-    String tmpFileName = tmpFile.getPath();
+    File tmpFile = folder.getRoot().toPath().resolve(filename).toFile();
 
     ZipOutputStream out = new ZipOutputStream(new FileOutputStream(tmpFile));
     PrintStream writer = new PrintStream(out, true /* auto-flush on write */);
@@ -462,547 +217,660 @@ public class TextIOReadTest {
     writer.close();
     out.close();
 
-    return tmpFileName;
+    return tmpFile;
+  }
+
+  private static TextSource prepareSource(
+      TemporaryFolder temporaryFolder, byte[] data, byte[] delimiter) throws 
IOException {
+    Path path = temporaryFolder.newFile().toPath();
+    Files.write(path, data);
+    return new TextSource(
+        ValueProvider.StaticValueProvider.of(path.toString()),
+        EmptyMatchTreatment.DISALLOW,
+        delimiter);
   }
 
-  @Test
-  @Category(NeedsRunner.class)
-  public void testTxtRead() throws Exception {
-    // Files with non-compressed extensions should work in AUTO and 
UNCOMPRESSED modes.
-    for (Compression type : new Compression[] {AUTO, UNCOMPRESSED}) {
-      assertReadingCompressedFileMatchesExpected(emptyTxt, type, EMPTY);
-      assertReadingCompressedFileMatchesExpected(tinyTxt, type, TINY);
-      assertReadingCompressedFileMatchesExpected(largeTxt, type, LARGE);
+  private static String getFileSuffix(Compression compression) {
+    switch (compression) {
+      case UNCOMPRESSED:
+        return ".txt";
+      case GZIP:
+        return ".gz";
+      case BZIP2:
+        return ".bz2";
+      case ZIP:
+        return ".zip";
+      case DEFLATE:
+        return ".deflate";
+      default:
+        return "";
     }
-    p.run();
   }
 
-  @Test
-  @Category(NeedsRunner.class)
-  public void testGzipCompressedRead() throws Exception {
-    // Files with the right extensions should work in AUTO and GZIP modes.
-    for (Compression type : new Compression[] {AUTO, GZIP}) {
-      assertReadingCompressedFileMatchesExpected(emptyGz, type, EMPTY);
-      assertReadingCompressedFileMatchesExpected(tinyGz, type, TINY);
-      assertReadingCompressedFileMatchesExpected(largeGz, type, LARGE);
+  /** Tests for reading from different size of files with various Compression. 
*/
+  @RunWith(Parameterized.class)
+  public static class CompressedReadTest {
+    @Rule public TemporaryFolder tempFolder = new TemporaryFolder();
+    @Rule public TestPipeline p = TestPipeline.create();
+
+    @Parameterized.Parameters(name = "{index}: {1}")
+    public static Iterable<Object[]> data() {
+      return ImmutableList.<Object[]>builder()
+          .add(new Object[] {EMPTY, UNCOMPRESSED})
+          .add(new Object[] {EMPTY, GZIP})
+          .add(new Object[] {EMPTY, BZIP2})
+          .add(new Object[] {EMPTY, ZIP})
+          .add(new Object[] {EMPTY, DEFLATE})
+          .add(new Object[] {TINY, UNCOMPRESSED})
+          .add(new Object[] {TINY, GZIP})
+          .add(new Object[] {TINY, BZIP2})
+          .add(new Object[] {TINY, ZIP})
+          .add(new Object[] {TINY, DEFLATE})
+          .add(new Object[] {LARGE, UNCOMPRESSED})
+          .add(new Object[] {LARGE, GZIP})
+          .add(new Object[] {LARGE, BZIP2})
+          .add(new Object[] {LARGE, ZIP})
+          .add(new Object[] {LARGE, DEFLATE})
+          .build();
     }
 
-    // Sanity check that we're properly testing compression.
-    assertThat(largeTxt.length(), greaterThan(largeGz.length()));
+    @Parameterized.Parameter(0)
+    public List<String> lines;
 
-    // GZIP files with non-gz extension should work in GZIP mode.
-    File gzFile = writeToFile(TINY, "tiny_gz_no_extension", GZIP);
-    assertReadingCompressedFileMatchesExpected(gzFile, GZIP, TINY);
-    p.run();
-  }
+    @Parameterized.Parameter(1)
+    public Compression compression;
 
-  @Test
-  @Category(NeedsRunner.class)
-  public void testBzip2CompressedRead() throws Exception {
-    // Files with the right extensions should work in AUTO and BZIP2 modes.
-    for (Compression type : new Compression[] {AUTO, BZIP2}) {
-      assertReadingCompressedFileMatchesExpected(emptyBzip2, type, EMPTY);
-      assertReadingCompressedFileMatchesExpected(tinyBzip2, type, TINY);
-      assertReadingCompressedFileMatchesExpected(largeBzip2, type, LARGE);
+    /** Tests reading from a small, compressed file with no extension. */
+    @Test
+    @Category(NeedsRunner.class)
+    public void testCompressedReadWithoutExtension() throws Exception {
+      String fileName = lines.size() + "_" + compression + "_no_extension";
+      File fileWithNoExtension = writeToFile(lines, tempFolder, fileName, 
compression);
+      assertReadingCompressedFileMatchesExpected(fileWithNoExtension, 
compression, lines, p);
+      p.run();
     }
 
-    // Sanity check that we're properly testing compression.
-    assertThat(largeTxt.length(), greaterThan(largeBzip2.length()));
+    @Test
+    @Category(NeedsRunner.class)
+    public void testCompressedReadWithExtension() throws Exception {
+      String fileName =
+          lines.size() + "_" + compression + "_no_extension" + 
getFileSuffix(compression);
+      File fileWithExtension = writeToFile(lines, tempFolder, fileName, 
compression);
+
+      // Sanity check that we're properly testing compression.
+      if (lines.size() == LINES_NUMBER_FOR_LARGE && 
!compression.equals(UNCOMPRESSED)) {
+        File uncompressedFile = writeToFile(lines, tempFolder, "large.txt", 
UNCOMPRESSED);
+        assertThat(uncompressedFile.length(), 
greaterThan(fileWithExtension.length()));
+      }
+
+      assertReadingCompressedFileMatchesExpected(fileWithExtension, 
compression, lines, p);
+      p.run();
+    }
 
-    // BZ2 files with non-bz2 extension should work in BZIP2 mode.
-    File bz2File = writeToFile(TINY, "tiny_bz2_no_extension", BZIP2);
-    assertReadingCompressedFileMatchesExpected(bz2File, BZIP2, TINY);
-    p.run();
+    @Test
+    @Category(NeedsRunner.class)
+    public void testReadWithAuto() throws Exception {
+      // Files with non-compressed extensions should work in AUTO and 
UNCOMPRESSED modes.
+      String fileName =
+          lines.size() + "_" + compression + "_no_extension" + 
getFileSuffix(compression);
+      File fileWithExtension = writeToFile(lines, tempFolder, fileName, 
compression);
+      assertReadingCompressedFileMatchesExpected(fileWithExtension, AUTO, 
lines, p);
+      p.run();
+    }
   }
 
-  @Test
-  @Category(NeedsRunner.class)
-  public void testZipCompressedRead() throws Exception {
-    // Files with the right extensions should work in AUTO and ZIP modes.
-    for (Compression type : new Compression[] {AUTO, ZIP}) {
-      assertReadingCompressedFileMatchesExpected(emptyZip, type, EMPTY);
-      assertReadingCompressedFileMatchesExpected(tinyZip, type, TINY);
-      assertReadingCompressedFileMatchesExpected(largeZip, type, LARGE);
+  /** Tests for reading files with various delimiters. */
+  @RunWith(Parameterized.class)
+  public static class ReadWithDelimiterTest {
+    private static final ImmutableList<String> EXPECTED = 
ImmutableList.of("asdf", "hjkl", "xyz");
+    @Rule public TemporaryFolder tempFolder = new TemporaryFolder();
+    @Parameterized.Parameters(name = "{index}: {0}")
+    public static Iterable<Object[]> data() {
+      return ImmutableList.<Object[]>builder()
+          .add(new Object[] {"\n\n\n", ImmutableList.of("", "", "")})
+          .add(new Object[] {"asdf\nhjkl\nxyz\n", EXPECTED})
+          .add(new Object[] {"asdf\rhjkl\rxyz\r", EXPECTED})
+          .add(new Object[] {"asdf\r\nhjkl\r\nxyz\r\n", EXPECTED})
+          .add(new Object[] {"asdf\rhjkl\r\nxyz\n", EXPECTED})
+          .add(new Object[] {"asdf\nhjkl\nxyz", EXPECTED})
+          .add(new Object[] {"asdf\rhjkl\rxyz", EXPECTED})
+          .add(new Object[] {"asdf\r\nhjkl\r\nxyz", EXPECTED})
+          .add(new Object[] {"asdf\rhjkl\r\nxyz", EXPECTED})
+          .build();
     }
 
-    // Sanity check that we're properly testing compression.
-    assertThat(largeTxt.length(), greaterThan(largeZip.length()));
+    @Parameterized.Parameter(0)
+    public String line;
 
-    // Zip files with non-zip extension should work in ZIP mode.
-    File zipFile = writeToFile(TINY, "tiny_zip_no_extension", ZIP);
-    assertReadingCompressedFileMatchesExpected(zipFile, ZIP, TINY);
-    p.run();
-  }
+    @Parameterized.Parameter(1)
+    public ImmutableList<String> expected;
 
-  @Test
-  @Category(NeedsRunner.class)
-  public void testDeflateCompressedRead() throws Exception {
-    // Files with the right extensions should work in AUTO and ZIP modes.
-    for (Compression type : new Compression[] {AUTO, DEFLATE}) {
-      assertReadingCompressedFileMatchesExpected(emptyDeflate, type, EMPTY);
-      assertReadingCompressedFileMatchesExpected(tinyDeflate, type, TINY);
-      assertReadingCompressedFileMatchesExpected(largeDeflate, type, LARGE);
+    @Test
+    public void testReadLinesWithDelimiter() throws Exception {
+      runTestReadWithData(line.getBytes(UTF_8), expected);
     }
 
-    // Sanity check that we're properly testing compression.
-    assertThat(largeTxt.length(), greaterThan(largeDeflate.length()));
-
-    // Deflate files with non-deflate extension should work in DEFLATE mode.
-    File deflateFile = writeToFile(TINY, "tiny_deflate_no_extension", DEFLATE);
-    assertReadingCompressedFileMatchesExpected(deflateFile, DEFLATE, TINY);
-    p.run();
-  }
+    @Test
+    public void testSplittingSource() throws Exception {
+      TextSource source = prepareSource(line.getBytes(UTF_8));
+      SourceTestUtils.assertSplitAtFractionExhaustive(source, 
PipelineOptionsFactory.create());
+    }
 
-  /**
-   * Tests a zip file with no entries. This is a corner case not tested 
elsewhere as the default
-   * test zip files have a single entry.
-   */
-  @Test
-  @Category(NeedsRunner.class)
-  public void testZipCompressedReadWithNoEntries() throws Exception {
-    String filename = createZipFile(new ArrayList<String>(), "empty zip file");
-    assertReadingCompressedFileMatchesExpected(new File(filename), ZIP, EMPTY);
-    p.run();
-  }
+    private TextSource prepareSource(byte[] data) throws IOException {
+      return TextIOReadTest.prepareSource(tempFolder, data, null);
+    }
 
-  /**
-   * Tests a zip file with multiple entries. This is a corner case not tested 
elsewhere as the
-   * default test zip files have a single entry.
-   */
-  @Test
-  @Category(NeedsRunner.class)
-  public void testZipCompressedReadWithMultiEntriesFile() throws Exception {
-    String[] entry0 = new String[] {"first", "second", "three"};
-    String[] entry1 = new String[] {"four", "five", "six"};
-    String[] entry2 = new String[] {"seven", "eight", "nine"};
-
-    List<String> expected = new ArrayList<>();
-
-    String filename = createZipFile(expected, "multiple entries", entry0, 
entry1, entry2);
-    assertReadingCompressedFileMatchesExpected(new File(filename), ZIP, 
expected);
-    p.run();
+    private void runTestReadWithData(byte[] data, List<String> 
expectedResults) throws Exception {
+      TextSource source = prepareSource(data);
+      List<String> actual = SourceTestUtils.readFromSource(source, 
PipelineOptionsFactory.create());
+      assertThat(
+          actual, containsInAnyOrder(new 
ArrayList<>(expectedResults).toArray(new String[0])));
+    }
   }
 
-  /**
-   * Read a ZIP compressed file containing data, multiple empty entries, and 
then more data. We
-   * expect just the data back.
-   */
-  @Test
-  @Category(NeedsRunner.class)
-  public void testZipCompressedReadWithComplexEmptyAndPresentEntries() throws 
Exception {
-    String filename =
-        createZipFile(
-            new ArrayList<String>(),
-            "complex empty and present entries",
-            new String[] {"cat"},
-            new String[] {},
-            new String[] {},
-            new String[] {"dog"});
-
-    assertReadingCompressedFileMatchesExpected(
-        new File(filename), ZIP, Arrays.asList("cat", "dog"));
-    p.run();
-  }
+  /** Tests for some basic operations in {@link TextIO.Read}. */
+  @RunWith(JUnit4.class)
+  public static class BasicIOTest {
+    @Rule public TemporaryFolder tempFolder = new TemporaryFolder();
+    @Rule public TestPipeline p = TestPipeline.create();
 
-  @Test
-  public void testTextIOGetName() {
-    assertEquals("TextIO.Read", TextIO.read().from("somefile").getName());
-    assertEquals("TextIO.Read", TextIO.read().from("somefile").toString());
-  }
+    private void runTestRead(String[] expected) throws Exception {
+      File tmpFile = tempFolder.newFile();
+      String filename = tmpFile.getPath();
 
-  @Test
-  public void testProgressEmptyFile() throws IOException {
-    try (BoundedReader<String> reader =
-        prepareSource(new byte[0], 
null).createReader(PipelineOptionsFactory.create())) {
-      // Check preconditions before starting.
-      assertEquals(0.0, reader.getFractionConsumed(), 1e-6);
-      assertEquals(0, reader.getSplitPointsConsumed());
-      assertEquals(BoundedReader.SPLIT_POINTS_UNKNOWN, 
reader.getSplitPointsRemaining());
+      try (PrintStream writer = new PrintStream(new 
FileOutputStream(tmpFile))) {
+        for (String elem : expected) {
+          byte[] encodedElem = 
CoderUtils.encodeToByteArray(StringUtf8Coder.of(), elem);
+          String line = new String(encodedElem);
+          writer.println(line);
+        }
+      }
 
-      // Assert empty
-      assertFalse(reader.start());
+      TextIO.Read read = TextIO.read().from(filename);
+      PCollection<String> output = p.apply(read);
 
-      // Check postconditions after finishing
-      assertEquals(1.0, reader.getFractionConsumed(), 1e-6);
-      assertEquals(0, reader.getSplitPointsConsumed());
-      assertEquals(0, reader.getSplitPointsRemaining());
+      PAssert.that(output).containsInAnyOrder(expected);
+      p.run();
     }
-  }
 
-  @Test
-  public void testProgressTextFile() throws IOException {
-    String file = "line1\nline2\nline3";
-    try (BoundedReader<String> reader =
-        prepareSource(file.getBytes(), 
null).createReader(PipelineOptionsFactory.create())) {
-      // Check preconditions before starting
-      assertEquals(0.0, reader.getFractionConsumed(), 1e-6);
-      assertEquals(0, reader.getSplitPointsConsumed());
-      assertEquals(BoundedReader.SPLIT_POINTS_UNKNOWN, 
reader.getSplitPointsRemaining());
-
-      // Line 1
-      assertTrue(reader.start());
-      assertEquals(0, reader.getSplitPointsConsumed());
-      assertEquals(BoundedReader.SPLIT_POINTS_UNKNOWN, 
reader.getSplitPointsRemaining());
-
-      // Line 2
-      assertTrue(reader.advance());
-      assertEquals(1, reader.getSplitPointsConsumed());
-      assertEquals(BoundedReader.SPLIT_POINTS_UNKNOWN, 
reader.getSplitPointsRemaining());
-
-      // Line 3
-      assertTrue(reader.advance());
-      assertEquals(2, reader.getSplitPointsConsumed());
-      assertEquals(1, reader.getSplitPointsRemaining());
-
-      // Check postconditions after finishing
-      assertFalse(reader.advance());
-      assertEquals(1.0, reader.getFractionConsumed(), 1e-6);
-      assertEquals(3, reader.getSplitPointsConsumed());
-      assertEquals(0, reader.getSplitPointsRemaining());
+    @Test
+    public void testDelimiterSelfOverlaps(){
+      assertFalse(TextIO.Read.isSelfOverlapping(new byte[]{'a', 'b', 'c'}));
+      assertFalse(TextIO.Read.isSelfOverlapping(new byte[]{'c', 'a', 'b', 'd', 
'a', 'b'}));
+      assertFalse(TextIO.Read.isSelfOverlapping(new byte[]{'a', 'b', 'c', 'a', 
'b', 'd'}));
+      assertTrue(TextIO.Read.isSelfOverlapping(new byte[]{'a', 'b', 'a'}));
+      assertTrue(TextIO.Read.isSelfOverlapping(new byte[]{'a', 'b', 'c', 'a', 
'b'}));
     }
-  }
 
-  @Test
-  public void testProgressAfterSplitting() throws IOException {
-    String file = "line1\nline2\nline3";
-    BoundedSource<String> source = prepareSource(file.getBytes());
-    BoundedSource<String> remainder;
-
-    // Create the remainder, verifying properties pre- and post-splitting.
-    try (BoundedReader<String> readerOrig = 
source.createReader(PipelineOptionsFactory.create())) {
-      // Preconditions.
-      assertEquals(0.0, readerOrig.getFractionConsumed(), 1e-6);
-      assertEquals(0, readerOrig.getSplitPointsConsumed());
-      assertEquals(BoundedReader.SPLIT_POINTS_UNKNOWN, 
readerOrig.getSplitPointsRemaining());
-
-      // First record, before splitting.
-      assertTrue(readerOrig.start());
-      assertEquals(0, readerOrig.getSplitPointsConsumed());
-      assertEquals(BoundedReader.SPLIT_POINTS_UNKNOWN, 
readerOrig.getSplitPointsRemaining());
-
-      // Split. 0.1 is in line1, so should now be able to detect last record.
-      remainder = readerOrig.splitAtFraction(0.1);
-      System.err.println(readerOrig.getCurrentSource());
-      assertNotNull(remainder);
-
-      // First record, after splitting.
-      assertEquals(0, readerOrig.getSplitPointsConsumed());
-      assertEquals(1, readerOrig.getSplitPointsRemaining());
-
-      // Finish and postconditions.
-      assertFalse(readerOrig.advance());
-      assertEquals(1.0, readerOrig.getFractionConsumed(), 1e-6);
-      assertEquals(1, readerOrig.getSplitPointsConsumed());
-      assertEquals(0, readerOrig.getSplitPointsRemaining());
-    }
-
-    // Check the properties of the remainder.
-    try (BoundedReader<String> reader = 
remainder.createReader(PipelineOptionsFactory.create())) {
-      // Preconditions.
-      assertEquals(0.0, reader.getFractionConsumed(), 1e-6);
-      assertEquals(0, reader.getSplitPointsConsumed());
-      assertEquals(BoundedReader.SPLIT_POINTS_UNKNOWN, 
reader.getSplitPointsRemaining());
-
-      // First record should be line 2.
-      assertTrue(reader.start());
-      assertEquals(0, reader.getSplitPointsConsumed());
-      assertEquals(BoundedReader.SPLIT_POINTS_UNKNOWN, 
reader.getSplitPointsRemaining());
-
-      // Second record is line 3
-      assertTrue(reader.advance());
-      assertEquals(1, reader.getSplitPointsConsumed());
-      assertEquals(1, reader.getSplitPointsRemaining());
-
-      // Check postconditions after finishing
-      assertFalse(reader.advance());
-      assertEquals(1.0, reader.getFractionConsumed(), 1e-6);
-      assertEquals(2, reader.getSplitPointsConsumed());
-      assertEquals(0, reader.getSplitPointsRemaining());
+    @Test
+    @Category(NeedsRunner.class)
+    public void testReadStringsWithCustomDelimiter() throws Exception {
+      final String[] inputStrings =
+          new String[] {
+            // incomplete delimiter
+            "To be, or not to be: that |is the question: ",
+            // incomplete delimiter
+            "To be, or not to be: that *is the question: ",
+            // complete delimiter
+            "Whether 'tis nobler in the mind to suffer |*",
+            // truncated delimiter
+            "The slings and arrows of outrageous fortune,|"
+          };
+
+      File tmpFile = tempFolder.newFile("tmpfile.txt");
+      String filename = tmpFile.getPath();
+
+      try (FileWriter writer = new FileWriter(tmpFile)) {
+        writer.write(Joiner.on("").join(inputStrings));
+      }
+
+      PAssert.that(p.apply(TextIO.read().from(filename).withDelimiter(new 
byte[] {'|', '*'})))
+          .containsInAnyOrder(
+              "To be, or not to be: that |is the question: To be, or not to be: "
+                  + "that *is the question: Whether 'tis nobler in the mind to suffer ",
+              "The slings and arrows of outrageous fortune,|");
+      p.run();
     }
-  }
 
-  @Test
-  public void testReadEmptyLines() throws Exception {
-    runTestReadWithData("\n\n\n".getBytes(StandardCharsets.UTF_8), 
ImmutableList.of("", "", ""));
-  }
+    @Test
+    public void testSplittingSourceWithCustomDelimiter() throws Exception {
+      List<String> testCases = Lists.newArrayList();
+      String infix = "first|*second|*|*third";
+      String[] affixes = new String[] {"", "|", "*", "|*"};
+      for (String prefix : affixes) {
+        for (String suffix : affixes) {
+          testCases.add(prefix + infix + suffix);
+        }
+      }
+      for (String testCase : testCases) {
+        SourceTestUtils.assertSplitAtFractionExhaustive(
+            TextIOReadTest.prepareSource(
+                tempFolder, testCase.getBytes(UTF_8), new byte[] {'|', '*'}),
+            PipelineOptionsFactory.create());
+      }
+    }
 
-  @Test
-  public void testReadFileWithLineFeedDelimiter() throws Exception {
-    runTestReadWithData(
-        "asdf\nhjkl\nxyz\n".getBytes(StandardCharsets.UTF_8),
-        ImmutableList.of("asdf", "hjkl", "xyz"));
-  }
+    @Test
+    @Category(NeedsRunner.class)
+    public void testReadStrings() throws Exception {
+      runTestRead(LINES_ARRAY);
+    }
 
-  @Test
-  public void testReadFileWithCarriageReturnDelimiter() throws Exception {
-    runTestReadWithData(
-        "asdf\rhjkl\rxyz\r".getBytes(StandardCharsets.UTF_8),
-        ImmutableList.of("asdf", "hjkl", "xyz"));
-  }
+    @Test
+    @Category(NeedsRunner.class)
+    public void testReadEmptyStrings() throws Exception {
+      runTestRead(NO_LINES_ARRAY);
+    }
 
-  @Test
-  public void testReadFileWithCarriageReturnAndLineFeedDelimiter() throws 
Exception {
-    runTestReadWithData(
-        "asdf\r\nhjkl\r\nxyz\r\n".getBytes(StandardCharsets.UTF_8),
-        ImmutableList.of("asdf", "hjkl", "xyz"));
-  }
+    @Test
+    public void testReadNamed() throws Exception {
+      File emptyFile = tempFolder.newFile();
+      p.enableAbandonedNodeEnforcement(false);
 
-  @Test
-  public void testReadFileWithMixedDelimiters() throws Exception {
-    runTestReadWithData(
-        "asdf\rhjkl\r\nxyz\n".getBytes(StandardCharsets.UTF_8),
-        ImmutableList.of("asdf", "hjkl", "xyz"));
-  }
+      assertEquals("TextIO.Read/Read.out", 
p.apply(TextIO.read().from("somefile")).getName());
+      assertEquals(
+        "MyRead/Read.out", p.apply("MyRead", 
TextIO.read().from(emptyFile.getPath())).getName());
+    }
 
-  @Test
-  public void testReadFileWithLineFeedDelimiterAndNonEmptyBytesAtEnd() throws 
Exception {
-    runTestReadWithData(
-        "asdf\nhjkl\nxyz".getBytes(StandardCharsets.UTF_8),
-        ImmutableList.of("asdf", "hjkl", "xyz"));
-  }
+    @Test
+    public void testReadDisplayData() {
+      TextIO.Read read = TextIO.read().from("foo.*").withCompression(BZIP2);
 
-  @Test
-  public void testReadFileWithCarriageReturnDelimiterAndNonEmptyBytesAtEnd() 
throws Exception {
-    runTestReadWithData(
-        "asdf\rhjkl\rxyz".getBytes(StandardCharsets.UTF_8),
-        ImmutableList.of("asdf", "hjkl", "xyz"));
-  }
+      DisplayData displayData = DisplayData.from(read);
 
-  @Test
-  public void testReadFileWithCarriageReturnAndLineFeedDelimiterAndNonEmptyBytesAtEnd()
-      throws Exception {
-    runTestReadWithData(
-        "asdf\r\nhjkl\r\nxyz".getBytes(StandardCharsets.UTF_8),
-        ImmutableList.of("asdf", "hjkl", "xyz"));
-  }
+      assertThat(displayData, hasDisplayItem("filePattern", "foo.*"));
+      assertThat(displayData, hasDisplayItem("compressionType", 
BZIP2.toString()));
+    }
 
-  @Test
-  public void testReadFileWithMixedDelimitersAndNonEmptyBytesAtEnd() throws 
Exception {
-    runTestReadWithData(
-        "asdf\rhjkl\r\nxyz".getBytes(StandardCharsets.UTF_8),
-        ImmutableList.of("asdf", "hjkl", "xyz"));
-  }
+    @Test
+    @Category(ValidatesRunner.class)
+    public void testPrimitiveReadDisplayData() {
+      DisplayDataEvaluator evaluator = DisplayDataEvaluator.create();
 
-  private void runTestReadWithData(byte[] data, List<String> expectedResults) 
throws Exception {
-    TextSource source = prepareSource(data);
-    List<String> actual = SourceTestUtils.readFromSource(source, 
PipelineOptionsFactory.create());
-    assertThat(actual, containsInAnyOrder(new 
ArrayList<>(expectedResults).toArray(new String[0])));
-  }
+      TextIO.Read read = TextIO.read().from("foobar");
 
-  @Test
-  public void testSplittingSourceWithEmptyLines() throws Exception {
-    TextSource source = prepareSource("\n\n\n".getBytes(UTF_8));
-    SourceTestUtils.assertSplitAtFractionExhaustive(source, 
PipelineOptionsFactory.create());
-  }
+      Set<DisplayData> displayData = 
evaluator.displayDataForPrimitiveSourceTransforms(read);
+      assertThat(
+        "TextIO.Read should include the file prefix in its primitive display data",
+        displayData,
+        hasItem(hasDisplayItem(hasValue(startsWith("foobar")))));
+    }
 
-  @Test
-  public void testSplittingSourceWithLineFeedDelimiter() throws Exception {
-    TextSource source = prepareSource("asdf\nhjkl\nxyz\n".getBytes(UTF_8));
-    SourceTestUtils.assertSplitAtFractionExhaustive(source, 
PipelineOptionsFactory.create());
-  }
+    /** Options for testing. */
+    public interface RuntimeTestOptions extends PipelineOptions {
+      ValueProvider<String> getInput();
+      void setInput(ValueProvider<String> value);
+    }
 
-  @Test
-  public void testSplittingSourceWithCarriageReturnDelimiter() throws 
Exception {
-    TextSource source = prepareSource("asdf\rhjkl\rxyz\r".getBytes(UTF_8));
-    SourceTestUtils.assertSplitAtFractionExhaustive(source, 
PipelineOptionsFactory.create());
-  }
+    @Test
+    public void testRuntimeOptionsNotCalledInApply() throws Exception {
+      p.enableAbandonedNodeEnforcement(false);
 
-  @Test
-  public void testSplittingSourceWithCarriageReturnAndLineFeedDelimiter() 
throws Exception {
-    TextSource source = 
prepareSource("asdf\r\nhjkl\r\nxyz\r\n".getBytes(UTF_8));
-    SourceTestUtils.assertSplitAtFractionExhaustive(source, 
PipelineOptionsFactory.create());
-  }
+      RuntimeTestOptions options =
+        PipelineOptionsFactory.as(RuntimeTestOptions.class);
 
-  @Test
-  public void testSplittingSourceWithMixedDelimiters() throws Exception {
-    TextSource source = prepareSource("asdf\rhjkl\r\nxyz\n".getBytes(UTF_8));
-    SourceTestUtils.assertSplitAtFractionExhaustive(source, 
PipelineOptionsFactory.create());
-  }
+      p.apply(TextIO.read().from(options.getInput()));
+    }
 
-  @Test
-  public void testSplittingSourceWithLineFeedDelimiterAndNonEmptyBytesAtEnd() 
throws Exception {
-    TextSource source = prepareSource("asdf\nhjkl\nxyz".getBytes(UTF_8));
-    SourceTestUtils.assertSplitAtFractionExhaustive(source, 
PipelineOptionsFactory.create());
-  }
+    @Test
+    public void testCompressionIsSet() throws Exception {
+      TextIO.Read read = TextIO.read().from("/tmp/test");
+      assertEquals(AUTO, read.getCompression());
+      read = TextIO.read().from("/tmp/test").withCompression(GZIP);
+      assertEquals(GZIP, read.getCompression());
+    }
 
-  @Test
-  public void 
testSplittingSourceWithCarriageReturnDelimiterAndNonEmptyBytesAtEnd()
-      throws Exception {
-    TextSource source = prepareSource("asdf\rhjkl\rxyz".getBytes(UTF_8));
-    SourceTestUtils.assertSplitAtFractionExhaustive(source, 
PipelineOptionsFactory.create());
-  }
+    /**
+     * Tests reading from a small, uncompressed file with .gz extension. This must work in
+     * GZIP modes. This is needed because some network file systems / HTTP clients will
+     * transparently decompress gzipped content.
+     */
+    @Test
+    @Category(NeedsRunner.class)
+    public void testSmallCompressedGzipReadActuallyUncompressed() throws 
Exception {
+      File smallGzNotCompressed =
+        writeToFile(TINY, tempFolder, "tiny_uncompressed.gz", UNCOMPRESSED);
+      // Should work with GZIP compression set.
+      assertReadingCompressedFileMatchesExpected(smallGzNotCompressed, GZIP, 
TINY, p);
+      p.run();
+    }
 
-  @Test
-  public void 
testSplittingSourceWithCarriageReturnAndLineFeedDelimiterAndNonEmptyBytesAtEnd()
-      throws Exception {
-    TextSource source = prepareSource("asdf\r\nhjkl\r\nxyz".getBytes(UTF_8));
-    SourceTestUtils.assertSplitAtFractionExhaustive(source, 
PipelineOptionsFactory.create());
-  }
+    /**
+     * Tests reading from a small, uncompressed file with .gz extension. This must work in
+     * AUTO modes. This is needed because some network file systems / HTTP clients will
+     * transparently decompress gzipped content.
+     */
+    @Test
+    @Category(NeedsRunner.class)
+    public void testSmallCompressedAutoReadActuallyUncompressed() throws 
Exception {
+      File smallGzNotCompressed =
+        writeToFile(TINY, tempFolder, "tiny_uncompressed.gz", UNCOMPRESSED);
+      // Should also work with AUTO mode set.
+      assertReadingCompressedFileMatchesExpected(smallGzNotCompressed, AUTO, 
TINY, p);
+      p.run();
+    }
 
-  @Test
-  public void testSplittingSourceWithMixedDelimitersAndNonEmptyBytesAtEnd() 
throws Exception {
-    TextSource source = prepareSource("asdf\rhjkl\r\nxyz".getBytes(UTF_8));
-    SourceTestUtils.assertSplitAtFractionExhaustive(source, 
PipelineOptionsFactory.create());
-  }
+    /**
+     * Tests a zip file with no entries. This is a corner case not tested elsewhere as the default
+     * test zip files have a single entry.
+     */
+    @Test
+    @Category(NeedsRunner.class)
+    public void testZipCompressedReadWithNoEntries() throws Exception {
+      File file = createZipFile(new ArrayList<String>(), tempFolder, "empty zip file");
+      assertReadingCompressedFileMatchesExpected(file, ZIP, EMPTY, p);
+      p.run();
+    }
 
-  private TextSource prepareSource(byte[] data) throws IOException {
-    return prepareSource(data, null /* default delimiters */);
-  }
+    /**
+     * Tests a zip file with multiple entries. This is a corner case not tested elsewhere as the
+     * default test zip files have a single entry.
+     */
+    @Test
+    @Category(NeedsRunner.class)
+    public void testZipCompressedReadWithMultiEntriesFile() throws Exception {
+      String[] entry0 = new String[] {"first", "second", "three"};
+      String[] entry1 = new String[] {"four", "five", "six"};
+      String[] entry2 = new String[] {"seven", "eight", "nine"};
+
+      List<String> expected = new ArrayList<>();
+
+      File file =
+        createZipFile(expected, tempFolder, "multiple entries", entry0, 
entry1, entry2);
+      assertReadingCompressedFileMatchesExpected(file, ZIP, expected, p);
+      p.run();
+    }
 
-  private TextSource prepareSource(byte[] data, byte[] delimiter) throws 
IOException {
-    Path path = Files.createTempFile(tempFolder, "tempfile", "ext");
-    Files.write(path, data);
-    return new 
TextSource(ValueProvider.StaticValueProvider.of(path.toString()),
-        EmptyMatchTreatment.DISALLOW, delimiter);
-  }
+    /**
+     * Read a ZIP compressed file containing data, multiple empty entries, and then more data. We
+     * expect just the data back.
+     */
+    @Test
+    @Category(NeedsRunner.class)
+    public void testZipCompressedReadWithComplexEmptyAndPresentEntries() 
throws Exception {
+      File file =
+        createZipFile(
+          new ArrayList<String>(),
+          tempFolder,
+          "complex empty and present entries",
+          new String[] {"cat"},
+          new String[] {},
+          new String[] {},
+          new String[] {"dog"});
+
+      assertReadingCompressedFileMatchesExpected(
+        file, ZIP, Arrays.asList("cat", "dog"), p);
+      p.run();
+    }
 
-  @Test
-  public void testInitialSplitAutoModeTxt() throws Exception {
-    PipelineOptions options = TestPipeline.testingPipelineOptions();
-    long desiredBundleSize = 1000;
+    @Test
+    public void testTextIOGetName() {
+      assertEquals("TextIO.Read", TextIO.read().from("somefile").getName());
+      assertEquals("TextIO.Read", TextIO.read().from("somefile").toString());
+    }
 
-    // Sanity check: file is at least 2 bundles long.
-    assertThat(largeTxt.length(), greaterThan(2 * desiredBundleSize));
+    private TextSource prepareSource(byte[] data) throws IOException {
+      return TextIOReadTest.prepareSource(tempFolder, data, null);
+    }
 
-    FileBasedSource<String> source = 
TextIO.read().from(largeTxt.getPath()).getSource();
-    List<? extends FileBasedSource<String>> splits = 
source.split(desiredBundleSize, options);
+    @Test
+    public void testProgressEmptyFile() throws IOException {
+      try (BoundedSource.BoundedReader<String> reader =
+             prepareSource(new 
byte[0]).createReader(PipelineOptionsFactory.create())) {
+        // Check preconditions before starting.
+        assertEquals(0.0, reader.getFractionConsumed(), 1e-6);
+        assertEquals(0, reader.getSplitPointsConsumed());
+        assertEquals(
+          BoundedSource.BoundedReader.SPLIT_POINTS_UNKNOWN, 
reader.getSplitPointsRemaining());
+
+        // Assert empty
+        assertFalse(reader.start());
+
+        // Check postconditions after finishing
+        assertEquals(1.0, reader.getFractionConsumed(), 1e-6);
+        assertEquals(0, reader.getSplitPointsConsumed());
+        assertEquals(0, reader.getSplitPointsRemaining());
+      }
+    }
 
-    // At least 2 splits and they are equal to reading the whole file.
-    assertThat(splits, hasSize(greaterThan(1)));
-    SourceTestUtils.assertSourcesEqualReferenceSource(source, splits, options);
-  }
+    @Test
+    public void testProgressTextFile() throws IOException {
+      String file = "line1\nline2\nline3";
+      try (BoundedSource.BoundedReader<String> reader =
+             
prepareSource(file.getBytes()).createReader(PipelineOptionsFactory.create())) {
+        // Check preconditions before starting
+        assertEquals(0.0, reader.getFractionConsumed(), 1e-6);
+        assertEquals(0, reader.getSplitPointsConsumed());
+        assertEquals(
+          BoundedSource.BoundedReader.SPLIT_POINTS_UNKNOWN, 
reader.getSplitPointsRemaining());
+
+        // Line 1
+        assertTrue(reader.start());
+        assertEquals(0, reader.getSplitPointsConsumed());
+        assertEquals(
+          BoundedSource.BoundedReader.SPLIT_POINTS_UNKNOWN, 
reader.getSplitPointsRemaining());
+
+        // Line 2
+        assertTrue(reader.advance());
+        assertEquals(1, reader.getSplitPointsConsumed());
+        assertEquals(
+          BoundedSource.BoundedReader.SPLIT_POINTS_UNKNOWN, 
reader.getSplitPointsRemaining());
+
+        // Line 3
+        assertTrue(reader.advance());
+        assertEquals(2, reader.getSplitPointsConsumed());
+        assertEquals(1, reader.getSplitPointsRemaining());
+
+        // Check postconditions after finishing
+        assertFalse(reader.advance());
+        assertEquals(1.0, reader.getFractionConsumed(), 1e-6);
+        assertEquals(3, reader.getSplitPointsConsumed());
+        assertEquals(0, reader.getSplitPointsRemaining());
+      }
+    }
 
-  @Test
-  public void testInitialSplitAutoModeGz() throws Exception {
-    long desiredBundleSize = 1000;
-    PipelineOptions options = TestPipeline.testingPipelineOptions();
+    @Test
+    public void testProgressAfterSplitting() throws IOException {
+      String file = "line1\nline2\nline3";
+      BoundedSource<String> source = prepareSource(file.getBytes());
+      BoundedSource<String> remainder;
+
+      // Create the remainder, verifying properties pre- and post-splitting.
+      try (BoundedSource.BoundedReader<String> readerOrig =
+             source.createReader(PipelineOptionsFactory.create())) {
+        // Preconditions.
+        assertEquals(0.0, readerOrig.getFractionConsumed(), 1e-6);
+        assertEquals(0, readerOrig.getSplitPointsConsumed());
+        assertEquals(
+          BoundedSource.BoundedReader.SPLIT_POINTS_UNKNOWN, 
readerOrig.getSplitPointsRemaining());
+
+        // First record, before splitting.
+        assertTrue(readerOrig.start());
+        assertEquals(0, readerOrig.getSplitPointsConsumed());
+        assertEquals(
+          BoundedSource.BoundedReader.SPLIT_POINTS_UNKNOWN, 
readerOrig.getSplitPointsRemaining());
+
+        // Split. 0.1 is in line1, so should now be able to detect last record.
+        remainder = readerOrig.splitAtFraction(0.1);
+        System.err.println(readerOrig.getCurrentSource());
+        assertNotNull(remainder);
+
+        // First record, after splitting.
+        assertEquals(0, readerOrig.getSplitPointsConsumed());
+        assertEquals(1, readerOrig.getSplitPointsRemaining());
+
+        // Finish and postconditions.
+        assertFalse(readerOrig.advance());
+        assertEquals(1.0, readerOrig.getFractionConsumed(), 1e-6);
+        assertEquals(1, readerOrig.getSplitPointsConsumed());
+        assertEquals(0, readerOrig.getSplitPointsRemaining());
+      }
 
-    // Sanity check: file is at least 2 bundles long.
-    assertThat(largeGz.length(), greaterThan(2 * desiredBundleSize));
+      // Check the properties of the remainder.
+      try (BoundedSource.BoundedReader<String> reader =
+             remainder.createReader(PipelineOptionsFactory.create())) {
+        // Preconditions.
+        assertEquals(0.0, reader.getFractionConsumed(), 1e-6);
+        assertEquals(0, reader.getSplitPointsConsumed());
+        assertEquals(
+          BoundedSource.BoundedReader.SPLIT_POINTS_UNKNOWN, 
reader.getSplitPointsRemaining());
+
+        // First record should be line 2.
+        assertTrue(reader.start());
+        assertEquals(0, reader.getSplitPointsConsumed());
+        assertEquals(
+          BoundedSource.BoundedReader.SPLIT_POINTS_UNKNOWN, 
reader.getSplitPointsRemaining());
+
+        // Second record is line 3
+        assertTrue(reader.advance());
+        assertEquals(1, reader.getSplitPointsConsumed());
+        assertEquals(1, reader.getSplitPointsRemaining());
+
+        // Check postconditions after finishing
+        assertFalse(reader.advance());
+        assertEquals(1.0, reader.getFractionConsumed(), 1e-6);
+        assertEquals(2, reader.getSplitPointsConsumed());
+        assertEquals(0, reader.getSplitPointsRemaining());
+      }
+    }
 
-    FileBasedSource<String> source = 
TextIO.read().from(largeGz.getPath()).getSource();
-    List<? extends FileBasedSource<String>> splits = 
source.split(desiredBundleSize, options);
+    @Test
+    public void testInitialSplitAutoModeTxt() throws Exception {
+      PipelineOptions options = TestPipeline.testingPipelineOptions();
+      long desiredBundleSize = 1000;
+      File largeTxt = writeToFile(LARGE, tempFolder, "large.txt", 
UNCOMPRESSED);
 
-    // Exactly 1 split, even in AUTO mode, since it is a gzip file.
-    assertThat(splits, hasSize(equalTo(1)));
-    SourceTestUtils.assertSourcesEqualReferenceSource(source, splits, options);
-  }
+      // Sanity check: file is at least 2 bundles long.
+      assertThat(largeTxt.length(), greaterThan(2 * desiredBundleSize));
 
-  @Test
-  public void testInitialSplitGzipModeTxt() throws Exception {
-    PipelineOptions options = TestPipeline.testingPipelineOptions();
-    long desiredBundleSize = 1000;
+      FileBasedSource<String> source = 
TextIO.read().from(largeTxt.getPath()).getSource();
+      List<? extends FileBasedSource<String>> splits = 
source.split(desiredBundleSize, options);
 
-    // Sanity check: file is at least 2 bundles long.
-    assertThat(largeTxt.length(), greaterThan(2 * desiredBundleSize));
+      // At least 2 splits and they are equal to reading the whole file.
+      assertThat(splits, hasSize(greaterThan(1)));
+      SourceTestUtils.assertSourcesEqualReferenceSource(source, splits, 
options);
+    }
 
-    FileBasedSource<String> source =
-        
TextIO.read().from(largeTxt.getPath()).withCompression(GZIP).getSource();
-    List<? extends FileBasedSource<String>> splits = 
source.split(desiredBundleSize, options);
+    @Test
+    public void testInitialSplitAutoModeGz() throws Exception {
+      PipelineOptions options = TestPipeline.testingPipelineOptions();
+      long desiredBundleSize = 1000;
+      File largeGz = writeToFile(LARGE, tempFolder, "large.gz", GZIP);
+      // Sanity check: file is at least 2 bundles long.
+      assertThat(largeGz.length(), greaterThan(2 * desiredBundleSize));
 
-    // Exactly 1 split, even though splittable text file, since using GZIP 
mode.
-    assertThat(splits, hasSize(equalTo(1)));
-    SourceTestUtils.assertSourcesEqualReferenceSource(source, splits, options);
-  }
+      FileBasedSource<String> source = 
TextIO.read().from(largeGz.getPath()).getSource();
+      List<? extends FileBasedSource<String>> splits = 
source.split(desiredBundleSize, options);
 
-  @Test
-  public void testInitialSplitGzipModeGz() throws Exception {
-    PipelineOptions options = TestPipeline.testingPipelineOptions();
-    long desiredBundleSize = 1000;
+      // Exactly 1 split, even in AUTO mode, since it is a gzip file.
+      assertThat(splits, hasSize(equalTo(1)));
+      SourceTestUtils.assertSourcesEqualReferenceSource(source, splits, 
options);
+    }
 
-    // Sanity check: file is at least 2 bundles long.
-    assertThat(largeGz.length(), greaterThan(2 * desiredBundleSize));
+    @Test
+    public void testInitialSplitGzipModeTxt() throws Exception {
+      PipelineOptions options = TestPipeline.testingPipelineOptions();
+      long desiredBundleSize = 1000;
+      File largeTxt = writeToFile(LARGE, tempFolder, "large.txt", 
UNCOMPRESSED);
+      // Sanity check: file is at least 2 bundles long.
+      assertThat(largeTxt.length(), greaterThan(2 * desiredBundleSize));
 
-    FileBasedSource<String> source =
-        
TextIO.read().from(largeGz.getPath()).withCompression(GZIP).getSource();
-    List<? extends FileBasedSource<String>> splits = 
source.split(desiredBundleSize, options);
+      FileBasedSource<String> source =
+        
TextIO.read().from(largeTxt.getPath()).withCompression(GZIP).getSource();
+      List<? extends FileBasedSource<String>> splits = 
source.split(desiredBundleSize, options);
 
-    // Exactly 1 split using .gz extension and using GZIP mode.
-    assertThat(splits, hasSize(equalTo(1)));
-    SourceTestUtils.assertSourcesEqualReferenceSource(source, splits, options);
-  }
+      // Exactly 1 split, even though splittable text file, since using GZIP mode.
+      assertThat(splits, hasSize(equalTo(1)));
+      SourceTestUtils.assertSourcesEqualReferenceSource(source, splits, 
options);
+    }
 
-  @Test
-  @Category(NeedsRunner.class)
-  public void testReadAll() throws IOException {
-    writeToFile(TINY, "readAllTiny1.zip", ZIP);
-    writeToFile(TINY, "readAllTiny2.txt", UNCOMPRESSED);
-    writeToFile(LARGE, "readAllLarge1.zip", ZIP);
-    writeToFile(LARGE, "readAllLarge2.txt", UNCOMPRESSED);
-    PCollection<String> lines =
+    @Test
+    @Category(NeedsRunner.class)
+    public void testReadAll() throws IOException {
+      Path tempFolderPath = tempFolder.getRoot().toPath();
+      writeToFile(TINY, tempFolder, "readAllTiny1.zip", ZIP);
+      writeToFile(TINY, tempFolder, "readAllTiny2.txt", UNCOMPRESSED);
+      writeToFile(LARGE, tempFolder, "readAllLarge1.zip", ZIP);
+      writeToFile(LARGE, tempFolder, "readAllLarge2.txt", UNCOMPRESSED);
+      PCollection<String> lines =
         p.apply(
-                Create.of(
-                    tempFolder.resolve("readAllTiny*").toString(),
-                    tempFolder.resolve("readAllLarge*").toString()))
-            .apply(TextIO.readAll().withCompression(AUTO));
-    PAssert.that(lines).containsInAnyOrder(Iterables.concat(TINY, TINY, LARGE, 
LARGE));
-    p.run();
-  }
+          Create.of(
+            tempFolderPath.resolve("readAllTiny*").toString(),
+            tempFolderPath.resolve("readAllLarge*").toString()))
+          .apply(TextIO.readAll().withCompression(AUTO));
+      PAssert.that(lines).containsInAnyOrder(Iterables.concat(TINY, TINY, 
LARGE, LARGE));
+      p.run();
+    }
 
-  @Test
-  @Category(NeedsRunner.class)
-  public void testReadFiles() throws IOException {
-    writeToFile(TINY, "readAllTiny1.zip", ZIP);
-    writeToFile(TINY, "readAllTiny2.txt", UNCOMPRESSED);
-    writeToFile(LARGE, "readAllLarge1.zip", ZIP);
-    writeToFile(LARGE, "readAllLarge2.txt", UNCOMPRESSED);
-    PCollection<String> lines =
+    @Test
+    @Category(NeedsRunner.class)
+    public void testReadFiles() throws IOException {
+      Path tempFolderPath = tempFolder.getRoot().toPath();
+      writeToFile(TINY, tempFolder, "readAllTiny1.zip", ZIP);
+      writeToFile(TINY, tempFolder, "readAllTiny2.txt", UNCOMPRESSED);
+      writeToFile(LARGE, tempFolder, "readAllLarge1.zip", ZIP);
+      writeToFile(LARGE, tempFolder, "readAllLarge2.txt", UNCOMPRESSED);
+      PCollection<String> lines =
         p.apply(
-                Create.of(
-                    tempFolder.resolve("readAllTiny*").toString(),
-                    tempFolder.resolve("readAllLarge*").toString()))
-            .apply(FileIO.matchAll())
-            .apply(FileIO.readMatches().withCompression(AUTO))
-            .apply(TextIO.readFiles().withDesiredBundleSizeBytes(10));
-    PAssert.that(lines).containsInAnyOrder(Iterables.concat(TINY, TINY, LARGE, 
LARGE));
-    p.run();
-  }
+          Create.of(
+            tempFolderPath.resolve("readAllTiny*").toString(),
+            tempFolderPath.resolve("readAllLarge*").toString()))
+          .apply(FileIO.matchAll())
+          .apply(FileIO.readMatches().withCompression(AUTO))
+          .apply(TextIO.readFiles().withDesiredBundleSizeBytes(10));
+      PAssert.that(lines).containsInAnyOrder(Iterables.concat(TINY, TINY, 
LARGE, LARGE));
+      p.run();
+    }
 
-  @Test
-  @Category({NeedsRunner.class, UsesSplittableParDo.class})
-  public void testReadWatchForNewFiles() throws IOException, 
InterruptedException {
-    final Path basePath = tempFolder.resolve("readWatch");
-    basePath.toFile().mkdir();
-    PCollection<String> lines =
-        p.apply(
-            TextIO.read()
-                .from(basePath.resolve("*").toString())
-                // Make sure that compression type propagates into readAll()
-                .withCompression(ZIP)
-                .watchForNewFiles(
-                    Duration.millis(100),
-                    
Watch.Growth.<String>afterTimeSinceNewOutput(Duration.standardSeconds(3))));
-
-    Thread writer =
+    @Test
+    @Category({NeedsRunner.class, UsesSplittableParDo.class})
+    public void testReadWatchForNewFiles() throws IOException, 
InterruptedException {
+      final Path basePath = tempFolder.getRoot().toPath().resolve("readWatch");
+      basePath.toFile().mkdir();
+      PCollection<String> lines =
+          p.apply(
+              TextIO.read()
+                  .from(basePath.resolve("*").toString())
+                  // Make sure that compression type propagates into readAll()
+                  .withCompression(ZIP)
+                  .watchForNewFiles(
+                      Duration.millis(100),
+                      
Watch.Growth.<String>afterTimeSinceNewOutput(Duration.standardSeconds(3))));
+
+      Thread writer =
         new Thread() {
           @Override
           public void run() {
             try {
               Thread.sleep(1000);
               writeToFile(
-                  Arrays.asList("a.1", "a.2"),
-                  basePath.resolve("fileA").toString(),
-                  ZIP);
+                Arrays.asList("a.1", "a.2"),
+                tempFolder,
+                basePath.resolve("fileA").toString(),
+                ZIP);
               Thread.sleep(300);
               writeToFile(
-                  Arrays.asList("b.1", "b.2"),
-                  basePath.resolve("fileB").toString(),
-                  ZIP);
+                Arrays.asList("b.1", "b.2"),
+                tempFolder,
+                basePath.resolve("fileB").toString(),
+                ZIP);
               Thread.sleep(300);
               writeToFile(
-                  Arrays.asList("c.1", "c.2"),
-                  basePath.resolve("fileC").toString(),
-                  ZIP);
+                Arrays.asList("c.1", "c.2"),
+                tempFolder,
+                basePath.resolve("fileC").toString(),
+                ZIP);
             } catch (IOException | InterruptedException e) {
               throw new RuntimeException(e);
             }
           }
         };
-    writer.start();
+      writer.start();
 
-    PAssert.that(lines).containsInAnyOrder("a.1", "a.2", "b.1", "b.2", "c.1", 
"c.2");
-    p.run();
+      PAssert.that(lines).containsInAnyOrder("a.1", "a.2", "b.1", "b.2", 
"c.1", "c.2");
+      p.run();
 
-    writer.join();
+      writer.join();
+    }
   }
 }

Reply via email to