Repository: beam Updated Branches: refs/heads/master 0bc375634 -> 8d71ebf82
[BEAM-2632] Use JUnit Parameterized test suites in TextIOReadTest Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/09b6b8fc Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/09b6b8fc Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/09b6b8fc Branch: refs/heads/master Commit: 09b6b8fc18053b2ccb3163c6bdf58dd6705d6eba Parents: c3bcd4b Author: huafengw <[email protected]> Authored: Wed Aug 30 14:18:21 2017 +0800 Committer: manuzhang <[email protected]> Committed: Wed Sep 13 10:45:11 2017 +0800 ---------------------------------------------------------------------- .../org/apache/beam/sdk/io/TextIOReadTest.java | 1330 ++++++++---------- 1 file changed, 599 insertions(+), 731 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/beam/blob/09b6b8fc/sdks/java/core/src/test/java/org/apache/beam/sdk/io/TextIOReadTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/TextIOReadTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/TextIOReadTest.java index 3a8757e..f7bb12c 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/TextIOReadTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/TextIOReadTest.java @@ -50,12 +50,8 @@ import java.io.FileWriter; import java.io.IOException; import java.io.OutputStream; import java.io.PrintStream; -import java.nio.charset.StandardCharsets; -import java.nio.file.FileVisitResult; import java.nio.file.Files; import java.nio.file.Path; -import java.nio.file.SimpleFileVisitor; -import java.nio.file.attribute.BasicFileAttributes; import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; @@ -64,8 +60,9 @@ import java.util.Set; import java.util.zip.GZIPOutputStream; import java.util.zip.ZipEntry; import java.util.zip.ZipOutputStream; + +import 
org.apache.beam.sdk.Pipeline; import org.apache.beam.sdk.coders.StringUtf8Coder; -import org.apache.beam.sdk.io.BoundedSource.BoundedReader; import org.apache.beam.sdk.io.fs.EmptyMatchTreatment; import org.apache.beam.sdk.options.PipelineOptions; import org.apache.beam.sdk.options.PipelineOptionsFactory; @@ -85,48 +82,29 @@ import org.apache.beam.sdk.values.PCollection; import org.apache.commons.compress.compressors.bzip2.BZip2CompressorOutputStream; import org.apache.commons.compress.compressors.deflate.DeflateCompressorOutputStream; import org.joda.time.Duration; -import org.junit.AfterClass; -import org.junit.BeforeClass; import org.junit.Rule; import org.junit.Test; import org.junit.experimental.categories.Category; -import org.junit.rules.ExpectedException; +import org.junit.experimental.runners.Enclosed; +import org.junit.rules.TemporaryFolder; import org.junit.runner.RunWith; import org.junit.runners.JUnit4; +import org.junit.runners.Parameterized; /** Tests for {@link TextIO.Read}. 
*/ -@RunWith(JUnit4.class) +@RunWith(Enclosed.class) public class TextIOReadTest { + private static final int LINES_NUMBER_FOR_LARGE = 1000; private static final List<String> EMPTY = Collections.emptyList(); private static final List<String> TINY = Arrays.asList("Irritable eagle", "Optimistic jay", "Fanciful hawk"); - private static final List<String> LARGE = makeLines(1000); - private static int uniquifier = 0; - - private static Path tempFolder; - private static File emptyTxt; - private static File tinyTxt; - private static File largeTxt; - private static File emptyGz; - private static File tinyGz; - private static File largeGz; - private static File emptyBzip2; - private static File tinyBzip2; - private static File largeBzip2; - private static File emptyZip; - private static File tinyZip; - private static File largeZip; - private static File emptyDeflate; - private static File tinyDeflate; - private static File largeDeflate; - - @Rule public TestPipeline p = TestPipeline.create(); - - @Rule public ExpectedException expectedException = ExpectedException.none(); - - private static File writeToFile(List<String> lines, String filename, Compression compression) + + private static final List<String> LARGE = makeLines(LINES_NUMBER_FOR_LARGE); + + private static File writeToFile( + List<String> lines, TemporaryFolder folder, String fileName, Compression compression) throws IOException { - File file = tempFolder.resolve(filename).toFile(); + File file = folder.getRoot().toPath().resolve(fileName).toFile(); OutputStream output = new FileOutputStream(file); switch (compression) { case UNCOMPRESSED: @@ -152,192 +130,6 @@ public class TextIOReadTest { return file; } - @BeforeClass - public static void setupClass() throws IOException { - tempFolder = Files.createTempDirectory("TextIOTest"); - // empty files - emptyTxt = writeToFile(EMPTY, "empty.txt", UNCOMPRESSED); - emptyGz = writeToFile(EMPTY, "empty.gz", GZIP); - emptyBzip2 = writeToFile(EMPTY, "empty.bz2", BZIP2); - 
emptyZip = writeToFile(EMPTY, "empty.zip", ZIP); - emptyDeflate = writeToFile(EMPTY, "empty.deflate", DEFLATE); - // tiny files - tinyTxt = writeToFile(TINY, "tiny.txt", UNCOMPRESSED); - tinyGz = writeToFile(TINY, "tiny.gz", GZIP); - tinyBzip2 = writeToFile(TINY, "tiny.bz2", BZIP2); - tinyZip = writeToFile(TINY, "tiny.zip", ZIP); - tinyDeflate = writeToFile(TINY, "tiny.deflate", DEFLATE); - // large files - largeTxt = writeToFile(LARGE, "large.txt", UNCOMPRESSED); - largeGz = writeToFile(LARGE, "large.gz", GZIP); - largeBzip2 = writeToFile(LARGE, "large.bz2", BZIP2); - largeZip = writeToFile(LARGE, "large.zip", ZIP); - largeDeflate = writeToFile(LARGE, "large.deflate", DEFLATE); - } - - @AfterClass - public static void teardownClass() throws IOException { - Files.walkFileTree( - tempFolder, - new SimpleFileVisitor<Path>() { - @Override - public FileVisitResult visitFile(Path file, BasicFileAttributes attrs) - throws IOException { - Files.delete(file); - return FileVisitResult.CONTINUE; - } - - @Override - public FileVisitResult postVisitDirectory(Path dir, IOException exc) throws IOException { - Files.delete(dir); - return FileVisitResult.CONTINUE; - } - }); - } - - private void runTestRead(String[] expected) throws Exception { - File tmpFile = Files.createTempFile(tempFolder, "file", "txt").toFile(); - String filename = tmpFile.getPath(); - - try (PrintStream writer = new PrintStream(new FileOutputStream(tmpFile))) { - for (String elem : expected) { - byte[] encodedElem = CoderUtils.encodeToByteArray(StringUtf8Coder.of(), elem); - String line = new String(encodedElem); - writer.println(line); - } - } - - TextIO.Read read = TextIO.read().from(filename); - - PCollection<String> output = p.apply(read); - - PAssert.that(output).containsInAnyOrder(expected); - p.run(); - } - - @Test - public void testDelimiterSelfOverlaps(){ - assertFalse(TextIO.Read.isSelfOverlapping(new byte[]{'a', 'b', 'c'})); - assertFalse(TextIO.Read.isSelfOverlapping(new byte[]{'c', 'a', 'b', 
'd', 'a', 'b'})); - assertFalse(TextIO.Read.isSelfOverlapping(new byte[]{'a', 'b', 'c', 'a', 'b', 'd'})); - assertTrue(TextIO.Read.isSelfOverlapping(new byte[]{'a', 'b', 'a'})); - assertTrue(TextIO.Read.isSelfOverlapping(new byte[]{'a', 'b', 'c', 'a', 'b'})); - } - - @Test - @Category(NeedsRunner.class) - public void testReadStringsWithCustomDelimiter() throws Exception { - final String[] inputStrings = new String[] { - // incomplete delimiter - "To be, or not to be: that |is the question: ", - // incomplete delimiter - "To be, or not to be: that *is the question: ", - // complete delimiter - "Whether 'tis nobler in the mind to suffer |*", - // truncated delimiter - "The slings and arrows of outrageous fortune,|" }; - - File tmpFile = Files.createTempFile(tempFolder, "file", "txt").toFile(); - String filename = tmpFile.getPath(); - - try (FileWriter writer = new FileWriter(tmpFile)) { - writer.write(Joiner.on("").join(inputStrings)); - } - - PAssert.that(p.apply(TextIO.read().from(filename).withDelimiter(new byte[] {'|', '*'}))) - .containsInAnyOrder( - "To be, or not to be: that |is the question: To be, or not to be: " - + "that *is the question: Whether 'tis nobler in the mind to suffer ", - "The slings and arrows of outrageous fortune,|"); - p.run(); - } - - @Test - public void testSplittingSourceWithCustomDelimiter() throws Exception { - List<String> testCases = Lists.newArrayList(); - String infix = "first|*second|*|*third"; - String[] affixes = new String[] {"", "|", "*", "|*"}; - for (String prefix : affixes) { - for (String suffix : affixes) { - testCases.add(prefix + infix + suffix); - } - } - for (String testCase : testCases) { - SourceTestUtils.assertSplitAtFractionExhaustive( - prepareSource(testCase.getBytes(StandardCharsets.UTF_8), new byte[] {'|', '*'}), - PipelineOptionsFactory.create()); - } - } - - @Test - @Category(NeedsRunner.class) - public void testReadStrings() throws Exception { - runTestRead(LINES_ARRAY); - } - - @Test - 
@Category(NeedsRunner.class) - public void testReadEmptyStrings() throws Exception { - runTestRead(NO_LINES_ARRAY); - } - - @Test - public void testReadNamed() throws Exception { - p.enableAbandonedNodeEnforcement(false); - - assertEquals("TextIO.Read/Read.out", p.apply(TextIO.read().from("somefile")).getName()); - assertEquals( - "MyRead/Read.out", p.apply("MyRead", TextIO.read().from(emptyTxt.getPath())).getName()); - } - - @Test - public void testReadDisplayData() { - TextIO.Read read = TextIO.read().from("foo.*").withCompression(BZIP2); - - DisplayData displayData = DisplayData.from(read); - - assertThat(displayData, hasDisplayItem("filePattern", "foo.*")); - assertThat(displayData, hasDisplayItem("compressionType", BZIP2.toString())); - } - - @Test - @Category(ValidatesRunner.class) - public void testPrimitiveReadDisplayData() { - DisplayDataEvaluator evaluator = DisplayDataEvaluator.create(); - - TextIO.Read read = TextIO.read().from("foobar"); - - Set<DisplayData> displayData = evaluator.displayDataForPrimitiveSourceTransforms(read); - assertThat( - "TextIO.Read should include the file prefix in its primitive display data", - displayData, - hasItem(hasDisplayItem(hasValue(startsWith("foobar"))))); - } - - /** Options for testing. 
*/ - public interface RuntimeTestOptions extends PipelineOptions { - ValueProvider<String> getInput(); - void setInput(ValueProvider<String> value); - } - - @Test - public void testRuntimeOptionsNotCalledInApply() throws Exception { - p.enableAbandonedNodeEnforcement(false); - - RuntimeTestOptions options = - PipelineOptionsFactory.as(RuntimeTestOptions.class); - - p.apply(TextIO.read().from(options.getInput())); - } - - @Test - public void testCompressionIsSet() throws Exception { - TextIO.Read read = TextIO.read().from("/tmp/test"); - assertEquals(AUTO, read.getCompression()); - read = TextIO.read().from("/tmp/test").withCompression(GZIP); - assertEquals(GZIP, read.getCompression()); - } - /** * Helper that writes the given lines (adding a newline in between) to a stream, then closes the * stream. @@ -350,100 +142,63 @@ public class TextIOReadTest { } } + /** Helper to make an array of compressible strings. Returns ["word"i] for i in range(0,n). */ + private static List<String> makeLines(int n) { + List<String> ret = new ArrayList<>(); + for (int i = 0; i < n; ++i) { + ret.add("word" + i); + } + return ret; + } + /** - * Helper method that runs a variety of ways to read a single file using TextIO - * and checks that they all match the given expected output. + * Helper method that runs a variety of ways to read a single file using TextIO and checks that + * they all match the given expected output. 
* * <p>The transforms being verified are: * <ul> * <li>TextIO.read().from(filename).withCompression(compressionType) * <li>TextIO.read().from(filename).withCompression(compressionType) - * .withHintMatchesManyFiles() + * .withHintMatchesManyFiles() * <li>TextIO.readAll().withCompression(compressionType) - * </ul> and + * </ul> */ - private void assertReadingCompressedFileMatchesExpected( - File file, Compression compression, List<String> expected) { - - int thisUniquifier = ++uniquifier; + private static void assertReadingCompressedFileMatchesExpected( + File file, Compression compression, List<String> expected, Pipeline p) { TextIO.Read read = TextIO.read().from(file.getPath()).withCompression(compression); - PAssert.that( - p.apply("Read_" + file + "_" + compression.toString() + "_" + thisUniquifier, read)) + PAssert.that(p.apply("Read_" + file + "_" + compression.toString(), read)) .containsInAnyOrder(expected); PAssert.that( p.apply( - "Read_" + file + "_" + compression.toString() + "_many" + "_" + thisUniquifier, + "Read_" + file + "_" + compression.toString() + "_many", read.withHintMatchesManyFiles())) .containsInAnyOrder(expected); TextIO.ReadAll readAll = TextIO.readAll().withCompression(compression); PAssert.that( - p.apply("Create_" + file + "_" + thisUniquifier, Create.of(file.getPath())) - .apply("Read_" + compression.toString() + "_" + thisUniquifier, readAll)) + p.apply("Create_" + file, Create.of(file.getPath())) + .apply("Read_" + compression.toString(), readAll)) .containsInAnyOrder(expected); } - /** Helper to make an array of compressible strings. Returns ["word"i] for i in range(0,n). */ - private static List<String> makeLines(int n) { - List<String> ret = new ArrayList<>(); - for (int i = 0; i < n; ++i) { - ret.add("word" + i); - } - return ret; - } - - /** Tests reading from a small, gzipped file with no .gz extension but GZIP compression set. 
*/ - @Test - @Category(NeedsRunner.class) - public void testSmallCompressedGzipReadNoExtension() throws Exception { - File smallGzNoExtension = writeToFile(TINY, "tiny_gz_no_extension", GZIP); - assertReadingCompressedFileMatchesExpected(smallGzNoExtension, GZIP, TINY); - p.run(); - } - - /** - * Tests reading from a small, uncompressed file with .gz extension. This must work in AUTO or - * GZIP modes. This is needed because some network file systems / HTTP clients will transparently - * decompress gzipped content. - */ - @Test - @Category(NeedsRunner.class) - public void testSmallCompressedGzipReadActuallyUncompressed() throws Exception { - File smallGzNotCompressed = - writeToFile(TINY, "tiny_uncompressed.gz", UNCOMPRESSED); - // Should work with GZIP compression set. - assertReadingCompressedFileMatchesExpected(smallGzNotCompressed, GZIP, TINY); - // Should also work with AUTO mode set. - assertReadingCompressedFileMatchesExpected(smallGzNotCompressed, AUTO, TINY); - p.run(); - } - - /** Tests reading from a small, bzip2ed file with no .bz2 extension but BZIP2 compression set. */ - @Test - @Category(NeedsRunner.class) - public void testSmallCompressedBzip2ReadNoExtension() throws Exception { - File smallBz2NoExtension = writeToFile(TINY, "tiny_bz2_no_extension", BZIP2); - assertReadingCompressedFileMatchesExpected(smallBz2NoExtension, BZIP2, TINY); - p.run(); - } - /** * Create a zip file with the given lines. * * @param expected A list of expected lines, populated in the zip file. + * @param folder A temporary folder used to create files. * @param filename Optionally zip file name (can be null). * @param fieldsEntries Fields to write in zip entries. * @return The zip filename. * @throws Exception In case of a failure during zip file creation. */ - private String createZipFile(List<String> expected, String filename, String[]... fieldsEntries) + private static File createZipFile( + List<String> expected, TemporaryFolder folder, String filename, String[]... 
fieldsEntries) throws Exception { - File tmpFile = tempFolder.resolve(filename).toFile(); - String tmpFileName = tmpFile.getPath(); + File tmpFile = folder.getRoot().toPath().resolve(filename).toFile(); ZipOutputStream out = new ZipOutputStream(new FileOutputStream(tmpFile)); PrintStream writer = new PrintStream(out, true /* auto-flush on write */); @@ -462,547 +217,660 @@ public class TextIOReadTest { writer.close(); out.close(); - return tmpFileName; + return tmpFile; + } + + private static TextSource prepareSource( + TemporaryFolder temporaryFolder, byte[] data, byte[] delimiter) throws IOException { + Path path = temporaryFolder.newFile().toPath(); + Files.write(path, data); + return new TextSource( + ValueProvider.StaticValueProvider.of(path.toString()), + EmptyMatchTreatment.DISALLOW, + delimiter); } - @Test - @Category(NeedsRunner.class) - public void testTxtRead() throws Exception { - // Files with non-compressed extensions should work in AUTO and UNCOMPRESSED modes. - for (Compression type : new Compression[] {AUTO, UNCOMPRESSED}) { - assertReadingCompressedFileMatchesExpected(emptyTxt, type, EMPTY); - assertReadingCompressedFileMatchesExpected(tinyTxt, type, TINY); - assertReadingCompressedFileMatchesExpected(largeTxt, type, LARGE); + private static String getFileSuffix(Compression compression) { + switch (compression) { + case UNCOMPRESSED: + return ".txt"; + case GZIP: + return ".gz"; + case BZIP2: + return ".bz2"; + case ZIP: + return ".zip"; + case DEFLATE: + return ".deflate"; + default: + return ""; } - p.run(); } - @Test - @Category(NeedsRunner.class) - public void testGzipCompressedRead() throws Exception { - // Files with the right extensions should work in AUTO and GZIP modes. 
- for (Compression type : new Compression[] {AUTO, GZIP}) { - assertReadingCompressedFileMatchesExpected(emptyGz, type, EMPTY); - assertReadingCompressedFileMatchesExpected(tinyGz, type, TINY); - assertReadingCompressedFileMatchesExpected(largeGz, type, LARGE); + /** Tests for reading from different size of files with various Compression. */ + @RunWith(Parameterized.class) + public static class CompressedReadTest { + @Rule public TemporaryFolder tempFolder = new TemporaryFolder(); + @Rule public TestPipeline p = TestPipeline.create(); + + @Parameterized.Parameters(name = "{index}: {1}") + public static Iterable<Object[]> data() { + return ImmutableList.<Object[]>builder() + .add(new Object[] {EMPTY, UNCOMPRESSED}) + .add(new Object[] {EMPTY, GZIP}) + .add(new Object[] {EMPTY, BZIP2}) + .add(new Object[] {EMPTY, ZIP}) + .add(new Object[] {EMPTY, DEFLATE}) + .add(new Object[] {TINY, UNCOMPRESSED}) + .add(new Object[] {TINY, GZIP}) + .add(new Object[] {TINY, BZIP2}) + .add(new Object[] {TINY, ZIP}) + .add(new Object[] {TINY, DEFLATE}) + .add(new Object[] {LARGE, UNCOMPRESSED}) + .add(new Object[] {LARGE, GZIP}) + .add(new Object[] {LARGE, BZIP2}) + .add(new Object[] {LARGE, ZIP}) + .add(new Object[] {LARGE, DEFLATE}) + .build(); } - // Sanity check that we're properly testing compression. - assertThat(largeTxt.length(), greaterThan(largeGz.length())); + @Parameterized.Parameter(0) + public List<String> lines; - // GZIP files with non-gz extension should work in GZIP mode. - File gzFile = writeToFile(TINY, "tiny_gz_no_extension", GZIP); - assertReadingCompressedFileMatchesExpected(gzFile, GZIP, TINY); - p.run(); - } + @Parameterized.Parameter(1) + public Compression compression; - @Test - @Category(NeedsRunner.class) - public void testBzip2CompressedRead() throws Exception { - // Files with the right extensions should work in AUTO and BZIP2 modes. 
- for (Compression type : new Compression[] {AUTO, BZIP2}) { - assertReadingCompressedFileMatchesExpected(emptyBzip2, type, EMPTY); - assertReadingCompressedFileMatchesExpected(tinyBzip2, type, TINY); - assertReadingCompressedFileMatchesExpected(largeBzip2, type, LARGE); + /** Tests reading from a small, compressed file with no extension. */ + @Test + @Category(NeedsRunner.class) + public void testCompressedReadWithoutExtension() throws Exception { + String fileName = lines.size() + "_" + compression + "_no_extension"; + File fileWithNoExtension = writeToFile(lines, tempFolder, fileName, compression); + assertReadingCompressedFileMatchesExpected(fileWithNoExtension, compression, lines, p); + p.run(); } - // Sanity check that we're properly testing compression. - assertThat(largeTxt.length(), greaterThan(largeBzip2.length())); + @Test + @Category(NeedsRunner.class) + public void testCompressedReadWithExtension() throws Exception { + String fileName = + lines.size() + "_" + compression + "_no_extension" + getFileSuffix(compression); + File fileWithExtension = writeToFile(lines, tempFolder, fileName, compression); + + // Sanity check that we're properly testing compression. + if (lines.size() == LINES_NUMBER_FOR_LARGE && !compression.equals(UNCOMPRESSED)) { + File uncompressedFile = writeToFile(lines, tempFolder, "large.txt", UNCOMPRESSED); + assertThat(uncompressedFile.length(), greaterThan(fileWithExtension.length())); + } + + assertReadingCompressedFileMatchesExpected(fileWithExtension, compression, lines, p); + p.run(); + } - // BZ2 files with non-bz2 extension should work in BZIP2 mode. - File bz2File = writeToFile(TINY, "tiny_bz2_no_extension", BZIP2); - assertReadingCompressedFileMatchesExpected(bz2File, BZIP2, TINY); - p.run(); + @Test + @Category(NeedsRunner.class) + public void testReadWithAuto() throws Exception { + // Files with non-compressed extensions should work in AUTO and UNCOMPRESSED modes. 
+ String fileName = + lines.size() + "_" + compression + "_no_extension" + getFileSuffix(compression); + File fileWithExtension = writeToFile(lines, tempFolder, fileName, compression); + assertReadingCompressedFileMatchesExpected(fileWithExtension, AUTO, lines, p); + p.run(); + } } - @Test - @Category(NeedsRunner.class) - public void testZipCompressedRead() throws Exception { - // Files with the right extensions should work in AUTO and ZIP modes. - for (Compression type : new Compression[] {AUTO, ZIP}) { - assertReadingCompressedFileMatchesExpected(emptyZip, type, EMPTY); - assertReadingCompressedFileMatchesExpected(tinyZip, type, TINY); - assertReadingCompressedFileMatchesExpected(largeZip, type, LARGE); + /** Tests for reading files with various delimiters. */ + @RunWith(Parameterized.class) + public static class ReadWithDelimiterTest { + private static final ImmutableList<String> EXPECTED = ImmutableList.of("asdf", "hjkl", "xyz"); + @Rule public TemporaryFolder tempFolder = new TemporaryFolder(); + @Parameterized.Parameters(name = "{index}: {0}") + public static Iterable<Object[]> data() { + return ImmutableList.<Object[]>builder() + .add(new Object[] {"\n\n\n", ImmutableList.of("", "", "")}) + .add(new Object[] {"asdf\nhjkl\nxyz\n", EXPECTED}) + .add(new Object[] {"asdf\rhjkl\rxyz\r", EXPECTED}) + .add(new Object[] {"asdf\r\nhjkl\r\nxyz\r\n", EXPECTED}) + .add(new Object[] {"asdf\rhjkl\r\nxyz\n", EXPECTED}) + .add(new Object[] {"asdf\nhjkl\nxyz", EXPECTED}) + .add(new Object[] {"asdf\rhjkl\rxyz", EXPECTED}) + .add(new Object[] {"asdf\r\nhjkl\r\nxyz", EXPECTED}) + .add(new Object[] {"asdf\rhjkl\r\nxyz", EXPECTED}) + .build(); } - // Sanity check that we're properly testing compression. - assertThat(largeTxt.length(), greaterThan(largeZip.length())); + @Parameterized.Parameter(0) + public String line; - // Zip files with non-zip extension should work in ZIP mode. 
- File zipFile = writeToFile(TINY, "tiny_zip_no_extension", ZIP); - assertReadingCompressedFileMatchesExpected(zipFile, ZIP, TINY); - p.run(); - } + @Parameterized.Parameter(1) + public ImmutableList<String> expected; - @Test - @Category(NeedsRunner.class) - public void testDeflateCompressedRead() throws Exception { - // Files with the right extensions should work in AUTO and ZIP modes. - for (Compression type : new Compression[] {AUTO, DEFLATE}) { - assertReadingCompressedFileMatchesExpected(emptyDeflate, type, EMPTY); - assertReadingCompressedFileMatchesExpected(tinyDeflate, type, TINY); - assertReadingCompressedFileMatchesExpected(largeDeflate, type, LARGE); + @Test + public void testReadLinesWithDelimiter() throws Exception { + runTestReadWithData(line.getBytes(UTF_8), expected); } - // Sanity check that we're properly testing compression. - assertThat(largeTxt.length(), greaterThan(largeDeflate.length())); - - // Deflate files with non-deflate extension should work in DEFLATE mode. - File deflateFile = writeToFile(TINY, "tiny_deflate_no_extension", DEFLATE); - assertReadingCompressedFileMatchesExpected(deflateFile, DEFLATE, TINY); - p.run(); - } + @Test + public void testSplittingSource() throws Exception { + TextSource source = prepareSource(line.getBytes(UTF_8)); + SourceTestUtils.assertSplitAtFractionExhaustive(source, PipelineOptionsFactory.create()); + } - /** - * Tests a zip file with no entries. This is a corner case not tested elsewhere as the default - * test zip files have a single entry. - */ - @Test - @Category(NeedsRunner.class) - public void testZipCompressedReadWithNoEntries() throws Exception { - String filename = createZipFile(new ArrayList<String>(), "empty zip file"); - assertReadingCompressedFileMatchesExpected(new File(filename), ZIP, EMPTY); - p.run(); - } + private TextSource prepareSource(byte[] data) throws IOException { + return TextIOReadTest.prepareSource(tempFolder, data, null); + } - /** - * Tests a zip file with multiple entries. 
This is a corner case not tested elsewhere as the - * default test zip files have a single entry. - */ - @Test - @Category(NeedsRunner.class) - public void testZipCompressedReadWithMultiEntriesFile() throws Exception { - String[] entry0 = new String[] {"first", "second", "three"}; - String[] entry1 = new String[] {"four", "five", "six"}; - String[] entry2 = new String[] {"seven", "eight", "nine"}; - - List<String> expected = new ArrayList<>(); - - String filename = createZipFile(expected, "multiple entries", entry0, entry1, entry2); - assertReadingCompressedFileMatchesExpected(new File(filename), ZIP, expected); - p.run(); + private void runTestReadWithData(byte[] data, List<String> expectedResults) throws Exception { + TextSource source = prepareSource(data); + List<String> actual = SourceTestUtils.readFromSource(source, PipelineOptionsFactory.create()); + assertThat( + actual, containsInAnyOrder(new ArrayList<>(expectedResults).toArray(new String[0]))); + } } - /** - * Read a ZIP compressed file containing data, multiple empty entries, and then more data. We - * expect just the data back. - */ - @Test - @Category(NeedsRunner.class) - public void testZipCompressedReadWithComplexEmptyAndPresentEntries() throws Exception { - String filename = - createZipFile( - new ArrayList<String>(), - "complex empty and present entries", - new String[] {"cat"}, - new String[] {}, - new String[] {}, - new String[] {"dog"}); - - assertReadingCompressedFileMatchesExpected( - new File(filename), ZIP, Arrays.asList("cat", "dog")); - p.run(); - } + /** Tests for some basic operations in {@link TextIO.Read}. 
*/ + @RunWith(JUnit4.class) + public static class BasicIOTest { + @Rule public TemporaryFolder tempFolder = new TemporaryFolder(); + @Rule public TestPipeline p = TestPipeline.create(); - @Test - public void testTextIOGetName() { - assertEquals("TextIO.Read", TextIO.read().from("somefile").getName()); - assertEquals("TextIO.Read", TextIO.read().from("somefile").toString()); - } + private void runTestRead(String[] expected) throws Exception { + File tmpFile = tempFolder.newFile(); + String filename = tmpFile.getPath(); - @Test - public void testProgressEmptyFile() throws IOException { - try (BoundedReader<String> reader = - prepareSource(new byte[0], null).createReader(PipelineOptionsFactory.create())) { - // Check preconditions before starting. - assertEquals(0.0, reader.getFractionConsumed(), 1e-6); - assertEquals(0, reader.getSplitPointsConsumed()); - assertEquals(BoundedReader.SPLIT_POINTS_UNKNOWN, reader.getSplitPointsRemaining()); + try (PrintStream writer = new PrintStream(new FileOutputStream(tmpFile))) { + for (String elem : expected) { + byte[] encodedElem = CoderUtils.encodeToByteArray(StringUtf8Coder.of(), elem); + String line = new String(encodedElem); + writer.println(line); + } + } - // Assert empty - assertFalse(reader.start()); + TextIO.Read read = TextIO.read().from(filename); + PCollection<String> output = p.apply(read); - // Check postconditions after finishing - assertEquals(1.0, reader.getFractionConsumed(), 1e-6); - assertEquals(0, reader.getSplitPointsConsumed()); - assertEquals(0, reader.getSplitPointsRemaining()); + PAssert.that(output).containsInAnyOrder(expected); + p.run(); } - } - @Test - public void testProgressTextFile() throws IOException { - String file = "line1\nline2\nline3"; - try (BoundedReader<String> reader = - prepareSource(file.getBytes(), null).createReader(PipelineOptionsFactory.create())) { - // Check preconditions before starting - assertEquals(0.0, reader.getFractionConsumed(), 1e-6); - assertEquals(0, 
reader.getSplitPointsConsumed()); - assertEquals(BoundedReader.SPLIT_POINTS_UNKNOWN, reader.getSplitPointsRemaining()); - - // Line 1 - assertTrue(reader.start()); - assertEquals(0, reader.getSplitPointsConsumed()); - assertEquals(BoundedReader.SPLIT_POINTS_UNKNOWN, reader.getSplitPointsRemaining()); - - // Line 2 - assertTrue(reader.advance()); - assertEquals(1, reader.getSplitPointsConsumed()); - assertEquals(BoundedReader.SPLIT_POINTS_UNKNOWN, reader.getSplitPointsRemaining()); - - // Line 3 - assertTrue(reader.advance()); - assertEquals(2, reader.getSplitPointsConsumed()); - assertEquals(1, reader.getSplitPointsRemaining()); - - // Check postconditions after finishing - assertFalse(reader.advance()); - assertEquals(1.0, reader.getFractionConsumed(), 1e-6); - assertEquals(3, reader.getSplitPointsConsumed()); - assertEquals(0, reader.getSplitPointsRemaining()); + @Test + public void testDelimiterSelfOverlaps(){ + assertFalse(TextIO.Read.isSelfOverlapping(new byte[]{'a', 'b', 'c'})); + assertFalse(TextIO.Read.isSelfOverlapping(new byte[]{'c', 'a', 'b', 'd', 'a', 'b'})); + assertFalse(TextIO.Read.isSelfOverlapping(new byte[]{'a', 'b', 'c', 'a', 'b', 'd'})); + assertTrue(TextIO.Read.isSelfOverlapping(new byte[]{'a', 'b', 'a'})); + assertTrue(TextIO.Read.isSelfOverlapping(new byte[]{'a', 'b', 'c', 'a', 'b'})); } - } - @Test - public void testProgressAfterSplitting() throws IOException { - String file = "line1\nline2\nline3"; - BoundedSource<String> source = prepareSource(file.getBytes()); - BoundedSource<String> remainder; - - // Create the remainder, verifying properties pre- and post-splitting. - try (BoundedReader<String> readerOrig = source.createReader(PipelineOptionsFactory.create())) { - // Preconditions. - assertEquals(0.0, readerOrig.getFractionConsumed(), 1e-6); - assertEquals(0, readerOrig.getSplitPointsConsumed()); - assertEquals(BoundedReader.SPLIT_POINTS_UNKNOWN, readerOrig.getSplitPointsRemaining()); - - // First record, before splitting. 
- assertTrue(readerOrig.start()); - assertEquals(0, readerOrig.getSplitPointsConsumed()); - assertEquals(BoundedReader.SPLIT_POINTS_UNKNOWN, readerOrig.getSplitPointsRemaining()); - - // Split. 0.1 is in line1, so should now be able to detect last record. - remainder = readerOrig.splitAtFraction(0.1); - System.err.println(readerOrig.getCurrentSource()); - assertNotNull(remainder); - - // First record, after splitting. - assertEquals(0, readerOrig.getSplitPointsConsumed()); - assertEquals(1, readerOrig.getSplitPointsRemaining()); - - // Finish and postconditions. - assertFalse(readerOrig.advance()); - assertEquals(1.0, readerOrig.getFractionConsumed(), 1e-6); - assertEquals(1, readerOrig.getSplitPointsConsumed()); - assertEquals(0, readerOrig.getSplitPointsRemaining()); - } - - // Check the properties of the remainder. - try (BoundedReader<String> reader = remainder.createReader(PipelineOptionsFactory.create())) { - // Preconditions. - assertEquals(0.0, reader.getFractionConsumed(), 1e-6); - assertEquals(0, reader.getSplitPointsConsumed()); - assertEquals(BoundedReader.SPLIT_POINTS_UNKNOWN, reader.getSplitPointsRemaining()); - - // First record should be line 2. 
- assertTrue(reader.start()); - assertEquals(0, reader.getSplitPointsConsumed()); - assertEquals(BoundedReader.SPLIT_POINTS_UNKNOWN, reader.getSplitPointsRemaining()); - - // Second record is line 3 - assertTrue(reader.advance()); - assertEquals(1, reader.getSplitPointsConsumed()); - assertEquals(1, reader.getSplitPointsRemaining()); - - // Check postconditions after finishing - assertFalse(reader.advance()); - assertEquals(1.0, reader.getFractionConsumed(), 1e-6); - assertEquals(2, reader.getSplitPointsConsumed()); - assertEquals(0, reader.getSplitPointsRemaining()); + @Test + @Category(NeedsRunner.class) + public void testReadStringsWithCustomDelimiter() throws Exception { + final String[] inputStrings = + new String[] { + // incomplete delimiter + "To be, or not to be: that |is the question: ", + // incomplete delimiter + "To be, or not to be: that *is the question: ", + // complete delimiter + "Whether 'tis nobler in the mind to suffer |*", + // truncated delimiter + "The slings and arrows of outrageous fortune,|" + }; + + File tmpFile = tempFolder.newFile("tmpfile.txt"); + String filename = tmpFile.getPath(); + + try (FileWriter writer = new FileWriter(tmpFile)) { + writer.write(Joiner.on("").join(inputStrings)); + } + + PAssert.that(p.apply(TextIO.read().from(filename).withDelimiter(new byte[] {'|', '*'}))) + .containsInAnyOrder( + "To be, or not to be: that |is the question: To be, or not to be: " + + "that *is the question: Whether 'tis nobler in the mind to suffer ", + "The slings and arrows of outrageous fortune,|"); + p.run(); } - } - @Test - public void testReadEmptyLines() throws Exception { - runTestReadWithData("\n\n\n".getBytes(StandardCharsets.UTF_8), ImmutableList.of("", "", "")); - } + @Test + public void testSplittingSourceWithCustomDelimiter() throws Exception { + List<String> testCases = Lists.newArrayList(); + String infix = "first|*second|*|*third"; + String[] affixes = new String[] {"", "|", "*", "|*"}; + for (String prefix : affixes) { + 
for (String suffix : affixes) { + testCases.add(prefix + infix + suffix); + } + } + for (String testCase : testCases) { + SourceTestUtils.assertSplitAtFractionExhaustive( + TextIOReadTest.prepareSource( + tempFolder, testCase.getBytes(UTF_8), new byte[] {'|', '*'}), + PipelineOptionsFactory.create()); + } + } - @Test - public void testReadFileWithLineFeedDelimiter() throws Exception { - runTestReadWithData( - "asdf\nhjkl\nxyz\n".getBytes(StandardCharsets.UTF_8), - ImmutableList.of("asdf", "hjkl", "xyz")); - } + @Test + @Category(NeedsRunner.class) + public void testReadStrings() throws Exception { + runTestRead(LINES_ARRAY); + } - @Test - public void testReadFileWithCarriageReturnDelimiter() throws Exception { - runTestReadWithData( - "asdf\rhjkl\rxyz\r".getBytes(StandardCharsets.UTF_8), - ImmutableList.of("asdf", "hjkl", "xyz")); - } + @Test + @Category(NeedsRunner.class) + public void testReadEmptyStrings() throws Exception { + runTestRead(NO_LINES_ARRAY); + } - @Test - public void testReadFileWithCarriageReturnAndLineFeedDelimiter() throws Exception { - runTestReadWithData( - "asdf\r\nhjkl\r\nxyz\r\n".getBytes(StandardCharsets.UTF_8), - ImmutableList.of("asdf", "hjkl", "xyz")); - } + @Test + public void testReadNamed() throws Exception { + File emptyFile = tempFolder.newFile(); + p.enableAbandonedNodeEnforcement(false); - @Test - public void testReadFileWithMixedDelimiters() throws Exception { - runTestReadWithData( - "asdf\rhjkl\r\nxyz\n".getBytes(StandardCharsets.UTF_8), - ImmutableList.of("asdf", "hjkl", "xyz")); - } + assertEquals("TextIO.Read/Read.out", p.apply(TextIO.read().from("somefile")).getName()); + assertEquals( + "MyRead/Read.out", p.apply("MyRead", TextIO.read().from(emptyFile.getPath())).getName()); + } - @Test - public void testReadFileWithLineFeedDelimiterAndNonEmptyBytesAtEnd() throws Exception { - runTestReadWithData( - "asdf\nhjkl\nxyz".getBytes(StandardCharsets.UTF_8), - ImmutableList.of("asdf", "hjkl", "xyz")); - } + @Test + public void 
testReadDisplayData() { + TextIO.Read read = TextIO.read().from("foo.*").withCompression(BZIP2); - @Test - public void testReadFileWithCarriageReturnDelimiterAndNonEmptyBytesAtEnd() throws Exception { - runTestReadWithData( - "asdf\rhjkl\rxyz".getBytes(StandardCharsets.UTF_8), - ImmutableList.of("asdf", "hjkl", "xyz")); - } + DisplayData displayData = DisplayData.from(read); - @Test - public void testReadFileWithCarriageReturnAndLineFeedDelimiterAndNonEmptyBytesAtEnd() - throws Exception { - runTestReadWithData( - "asdf\r\nhjkl\r\nxyz".getBytes(StandardCharsets.UTF_8), - ImmutableList.of("asdf", "hjkl", "xyz")); - } + assertThat(displayData, hasDisplayItem("filePattern", "foo.*")); + assertThat(displayData, hasDisplayItem("compressionType", BZIP2.toString())); + } - @Test - public void testReadFileWithMixedDelimitersAndNonEmptyBytesAtEnd() throws Exception { - runTestReadWithData( - "asdf\rhjkl\r\nxyz".getBytes(StandardCharsets.UTF_8), - ImmutableList.of("asdf", "hjkl", "xyz")); - } + @Test + @Category(ValidatesRunner.class) + public void testPrimitiveReadDisplayData() { + DisplayDataEvaluator evaluator = DisplayDataEvaluator.create(); - private void runTestReadWithData(byte[] data, List<String> expectedResults) throws Exception { - TextSource source = prepareSource(data); - List<String> actual = SourceTestUtils.readFromSource(source, PipelineOptionsFactory.create()); - assertThat(actual, containsInAnyOrder(new ArrayList<>(expectedResults).toArray(new String[0]))); - } + TextIO.Read read = TextIO.read().from("foobar"); - @Test - public void testSplittingSourceWithEmptyLines() throws Exception { - TextSource source = prepareSource("\n\n\n".getBytes(UTF_8)); - SourceTestUtils.assertSplitAtFractionExhaustive(source, PipelineOptionsFactory.create()); - } + Set<DisplayData> displayData = evaluator.displayDataForPrimitiveSourceTransforms(read); + assertThat( + "TextIO.Read should include the file prefix in its primitive display data", + displayData, + 
hasItem(hasDisplayItem(hasValue(startsWith("foobar"))))); + } - @Test - public void testSplittingSourceWithLineFeedDelimiter() throws Exception { - TextSource source = prepareSource("asdf\nhjkl\nxyz\n".getBytes(UTF_8)); - SourceTestUtils.assertSplitAtFractionExhaustive(source, PipelineOptionsFactory.create()); - } + /** Options for testing. */ + public interface RuntimeTestOptions extends PipelineOptions { + ValueProvider<String> getInput(); + void setInput(ValueProvider<String> value); + } - @Test - public void testSplittingSourceWithCarriageReturnDelimiter() throws Exception { - TextSource source = prepareSource("asdf\rhjkl\rxyz\r".getBytes(UTF_8)); - SourceTestUtils.assertSplitAtFractionExhaustive(source, PipelineOptionsFactory.create()); - } + @Test + public void testRuntimeOptionsNotCalledInApply() throws Exception { + p.enableAbandonedNodeEnforcement(false); - @Test - public void testSplittingSourceWithCarriageReturnAndLineFeedDelimiter() throws Exception { - TextSource source = prepareSource("asdf\r\nhjkl\r\nxyz\r\n".getBytes(UTF_8)); - SourceTestUtils.assertSplitAtFractionExhaustive(source, PipelineOptionsFactory.create()); - } + RuntimeTestOptions options = + PipelineOptionsFactory.as(RuntimeTestOptions.class); - @Test - public void testSplittingSourceWithMixedDelimiters() throws Exception { - TextSource source = prepareSource("asdf\rhjkl\r\nxyz\n".getBytes(UTF_8)); - SourceTestUtils.assertSplitAtFractionExhaustive(source, PipelineOptionsFactory.create()); - } + p.apply(TextIO.read().from(options.getInput())); + } - @Test - public void testSplittingSourceWithLineFeedDelimiterAndNonEmptyBytesAtEnd() throws Exception { - TextSource source = prepareSource("asdf\nhjkl\nxyz".getBytes(UTF_8)); - SourceTestUtils.assertSplitAtFractionExhaustive(source, PipelineOptionsFactory.create()); - } + @Test + public void testCompressionIsSet() throws Exception { + TextIO.Read read = TextIO.read().from("/tmp/test"); + assertEquals(AUTO, read.getCompression()); + read = 
TextIO.read().from("/tmp/test").withCompression(GZIP); + assertEquals(GZIP, read.getCompression()); + } - @Test - public void testSplittingSourceWithCarriageReturnDelimiterAndNonEmptyBytesAtEnd() - throws Exception { - TextSource source = prepareSource("asdf\rhjkl\rxyz".getBytes(UTF_8)); - SourceTestUtils.assertSplitAtFractionExhaustive(source, PipelineOptionsFactory.create()); - } + /** + * Tests reading from a small, uncompressed file with .gz extension. This must work in + * GZIP modes. This is needed because some network file systems / HTTP clients will + * transparently decompress gzipped content. + */ + @Test + @Category(NeedsRunner.class) + public void testSmallCompressedGzipReadActuallyUncompressed() throws Exception { + File smallGzNotCompressed = + writeToFile(TINY, tempFolder, "tiny_uncompressed.gz", UNCOMPRESSED); + // Should work with GZIP compression set. + assertReadingCompressedFileMatchesExpected(smallGzNotCompressed, GZIP, TINY, p); + p.run(); + } - @Test - public void testSplittingSourceWithCarriageReturnAndLineFeedDelimiterAndNonEmptyBytesAtEnd() - throws Exception { - TextSource source = prepareSource("asdf\r\nhjkl\r\nxyz".getBytes(UTF_8)); - SourceTestUtils.assertSplitAtFractionExhaustive(source, PipelineOptionsFactory.create()); - } + /** + * Tests reading from a small, uncompressed file with .gz extension. This must work in + * AUTO modes. This is needed because some network file systems / HTTP clients will + * transparently decompress gzipped content. + */ + @Test + @Category(NeedsRunner.class) + public void testSmallCompressedAutoReadActuallyUncompressed() throws Exception { + File smallGzNotCompressed = + writeToFile(TINY, tempFolder, "tiny_uncompressed.gz", UNCOMPRESSED); + // Should also work with AUTO mode set. 
+ assertReadingCompressedFileMatchesExpected(smallGzNotCompressed, AUTO, TINY, p); + p.run(); + } - @Test - public void testSplittingSourceWithMixedDelimitersAndNonEmptyBytesAtEnd() throws Exception { - TextSource source = prepareSource("asdf\rhjkl\r\nxyz".getBytes(UTF_8)); - SourceTestUtils.assertSplitAtFractionExhaustive(source, PipelineOptionsFactory.create()); - } + /** + * Tests a zip file with no entries. This is a corner case not tested elsewhere as the default + * test zip files have a single entry. + */ + @Test + @Category(NeedsRunner.class) + public void testZipCompressedReadWithNoEntries() throws Exception { + File file = createZipFile(new ArrayList<String>(), tempFolder, "empty zip file"); + assertReadingCompressedFileMatchesExpected(file, ZIP, EMPTY, p); + p.run(); + } - private TextSource prepareSource(byte[] data) throws IOException { - return prepareSource(data, null /* default delimiters */); - } + /** + * Tests a zip file with multiple entries. This is a corner case not tested elsewhere as the + * default test zip files have a single entry. 
+ */ + @Test + @Category(NeedsRunner.class) + public void testZipCompressedReadWithMultiEntriesFile() throws Exception { + String[] entry0 = new String[] {"first", "second", "three"}; + String[] entry1 = new String[] {"four", "five", "six"}; + String[] entry2 = new String[] {"seven", "eight", "nine"}; + + List<String> expected = new ArrayList<>(); + + File file = + createZipFile(expected, tempFolder, "multiple entries", entry0, entry1, entry2); + assertReadingCompressedFileMatchesExpected(file, ZIP, expected, p); + p.run(); + } - private TextSource prepareSource(byte[] data, byte[] delimiter) throws IOException { - Path path = Files.createTempFile(tempFolder, "tempfile", "ext"); - Files.write(path, data); - return new TextSource(ValueProvider.StaticValueProvider.of(path.toString()), - EmptyMatchTreatment.DISALLOW, delimiter); - } + /** + * Read a ZIP compressed file containing data, multiple empty entries, and then more data. We + * expect just the data back. + */ + @Test + @Category(NeedsRunner.class) + public void testZipCompressedReadWithComplexEmptyAndPresentEntries() throws Exception { + File file = + createZipFile( + new ArrayList<String>(), + tempFolder, + "complex empty and present entries", + new String[] {"cat"}, + new String[] {}, + new String[] {}, + new String[] {"dog"}); + + assertReadingCompressedFileMatchesExpected( + file, ZIP, Arrays.asList("cat", "dog"), p); + p.run(); + } - @Test - public void testInitialSplitAutoModeTxt() throws Exception { - PipelineOptions options = TestPipeline.testingPipelineOptions(); - long desiredBundleSize = 1000; + @Test + public void testTextIOGetName() { + assertEquals("TextIO.Read", TextIO.read().from("somefile").getName()); + assertEquals("TextIO.Read", TextIO.read().from("somefile").toString()); + } - // Sanity check: file is at least 2 bundles long. 
- assertThat(largeTxt.length(), greaterThan(2 * desiredBundleSize)); + private TextSource prepareSource(byte[] data) throws IOException { + return TextIOReadTest.prepareSource(tempFolder, data, null); + } - FileBasedSource<String> source = TextIO.read().from(largeTxt.getPath()).getSource(); - List<? extends FileBasedSource<String>> splits = source.split(desiredBundleSize, options); + @Test + public void testProgressEmptyFile() throws IOException { + try (BoundedSource.BoundedReader<String> reader = + prepareSource(new byte[0]).createReader(PipelineOptionsFactory.create())) { + // Check preconditions before starting. + assertEquals(0.0, reader.getFractionConsumed(), 1e-6); + assertEquals(0, reader.getSplitPointsConsumed()); + assertEquals( + BoundedSource.BoundedReader.SPLIT_POINTS_UNKNOWN, reader.getSplitPointsRemaining()); + + // Assert empty + assertFalse(reader.start()); + + // Check postconditions after finishing + assertEquals(1.0, reader.getFractionConsumed(), 1e-6); + assertEquals(0, reader.getSplitPointsConsumed()); + assertEquals(0, reader.getSplitPointsRemaining()); + } + } - // At least 2 splits and they are equal to reading the whole file. 
- assertThat(splits, hasSize(greaterThan(1))); - SourceTestUtils.assertSourcesEqualReferenceSource(source, splits, options); - } + @Test + public void testProgressTextFile() throws IOException { + String file = "line1\nline2\nline3"; + try (BoundedSource.BoundedReader<String> reader = + prepareSource(file.getBytes()).createReader(PipelineOptionsFactory.create())) { + // Check preconditions before starting + assertEquals(0.0, reader.getFractionConsumed(), 1e-6); + assertEquals(0, reader.getSplitPointsConsumed()); + assertEquals( + BoundedSource.BoundedReader.SPLIT_POINTS_UNKNOWN, reader.getSplitPointsRemaining()); + + // Line 1 + assertTrue(reader.start()); + assertEquals(0, reader.getSplitPointsConsumed()); + assertEquals( + BoundedSource.BoundedReader.SPLIT_POINTS_UNKNOWN, reader.getSplitPointsRemaining()); + + // Line 2 + assertTrue(reader.advance()); + assertEquals(1, reader.getSplitPointsConsumed()); + assertEquals( + BoundedSource.BoundedReader.SPLIT_POINTS_UNKNOWN, reader.getSplitPointsRemaining()); + + // Line 3 + assertTrue(reader.advance()); + assertEquals(2, reader.getSplitPointsConsumed()); + assertEquals(1, reader.getSplitPointsRemaining()); + + // Check postconditions after finishing + assertFalse(reader.advance()); + assertEquals(1.0, reader.getFractionConsumed(), 1e-6); + assertEquals(3, reader.getSplitPointsConsumed()); + assertEquals(0, reader.getSplitPointsRemaining()); + } + } - @Test - public void testInitialSplitAutoModeGz() throws Exception { - long desiredBundleSize = 1000; - PipelineOptions options = TestPipeline.testingPipelineOptions(); + @Test + public void testProgressAfterSplitting() throws IOException { + String file = "line1\nline2\nline3"; + BoundedSource<String> source = prepareSource(file.getBytes()); + BoundedSource<String> remainder; + + // Create the remainder, verifying properties pre- and post-splitting. 
+ try (BoundedSource.BoundedReader<String> readerOrig = + source.createReader(PipelineOptionsFactory.create())) { + // Preconditions. + assertEquals(0.0, readerOrig.getFractionConsumed(), 1e-6); + assertEquals(0, readerOrig.getSplitPointsConsumed()); + assertEquals( + BoundedSource.BoundedReader.SPLIT_POINTS_UNKNOWN, readerOrig.getSplitPointsRemaining()); + + // First record, before splitting. + assertTrue(readerOrig.start()); + assertEquals(0, readerOrig.getSplitPointsConsumed()); + assertEquals( + BoundedSource.BoundedReader.SPLIT_POINTS_UNKNOWN, readerOrig.getSplitPointsRemaining()); + + // Split. 0.1 is in line1, so should now be able to detect last record. + remainder = readerOrig.splitAtFraction(0.1); + System.err.println(readerOrig.getCurrentSource()); + assertNotNull(remainder); + + // First record, after splitting. + assertEquals(0, readerOrig.getSplitPointsConsumed()); + assertEquals(1, readerOrig.getSplitPointsRemaining()); + + // Finish and postconditions. + assertFalse(readerOrig.advance()); + assertEquals(1.0, readerOrig.getFractionConsumed(), 1e-6); + assertEquals(1, readerOrig.getSplitPointsConsumed()); + assertEquals(0, readerOrig.getSplitPointsRemaining()); + } - // Sanity check: file is at least 2 bundles long. - assertThat(largeGz.length(), greaterThan(2 * desiredBundleSize)); + // Check the properties of the remainder. + try (BoundedSource.BoundedReader<String> reader = + remainder.createReader(PipelineOptionsFactory.create())) { + // Preconditions. + assertEquals(0.0, reader.getFractionConsumed(), 1e-6); + assertEquals(0, reader.getSplitPointsConsumed()); + assertEquals( + BoundedSource.BoundedReader.SPLIT_POINTS_UNKNOWN, reader.getSplitPointsRemaining()); + + // First record should be line 2. 
+ assertTrue(reader.start()); + assertEquals(0, reader.getSplitPointsConsumed()); + assertEquals( + BoundedSource.BoundedReader.SPLIT_POINTS_UNKNOWN, reader.getSplitPointsRemaining()); + + // Second record is line 3 + assertTrue(reader.advance()); + assertEquals(1, reader.getSplitPointsConsumed()); + assertEquals(1, reader.getSplitPointsRemaining()); + + // Check postconditions after finishing + assertFalse(reader.advance()); + assertEquals(1.0, reader.getFractionConsumed(), 1e-6); + assertEquals(2, reader.getSplitPointsConsumed()); + assertEquals(0, reader.getSplitPointsRemaining()); + } + } - FileBasedSource<String> source = TextIO.read().from(largeGz.getPath()).getSource(); - List<? extends FileBasedSource<String>> splits = source.split(desiredBundleSize, options); + @Test + public void testInitialSplitAutoModeTxt() throws Exception { + PipelineOptions options = TestPipeline.testingPipelineOptions(); + long desiredBundleSize = 1000; + File largeTxt = writeToFile(LARGE, tempFolder, "large.txt", UNCOMPRESSED); - // Exactly 1 split, even in AUTO mode, since it is a gzip file. - assertThat(splits, hasSize(equalTo(1))); - SourceTestUtils.assertSourcesEqualReferenceSource(source, splits, options); - } + // Sanity check: file is at least 2 bundles long. + assertThat(largeTxt.length(), greaterThan(2 * desiredBundleSize)); - @Test - public void testInitialSplitGzipModeTxt() throws Exception { - PipelineOptions options = TestPipeline.testingPipelineOptions(); - long desiredBundleSize = 1000; + FileBasedSource<String> source = TextIO.read().from(largeTxt.getPath()).getSource(); + List<? extends FileBasedSource<String>> splits = source.split(desiredBundleSize, options); - // Sanity check: file is at least 2 bundles long. - assertThat(largeTxt.length(), greaterThan(2 * desiredBundleSize)); + // At least 2 splits and they are equal to reading the whole file. 
+ assertThat(splits, hasSize(greaterThan(1))); + SourceTestUtils.assertSourcesEqualReferenceSource(source, splits, options); + } - FileBasedSource<String> source = - TextIO.read().from(largeTxt.getPath()).withCompression(GZIP).getSource(); - List<? extends FileBasedSource<String>> splits = source.split(desiredBundleSize, options); + @Test + public void testInitialSplitAutoModeGz() throws Exception { + PipelineOptions options = TestPipeline.testingPipelineOptions(); + long desiredBundleSize = 1000; + File largeGz = writeToFile(LARGE, tempFolder, "large.gz", GZIP); + // Sanity check: file is at least 2 bundles long. + assertThat(largeGz.length(), greaterThan(2 * desiredBundleSize)); - // Exactly 1 split, even though splittable text file, since using GZIP mode. - assertThat(splits, hasSize(equalTo(1))); - SourceTestUtils.assertSourcesEqualReferenceSource(source, splits, options); - } + FileBasedSource<String> source = TextIO.read().from(largeGz.getPath()).getSource(); + List<? extends FileBasedSource<String>> splits = source.split(desiredBundleSize, options); - @Test - public void testInitialSplitGzipModeGz() throws Exception { - PipelineOptions options = TestPipeline.testingPipelineOptions(); - long desiredBundleSize = 1000; + // Exactly 1 split, even in AUTO mode, since it is a gzip file. + assertThat(splits, hasSize(equalTo(1))); + SourceTestUtils.assertSourcesEqualReferenceSource(source, splits, options); + } - // Sanity check: file is at least 2 bundles long. - assertThat(largeGz.length(), greaterThan(2 * desiredBundleSize)); + @Test + public void testInitialSplitGzipModeTxt() throws Exception { + PipelineOptions options = TestPipeline.testingPipelineOptions(); + long desiredBundleSize = 1000; + File largeTxt = writeToFile(LARGE, tempFolder, "large.txt", UNCOMPRESSED); + // Sanity check: file is at least 2 bundles long. 
+ assertThat(largeTxt.length(), greaterThan(2 * desiredBundleSize)); - FileBasedSource<String> source = - TextIO.read().from(largeGz.getPath()).withCompression(GZIP).getSource(); - List<? extends FileBasedSource<String>> splits = source.split(desiredBundleSize, options); + FileBasedSource<String> source = + TextIO.read().from(largeTxt.getPath()).withCompression(GZIP).getSource(); + List<? extends FileBasedSource<String>> splits = source.split(desiredBundleSize, options); - // Exactly 1 split using .gz extension and using GZIP mode. - assertThat(splits, hasSize(equalTo(1))); - SourceTestUtils.assertSourcesEqualReferenceSource(source, splits, options); - } + // Exactly 1 split, even though splittable text file, since using GZIP mode. + assertThat(splits, hasSize(equalTo(1))); + SourceTestUtils.assertSourcesEqualReferenceSource(source, splits, options); + } - @Test - @Category(NeedsRunner.class) - public void testReadAll() throws IOException { - writeToFile(TINY, "readAllTiny1.zip", ZIP); - writeToFile(TINY, "readAllTiny2.txt", UNCOMPRESSED); - writeToFile(LARGE, "readAllLarge1.zip", ZIP); - writeToFile(LARGE, "readAllLarge2.txt", UNCOMPRESSED); - PCollection<String> lines = + @Test + @Category(NeedsRunner.class) + public void testReadAll() throws IOException { + Path tempFolderPath = tempFolder.getRoot().toPath(); + writeToFile(TINY, tempFolder, "readAllTiny1.zip", ZIP); + writeToFile(TINY, tempFolder, "readAllTiny2.txt", UNCOMPRESSED); + writeToFile(LARGE, tempFolder, "readAllLarge1.zip", ZIP); + writeToFile(LARGE, tempFolder, "readAllLarge2.txt", UNCOMPRESSED); + PCollection<String> lines = p.apply( - Create.of( - tempFolder.resolve("readAllTiny*").toString(), - tempFolder.resolve("readAllLarge*").toString())) - .apply(TextIO.readAll().withCompression(AUTO)); - PAssert.that(lines).containsInAnyOrder(Iterables.concat(TINY, TINY, LARGE, LARGE)); - p.run(); - } + Create.of( + tempFolderPath.resolve("readAllTiny*").toString(), + 
tempFolderPath.resolve("readAllLarge*").toString())) + .apply(TextIO.readAll().withCompression(AUTO)); + PAssert.that(lines).containsInAnyOrder(Iterables.concat(TINY, TINY, LARGE, LARGE)); + p.run(); + } - @Test - @Category(NeedsRunner.class) - public void testReadFiles() throws IOException { - writeToFile(TINY, "readAllTiny1.zip", ZIP); - writeToFile(TINY, "readAllTiny2.txt", UNCOMPRESSED); - writeToFile(LARGE, "readAllLarge1.zip", ZIP); - writeToFile(LARGE, "readAllLarge2.txt", UNCOMPRESSED); - PCollection<String> lines = + @Test + @Category(NeedsRunner.class) + public void testReadFiles() throws IOException { + Path tempFolderPath = tempFolder.getRoot().toPath(); + writeToFile(TINY, tempFolder, "readAllTiny1.zip", ZIP); + writeToFile(TINY, tempFolder, "readAllTiny2.txt", UNCOMPRESSED); + writeToFile(LARGE, tempFolder, "readAllLarge1.zip", ZIP); + writeToFile(LARGE, tempFolder, "readAllLarge2.txt", UNCOMPRESSED); + PCollection<String> lines = p.apply( - Create.of( - tempFolder.resolve("readAllTiny*").toString(), - tempFolder.resolve("readAllLarge*").toString())) - .apply(FileIO.matchAll()) - .apply(FileIO.readMatches().withCompression(AUTO)) - .apply(TextIO.readFiles().withDesiredBundleSizeBytes(10)); - PAssert.that(lines).containsInAnyOrder(Iterables.concat(TINY, TINY, LARGE, LARGE)); - p.run(); - } + Create.of( + tempFolderPath.resolve("readAllTiny*").toString(), + tempFolderPath.resolve("readAllLarge*").toString())) + .apply(FileIO.matchAll()) + .apply(FileIO.readMatches().withCompression(AUTO)) + .apply(TextIO.readFiles().withDesiredBundleSizeBytes(10)); + PAssert.that(lines).containsInAnyOrder(Iterables.concat(TINY, TINY, LARGE, LARGE)); + p.run(); + } - @Test - @Category({NeedsRunner.class, UsesSplittableParDo.class}) - public void testReadWatchForNewFiles() throws IOException, InterruptedException { - final Path basePath = tempFolder.resolve("readWatch"); - basePath.toFile().mkdir(); - PCollection<String> lines = - p.apply( - TextIO.read() - 
.from(basePath.resolve("*").toString()) - // Make sure that compression type propagates into readAll() - .withCompression(ZIP) - .watchForNewFiles( - Duration.millis(100), - Watch.Growth.<String>afterTimeSinceNewOutput(Duration.standardSeconds(3)))); - - Thread writer = + @Test + @Category({NeedsRunner.class, UsesSplittableParDo.class}) + public void testReadWatchForNewFiles() throws IOException, InterruptedException { + final Path basePath = tempFolder.getRoot().toPath().resolve("readWatch"); + basePath.toFile().mkdir(); + PCollection<String> lines = + p.apply( + TextIO.read() + .from(basePath.resolve("*").toString()) + // Make sure that compression type propagates into readAll() + .withCompression(ZIP) + .watchForNewFiles( + Duration.millis(100), + Watch.Growth.<String>afterTimeSinceNewOutput(Duration.standardSeconds(3)))); + + Thread writer = new Thread() { @Override public void run() { try { Thread.sleep(1000); writeToFile( - Arrays.asList("a.1", "a.2"), - basePath.resolve("fileA").toString(), - ZIP); + Arrays.asList("a.1", "a.2"), + tempFolder, + basePath.resolve("fileA").toString(), + ZIP); Thread.sleep(300); writeToFile( - Arrays.asList("b.1", "b.2"), - basePath.resolve("fileB").toString(), - ZIP); + Arrays.asList("b.1", "b.2"), + tempFolder, + basePath.resolve("fileB").toString(), + ZIP); Thread.sleep(300); writeToFile( - Arrays.asList("c.1", "c.2"), - basePath.resolve("fileC").toString(), - ZIP); + Arrays.asList("c.1", "c.2"), + tempFolder, + basePath.resolve("fileC").toString(), + ZIP); } catch (IOException | InterruptedException e) { throw new RuntimeException(e); } } }; - writer.start(); + writer.start(); - PAssert.that(lines).containsInAnyOrder("a.1", "a.2", "b.1", "b.2", "c.1", "c.2"); - p.run(); + PAssert.that(lines).containsInAnyOrder("a.1", "a.2", "b.1", "b.2", "c.1", "c.2"); + p.run(); - writer.join(); + writer.join(); + } } }
