Repository: beam Updated Branches: refs/heads/master 5ea2537ec -> fdd99650d
Speeds up CompressedSourceTest Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/fecfbc11 Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/fecfbc11 Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/fecfbc11 Branch: refs/heads/master Commit: fecfbc112840d355ade8923b3dbf70486e4f0ed8 Parents: 5ea2537 Author: Eugene Kirpichov <[email protected]> Authored: Thu Sep 14 17:51:28 2017 -0700 Committer: Eugene Kirpichov <[email protected]> Committed: Mon Sep 18 13:47:08 2017 -0700 ---------------------------------------------------------------------- .../beam/sdk/io/CompressedSourceTest.java | 79 ++++++-------------- 1 file changed, 23 insertions(+), 56 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/beam/blob/fecfbc11/sdks/java/core/src/test/java/org/apache/beam/sdk/io/CompressedSourceTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/CompressedSourceTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/CompressedSourceTest.java index 352d38a..f932d43 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/CompressedSourceTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/CompressedSourceTest.java @@ -19,7 +19,6 @@ package org.apache.beam.sdk.io; import static org.apache.beam.sdk.transforms.display.DisplayDataMatchers.hasDisplayItem; import static org.apache.beam.sdk.transforms.display.DisplayDataMatchers.includesDisplayDataFor; -import static org.hamcrest.Matchers.containsString; import static org.hamcrest.Matchers.instanceOf; import static org.hamcrest.Matchers.not; import static org.junit.Assert.assertEquals; @@ -29,6 +28,7 @@ import static org.junit.Assert.assertNull; import static org.junit.Assert.assertThat; import static org.junit.Assert.assertTrue; +import com.google.common.collect.HashMultiset; import com.google.common.collect.Lists; import com.google.common.collect.Sets; import com.google.common.io.Files; @@ -60,24 +60,16 @@ import org.apache.beam.sdk.io.fs.MatchResult.Metadata; import org.apache.beam.sdk.options.PipelineOptions; import org.apache.beam.sdk.options.PipelineOptionsFactory; import org.apache.beam.sdk.options.ValueProvider.StaticValueProvider; -import org.apache.beam.sdk.testing.NeedsRunner; -import org.apache.beam.sdk.testing.PAssert; import org.apache.beam.sdk.testing.SourceTestUtils; -import org.apache.beam.sdk.testing.TestPipeline; import org.apache.beam.sdk.transforms.DoFn; -import org.apache.beam.sdk.transforms.ParDo; import org.apache.beam.sdk.transforms.display.DisplayData; import org.apache.beam.sdk.values.KV; -import org.apache.beam.sdk.values.PCollection; import org.apache.commons.compress.compressors.bzip2.BZip2CompressorOutputStream; import org.apache.commons.compress.compressors.deflate.DeflateCompressorOutputStream; import org.apache.commons.compress.compressors.gzip.GzipCompressorOutputStream; -import org.hamcrest.Matchers; import org.joda.time.Instant; import org.junit.Rule; import org.junit.Test; -import org.junit.experimental.categories.Category; -import org.junit.internal.matchers.ThrowableMessageMatcher; import org.junit.rules.ExpectedException; import org.junit.rules.TemporaryFolder; import org.junit.runner.RunWith; @@ -90,9 +82,6 @@ import org.junit.runners.JUnit4; public class CompressedSourceTest { @Rule - public TestPipeline p = TestPipeline.create(); - - @Rule public TemporaryFolder tmpFolder = new TemporaryFolder(); @Rule @@ -102,7 +91,6 @@ public class CompressedSourceTest { * Test reading nonempty input with gzip. */ @Test - @Category(NeedsRunner.class) public void testReadGzip() throws Exception { byte[] input = generateInput(5000); runReadTest(input, CompressionMode.GZIP); @@ -174,7 +162,6 @@ public class CompressedSourceTest { * Test reading nonempty input with bzip2. */ @Test - @Category(NeedsRunner.class) public void testReadBzip2() throws Exception { byte[] input = generateInput(5000); runReadTest(input, CompressionMode.BZIP2); @@ -184,7 +171,6 @@ public class CompressedSourceTest { * Test reading nonempty input with zip. */ @Test - @Category(NeedsRunner.class) public void testReadZip() throws Exception { byte[] input = generateInput(5000); runReadTest(input, CompressionMode.ZIP); @@ -194,7 +180,6 @@ public class CompressedSourceTest { * Test reading nonempty input with deflate. */ @Test - @Category(NeedsRunner.class) public void testReadDeflate() throws Exception { byte[] input = generateInput(5000); runReadTest(input, CompressionMode.DEFLATE); @@ -204,7 +189,6 @@ public class CompressedSourceTest { * Test reading empty input with gzip. */ @Test - @Category(NeedsRunner.class) public void testEmptyReadGzip() throws Exception { byte[] input = generateInput(0); runReadTest(input, CompressionMode.GZIP); @@ -232,7 +216,6 @@ public class CompressedSourceTest { * to be the concatenation of those individual files. */ @Test - @Category(NeedsRunner.class) public void testReadConcatenatedGzip() throws IOException { byte[] header = "a,b,c\n".getBytes(StandardCharsets.UTF_8); byte[] body = "1,2,3\n4,5,6\n7,8,9\n".getBytes(StandardCharsets.UTF_8); @@ -246,10 +229,8 @@ public class CompressedSourceTest { CompressedSource<Byte> source = CompressedSource.from(new ByteSource(tmpFile.getAbsolutePath(), 1)) .withDecompression(CompressionMode.GZIP); - PCollection<Byte> output = p.apply(Read.from(source)); - - PAssert.that(output).containsInAnyOrder(Bytes.asList(expected)); - p.run(); + List<Byte> actual = SourceTestUtils.readFromSource(source, PipelineOptionsFactory.create()); + assertEquals(Bytes.asList(expected), actual); } /** @@ -259,7 +240,6 @@ public class CompressedSourceTest { * those streams. */ @Test - @Category(NeedsRunner.class) public void testReadMultiStreamBzip2() throws IOException { CompressionMode mode = CompressionMode.BZIP2; byte[] input1 = generateInput(5, 587973); @@ -289,7 +269,6 @@ public class CompressedSourceTest { * Test reading empty input with bzip2. */ @Test - @Category(NeedsRunner.class) public void testCompressedReadBzip2() throws Exception { byte[] input = generateInput(0); runReadTest(input, CompressionMode.BZIP2); @@ -299,7 +278,6 @@ public class CompressedSourceTest { * Test reading according to filepattern when the file is bzipped. */ @Test - @Category(NeedsRunner.class) public void testCompressedAccordingToFilepatternGzip() throws Exception { byte[] input = generateInput(100); File tmpFile = tmpFolder.newFile("test.gz"); @@ -311,7 +289,6 @@ public class CompressedSourceTest { * Test reading according to filepattern when the file is gzipped. */ @Test - @Category(NeedsRunner.class) public void testCompressedAccordingToFilepatternBzip2() throws Exception { byte[] input = generateInput(100); File tmpFile = tmpFolder.newFile("test.bz2"); @@ -323,7 +300,6 @@ public class CompressedSourceTest { * Test reading multiple files with different compression. */ @Test - @Category(NeedsRunner.class) public void testHeterogeneousCompression() throws Exception { String baseName = "test-input"; @@ -333,28 +309,26 @@ public class CompressedSourceTest { // Every sort of compression File uncompressedFile = tmpFolder.newFile(baseName + ".bin"); - generated = generateInput(1000); + generated = generateInput(1000, 1); Files.write(generated, uncompressedFile); expected.addAll(Bytes.asList(generated)); File gzipFile = tmpFolder.newFile(baseName + ".gz"); - generated = generateInput(1000); + generated = generateInput(1000, 2); writeFile(gzipFile, generated, CompressionMode.GZIP); expected.addAll(Bytes.asList(generated)); File bzip2File = tmpFolder.newFile(baseName + ".bz2"); - generated = generateInput(1000); - writeFile(bzip2File, generateInput(1000), CompressionMode.BZIP2); + generated = generateInput(1000, 3); + writeFile(bzip2File, generated, CompressionMode.BZIP2); expected.addAll(Bytes.asList(generated)); String filePattern = new File(tmpFolder.getRoot().toString(), baseName + ".*").toString(); CompressedSource<Byte> source = CompressedSource.from(new ByteSource(filePattern, 1)); - PCollection<Byte> output = p.apply(Read.from(source)); - - PAssert.that(output).containsInAnyOrder(expected); - p.run(); + List<Byte> actual = SourceTestUtils.readFromSource(source, PipelineOptionsFactory.create()); + assertEquals(HashMultiset.create(actual), HashMultiset.create(expected)); } @Test @@ -414,7 +388,6 @@ public class CompressedSourceTest { * this due to properties of services that we read from. */ @Test - @Category(NeedsRunner.class) public void testFalseGzipStream() throws Exception { byte[] input = generateInput(1000); File tmpFile = tmpFolder.newFile("test.gz"); @@ -427,15 +400,11 @@ public class CompressedSourceTest { * we fail. */ @Test - @Category(NeedsRunner.class) public void testFalseBzip2Stream() throws Exception { byte[] input = generateInput(1000); File tmpFile = tmpFolder.newFile("test.bz2"); Files.write(input, tmpFile); - thrown.expectCause(Matchers.allOf( - instanceOf(IOException.class), - ThrowableMessageMatcher.hasMessage( - containsString("Stream is not in the BZip2 format")))); + thrown.expectMessage("Stream is not in the BZip2 format"); verifyReadContents(input, tmpFile, CompressionMode.BZIP2); } @@ -444,7 +413,6 @@ public class CompressedSourceTest { * the gzip header is two bytes. */ @Test - @Category(NeedsRunner.class) public void testEmptyReadGzipUncompressed() throws Exception { byte[] input = generateInput(0); File tmpFile = tmpFolder.newFile("test.gz"); @@ -457,7 +425,6 @@ public class CompressedSourceTest { * the gzip header is two bytes. */ @Test - @Category(NeedsRunner.class) public void testOneByteReadGzipUncompressed() throws Exception { byte[] input = generateInput(1); File tmpFile = tmpFolder.newFile("test.gz"); @@ -469,15 +436,14 @@ public class CompressedSourceTest { * Test reading multiple files. */ @Test - @Category(NeedsRunner.class) public void testCompressedReadMultipleFiles() throws Exception { - int numFiles = 10; + int numFiles = 3; String baseName = "test_input-"; String filePattern = new File(tmpFolder.getRoot().toString(), baseName + "*").toString(); List<Byte> expected = new ArrayList<>(); for (int i = 0; i < numFiles; i++) { - byte[] generated = generateInput(1000); + byte[] generated = generateInput(100); File tmpFile = tmpFolder.newFile(baseName + i); writeFile(tmpFile, generated, CompressionMode.GZIP); expected.addAll(Bytes.asList(generated)); @@ -486,10 +452,8 @@ public class CompressedSourceTest { CompressedSource<Byte> source = CompressedSource.from(new ByteSource(filePattern, 1)) .withDecompression(CompressionMode.GZIP); - PCollection<Byte> output = p.apply(Read.from(source)); - - PAssert.that(output).containsInAnyOrder(expected); - p.run(); + List<Byte> actual = SourceTestUtils.readFromSource(source, PipelineOptionsFactory.create()); + assertEquals(HashMultiset.create(expected), HashMultiset.create(actual)); } @Test @@ -607,20 +571,23 @@ public class CompressedSourceTest { } private void verifyReadContents(byte[] expected, File inputFile, - @Nullable DecompressingChannelFactory decompressionFactory) { + @Nullable DecompressingChannelFactory decompressionFactory) throws IOException { CompressedSource<Byte> source = CompressedSource.from(new ByteSource(inputFile.toPath().toString(), 1)); if (decompressionFactory != null) { source = source.withDecompression(decompressionFactory); } - PCollection<KV<Long, Byte>> output = p.apply(Read.from(source)) - .apply(ParDo.of(new ExtractIndexFromTimestamp())); - ArrayList<KV<Long, Byte>> expectedOutput = new ArrayList<>(); + List<KV<Long, Byte>> actualOutput = Lists.newArrayList(); + try (BoundedReader<Byte> reader = source.createReader(PipelineOptionsFactory.create())) { + for (boolean more = reader.start(); more; more = reader.advance()) { + actualOutput.add(KV.of(reader.getCurrentTimestamp().getMillis(), reader.getCurrent())); + } + } + List<KV<Long, Byte>> expectedOutput = Lists.newArrayList(); for (int i = 0; i < expected.length; i++) { expectedOutput.add(KV.of((long) i, expected[i])); } - PAssert.that(output).containsInAnyOrder(expectedOutput); - p.run(); + assertEquals(expectedOutput, actualOutput); } /**
