Repository: beam Updated Branches: refs/heads/master 49c392790 -> f7aa41aa1
[BEAM-2682] Deletes AvroIOTransformTest Instead, merges the little test coverage it provided into AvroIOTest. Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/8242abdd Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/8242abdd Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/8242abdd Branch: refs/heads/master Commit: 8242abdd433ea961deb672c5c5d00a21b4777d56 Parents: 49c3927 Author: Eugene Kirpichov <[email protected]> Authored: Mon Oct 16 14:48:17 2017 -0700 Committer: Eugene Kirpichov <[email protected]> Committed: Fri Oct 20 11:49:17 2017 -0700 ---------------------------------------------------------------------- .../java/org/apache/beam/sdk/io/AvroIO.java | 3 + .../java/org/apache/beam/sdk/io/AvroIOTest.java | 237 ++++++++++---- .../apache/beam/sdk/io/AvroIOTransformTest.java | 324 ------------------- 3 files changed, 173 insertions(+), 391 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/beam/blob/8242abdd/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroIO.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroIO.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroIO.java index 1474759..2cc0f52 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroIO.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroIO.java @@ -1156,6 +1156,9 @@ public class AvroIO { getFormatFunction() == null, "A format function should not be specified " + "with DynamicDestinations. 
Use DynamicDestinations.formatRecord instead"); + } else { + checkArgument( + getSchema() != null, "Unless using DynamicDestinations, .withSchema() is required."); } ValueProvider<ResourceId> tempDirectory = getTempDirectory(); http://git-wip-us.apache.org/repos/asf/beam/blob/8242abdd/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroIOTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroIOTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroIOTest.java index 3976392..239c9f4 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroIOTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroIOTest.java @@ -40,6 +40,7 @@ import com.google.common.collect.Multimap; import java.io.File; import java.io.FileInputStream; import java.io.IOException; +import java.io.Serializable; import java.nio.file.Files; import java.nio.file.Path; import java.util.ArrayList; @@ -103,17 +104,17 @@ import org.junit.runners.JUnit4; /** Tests for AvroIO Read and Write transforms. 
*/ @RunWith(JUnit4.class) -public class AvroIOTest { +public class AvroIOTest implements Serializable { @Rule - public TestPipeline writePipeline = TestPipeline.create(); + public transient TestPipeline writePipeline = TestPipeline.create(); @Rule - public TestPipeline readPipeline = TestPipeline.create(); + public transient TestPipeline readPipeline = TestPipeline.create(); - @Rule public TemporaryFolder tmpFolder = new TemporaryFolder(); + @Rule public transient TemporaryFolder tmpFolder = new TemporaryFolder(); - @Rule public ExpectedException expectedException = ExpectedException.none(); + @Rule public transient ExpectedException expectedException = ExpectedException.none(); @Test public void testAvroIOGetName() { @@ -165,16 +166,141 @@ public class AvroIOTest { } } + private static final String SCHEMA_STRING = + "{\"namespace\": \"example.avro\",\n" + + " \"type\": \"record\",\n" + + " \"name\": \"AvroGeneratedUser\",\n" + + " \"fields\": [\n" + + " {\"name\": \"name\", \"type\": \"string\"},\n" + + " {\"name\": \"favorite_number\", \"type\": [\"int\", \"null\"]},\n" + + " {\"name\": \"favorite_color\", \"type\": [\"string\", \"null\"]}\n" + + " ]\n" + + "}"; + + private static final Schema SCHEMA = new Schema.Parser().parse(SCHEMA_STRING); + @Test @Category(NeedsRunner.class) - public void testAvroIOWriteAndReadAndParseASingleFile() throws Throwable { + public void testWriteThenReadJavaClass() throws Throwable { + List<GenericClass> values = + ImmutableList.of(new GenericClass(3, "hi"), new GenericClass(5, "bar")); + File outputFile = tmpFolder.newFile("output.avro"); + + writePipeline + .apply(Create.of(values)) + .apply( + AvroIO.write(GenericClass.class) + .to(writePipeline.newProvider(outputFile.getAbsolutePath())) + .withoutSharding()); + writePipeline.run(); + + PAssert.that( + readPipeline.apply( + "Read", + AvroIO.read(GenericClass.class) + .from(readPipeline.newProvider(outputFile.getAbsolutePath())))) + .containsInAnyOrder(values); + + 
readPipeline.run(); + } + + @Test + @Category(NeedsRunner.class) + public void testWriteThenReadCustomType() throws Throwable { + List<Long> values = Arrays.asList(0L, 1L, 2L); + File outputFile = tmpFolder.newFile("output.avro"); + + writePipeline + .apply(Create.of(values)) + .apply( + AvroIO.<Long, GenericClass>writeCustomType() + .to(writePipeline.newProvider(outputFile.getAbsolutePath())) + .withFormatFunction(new CreateGenericClass()) + .withSchema(ReflectData.get().getSchema(GenericClass.class)) + .withoutSharding()); + writePipeline.run(); + + PAssert.that( + readPipeline + .apply( + "Read", + AvroIO.read(GenericClass.class) + .from(readPipeline.newProvider(outputFile.getAbsolutePath()))) + .apply( + MapElements.via( + new SimpleFunction<GenericClass, Long>() { + @Override + public Long apply(GenericClass input) { + return (long) input.intField; + } + }))) + .containsInAnyOrder(values); + + readPipeline.run(); + } + + private <T extends GenericRecord> void testWriteThenReadGeneratedClass( + AvroIO.Write<T> writeTransform, + AvroIO.Read<T> readTransform + ) throws Exception { + File outputFile = tmpFolder.newFile("output.avro"); + + List<T> values = + ImmutableList.of( + (T) new AvroGeneratedUser("Bob", 256, null), + (T) new AvroGeneratedUser("Alice", 128, null), + (T) new AvroGeneratedUser("Ted", null, "white")); + + writePipeline + .apply(Create.<T>of(values)) + .apply( + writeTransform + .to(writePipeline.newProvider(outputFile.getAbsolutePath())) + .withoutSharding()); + writePipeline.run(); + + PAssert.that( + readPipeline.apply( + "Read", + readTransform + .from(readPipeline.newProvider(outputFile.getAbsolutePath())))) + .containsInAnyOrder(values); + + readPipeline.run(); + } + + @Test + @Category(NeedsRunner.class) + public void testWriteThenReadGeneratedClassWithClass() throws Throwable { + testWriteThenReadGeneratedClass( + AvroIO.write(AvroGeneratedUser.class), AvroIO.read(AvroGeneratedUser.class)); + } + + @Test + @Category(NeedsRunner.class) + 
public void testWriteThenReadGeneratedClassWithSchema() throws Throwable { + testWriteThenReadGeneratedClass( + AvroIO.writeGenericRecords(SCHEMA), AvroIO.readGenericRecords(SCHEMA)); + } + + @Test + @Category(NeedsRunner.class) + public void testWriteThenReadGeneratedClassWithSchemaString() throws Throwable { + testWriteThenReadGeneratedClass( + AvroIO.writeGenericRecords(SCHEMA.toString()), + AvroIO.readGenericRecords(SCHEMA.toString())); + } + + @Test + @Category(NeedsRunner.class) + public void testWriteSingleFileThenReadUsingAllMethods() throws Throwable { List<GenericClass> values = ImmutableList.of(new GenericClass(3, "hi"), new GenericClass(5, "bar")); File outputFile = tmpFolder.newFile("output.avro"); writePipeline.apply(Create.of(values)) .apply(AvroIO.write(GenericClass.class).to(outputFile.getAbsolutePath()).withoutSharding()); - writePipeline.run().waitUntilFinish(); + writePipeline.run(); // Test the same data using all versions of read(). PCollection<String> path = @@ -222,32 +348,7 @@ public class AvroIOTest { @Test @Category(NeedsRunner.class) - public void testAvroIOWriteAndReadViaValueProvider() throws Throwable { - List<GenericClass> values = - ImmutableList.of(new GenericClass(3, "hi"), new GenericClass(5, "bar")); - File outputFile = tmpFolder.newFile("output.avro"); - - writePipeline - .apply(Create.of(values)) - .apply( - AvroIO.write(GenericClass.class) - .to(writePipeline.newProvider(outputFile.getAbsolutePath())) - .withoutSharding()); - writePipeline.run().waitUntilFinish(); - - PAssert.that( - readPipeline.apply( - "Read", - AvroIO.read(GenericClass.class) - .from(readPipeline.newProvider(outputFile.getAbsolutePath())))) - .containsInAnyOrder(values); - - readPipeline.run(); - } - - @Test - @Category(NeedsRunner.class) - public void testAvroIOWriteAndReadMultipleFilepatterns() throws Throwable { + public void testWriteThenReadMultipleFilepatterns() throws Throwable { List<GenericClass> firstValues = Lists.newArrayList(); 
List<GenericClass> secondValues = Lists.newArrayList(); for (int i = 0; i < 10; ++i) { @@ -268,7 +369,7 @@ public class AvroIOTest { AvroIO.write(GenericClass.class) .to(tmpFolder.getRoot().getAbsolutePath() + "/second") .withNumShards(3)); - writePipeline.run().waitUntilFinish(); + writePipeline.run(); // Test readAll() and parseAllGenericRecords(). PCollection<String> paths = @@ -301,7 +402,7 @@ public class AvroIOTest { @Test @Category(NeedsRunner.class) - public void testAvroIOContinuouslyWriteAndReadMultipleFilepatterns() throws Throwable { + public void testContinuouslyWriteAndReadMultipleFilepatterns() throws Throwable { SimpleFunction<Long, GenericClass> mapFn = new CreateGenericClass(); List<GenericClass> firstValues = Lists.newArrayList(); List<GenericClass> secondValues = Lists.newArrayList(); @@ -380,14 +481,13 @@ public class AvroIOTest { Watch.Growth.<String>afterTimeSinceNewOutput(Duration.standardSeconds(3))) .withDesiredBundleSizeBytes(10))) .containsInAnyOrder(Iterables.concat(firstValues, secondValues)); - readPipeline.run(); } @Test @SuppressWarnings("unchecked") @Category(NeedsRunner.class) - public void testAvroIOCompressedWriteAndReadASingleFile() throws Throwable { + public void testCompressedWriteAndReadASingleFile() throws Throwable { List<GenericClass> values = ImmutableList.of(new GenericClass(3, "hi"), new GenericClass(5, "bar")); File outputFile = tmpFolder.newFile("output.avro"); @@ -398,22 +498,23 @@ public class AvroIOTest { .to(outputFile.getAbsolutePath()) .withoutSharding() .withCodec(CodecFactory.deflateCodec(9))); - writePipeline.run().waitUntilFinish(); - - PCollection<GenericClass> input = - readPipeline.apply(AvroIO.read(GenericClass.class).from(outputFile.getAbsolutePath())); + writePipeline.run(); - PAssert.that(input).containsInAnyOrder(values); + PAssert.that( + readPipeline.apply(AvroIO.read(GenericClass.class).from(outputFile.getAbsolutePath()))) + .containsInAnyOrder(values); readPipeline.run(); - DataFileStream 
dataFileStream = - new DataFileStream(new FileInputStream(outputFile), new GenericDatumReader()); - assertEquals("deflate", dataFileStream.getMetaString("avro.codec")); + + try (DataFileStream dataFileStream = + new DataFileStream(new FileInputStream(outputFile), new GenericDatumReader())) { + assertEquals("deflate", dataFileStream.getMetaString("avro.codec")); + } } @Test @SuppressWarnings("unchecked") @Category(NeedsRunner.class) - public void testAvroIONullCodecWriteAndReadASingleFile() throws Throwable { + public void testWriteThenReadASingleFileWithNullCodec() throws Throwable { List<GenericClass> values = ImmutableList.of(new GenericClass(3, "hi"), new GenericClass(5, "bar")); File outputFile = tmpFolder.newFile("output.avro"); @@ -424,16 +525,17 @@ public class AvroIOTest { .to(outputFile.getAbsolutePath()) .withoutSharding() .withCodec(CodecFactory.nullCodec())); - writePipeline.run().waitUntilFinish(); - - PCollection<GenericClass> input = - readPipeline.apply(AvroIO.read(GenericClass.class).from(outputFile.getAbsolutePath())); + writePipeline.run(); - PAssert.that(input).containsInAnyOrder(values); + PAssert.that( + readPipeline.apply(AvroIO.read(GenericClass.class).from(outputFile.getAbsolutePath()))) + .containsInAnyOrder(values); readPipeline.run(); - DataFileStream dataFileStream = - new DataFileStream(new FileInputStream(outputFile), new GenericDatumReader()); - assertEquals("null", dataFileStream.getMetaString("avro.codec")); + + try (DataFileStream dataFileStream = + new DataFileStream(new FileInputStream(outputFile), new GenericDatumReader())) { + assertEquals("null", dataFileStream.getMetaString("avro.codec")); + } } @DefaultCoder(AvroCoder.class) @@ -485,22 +587,22 @@ public class AvroIOTest { */ @Test @Category(NeedsRunner.class) - public void testAvroIOWriteAndReadSchemaUpgrade() throws Throwable { + public void testWriteThenReadSchemaUpgrade() throws Throwable { List<GenericClass> values = ImmutableList.of(new GenericClass(3, "hi"), new 
GenericClass(5, "bar")); File outputFile = tmpFolder.newFile("output.avro"); writePipeline.apply(Create.of(values)) .apply(AvroIO.write(GenericClass.class).to(outputFile.getAbsolutePath()).withoutSharding()); - writePipeline.run().waitUntilFinish(); + writePipeline.run(); List<GenericClassV2> expected = ImmutableList.of(new GenericClassV2(3, "hi", null), new GenericClassV2(5, "bar", null)); - PCollection<GenericClassV2> input = - readPipeline.apply(AvroIO.read(GenericClassV2.class).from(outputFile.getAbsolutePath())); - - PAssert.that(input).containsInAnyOrder(expected); + PAssert.that( + readPipeline.apply( + AvroIO.read(GenericClassV2.class).from(outputFile.getAbsolutePath()))) + .containsInAnyOrder(expected); readPipeline.run(); } @@ -550,11 +652,11 @@ public class AvroIOTest { } } - @Rule public TestPipeline windowedAvroWritePipeline = TestPipeline.create(); + @Rule public transient TestPipeline windowedAvroWritePipeline = TestPipeline.create(); @Test @Category({ValidatesRunner.class, UsesTestStream.class}) - public void testWindowedAvroIOWrite() throws Throwable { + public void testWriteWindowed() throws Throwable { Path baseDir = Files.createTempDirectory(tmpFolder.getRoot().toPath(), "testwrite"); String baseFilename = baseDir.resolve("prefix").toString(); @@ -880,11 +982,12 @@ public class AvroIOTest { "bytesValue".getBytes()))); writePipeline.run(); - DataFileStream dataFileStream = - new DataFileStream(new FileInputStream(outputFile), new GenericDatumReader()); - assertEquals("stringValue", dataFileStream.getMetaString("stringKey")); - assertEquals(100L, dataFileStream.getMetaLong("longKey")); - assertArrayEquals("bytesValue".getBytes(), dataFileStream.getMeta("bytesKey")); + try (DataFileStream dataFileStream = + new DataFileStream(new FileInputStream(outputFile), new GenericDatumReader())) { + assertEquals("stringValue", dataFileStream.getMetaString("stringKey")); + assertEquals(100L, dataFileStream.getMetaLong("longKey")); + 
assertArrayEquals("bytesValue".getBytes(), dataFileStream.getMeta("bytesKey")); + } } @SuppressWarnings("deprecation") // using AvroCoder#createDatumReader for tests. http://git-wip-us.apache.org/repos/asf/beam/blob/8242abdd/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroIOTransformTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroIOTransformTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroIOTransformTest.java deleted file mode 100644 index b4f7a79..0000000 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroIOTransformTest.java +++ /dev/null @@ -1,324 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.beam.sdk.io; - -import static org.hamcrest.collection.IsIterableContainingInAnyOrder.containsInAnyOrder; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertThat; - -import com.google.common.collect.ImmutableList; -import java.io.File; -import java.io.IOException; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.List; -import javax.annotation.Nullable; -import org.apache.avro.Schema; -import org.apache.avro.file.DataFileReader; -import org.apache.avro.file.DataFileWriter; -import org.apache.avro.generic.GenericData; -import org.apache.avro.generic.GenericRecord; -import org.apache.avro.io.DatumReader; -import org.apache.avro.io.DatumWriter; -import org.apache.avro.specific.SpecificDatumReader; -import org.apache.avro.specific.SpecificDatumWriter; -import org.apache.beam.sdk.coders.AvroCoder; -import org.apache.beam.sdk.coders.Coder; -import org.apache.beam.sdk.testing.NeedsRunner; -import org.apache.beam.sdk.testing.PAssert; -import org.apache.beam.sdk.testing.TestPipeline; -import org.apache.beam.sdk.transforms.Create; -import org.apache.beam.sdk.values.PCollection; -import org.junit.Rule; -import org.junit.Test; -import org.junit.experimental.categories.Category; -import org.junit.rules.TemporaryFolder; -import org.junit.runner.RunWith; -import org.junit.runners.Parameterized; -import org.junit.runners.Suite; - -/** - * A test suite for {@link AvroIO.Write} and {@link AvroIO.Read} transforms. 
- */ -@RunWith(Suite.class) -@Suite.SuiteClasses({ - AvroIOTransformTest.AvroIOReadTransformTest.class, - AvroIOTransformTest.AvroIOWriteTransformTest.class -}) -public class AvroIOTransformTest { - - // TODO: Stop requiring local files - - @Rule - public final transient TestPipeline pipeline = TestPipeline.create(); - - @Rule - public final TemporaryFolder tmpFolder = new TemporaryFolder(); - - private static final Schema.Parser parser = new Schema.Parser(); - - private static final String SCHEMA_STRING = - "{\"namespace\": \"example.avro\",\n" - + " \"type\": \"record\",\n" - + " \"name\": \"AvroGeneratedUser\",\n" - + " \"fields\": [\n" - + " {\"name\": \"name\", \"type\": \"string\"},\n" - + " {\"name\": \"favorite_number\", \"type\": [\"int\", \"null\"]},\n" - + " {\"name\": \"favorite_color\", \"type\": [\"string\", \"null\"]}\n" - + " ]\n" - + "}"; - - private static final Schema SCHEMA = parser.parse(SCHEMA_STRING); - - private static AvroGeneratedUser[] generateAvroObjects() { - final AvroGeneratedUser user1 = new AvroGeneratedUser(); - user1.setName("Bob"); - user1.setFavoriteNumber(256); - - final AvroGeneratedUser user2 = new AvroGeneratedUser(); - user2.setName("Alice"); - user2.setFavoriteNumber(128); - - final AvroGeneratedUser user3 = new AvroGeneratedUser(); - user3.setName("Ted"); - user3.setFavoriteColor("white"); - - return new AvroGeneratedUser[] { user1, user2, user3 }; - } - - /** - * Tests for AvroIO Read transforms, using classes generated from {@code user.avsc}. 
- */ - @RunWith(Parameterized.class) - public static class AvroIOReadTransformTest extends AvroIOTransformTest { - - private static GenericRecord[] generateAvroGenericRecords() { - final GenericRecord user1 = new GenericData.Record(SCHEMA); - user1.put("name", "Bob"); - user1.put("favorite_number", 256); - - final GenericRecord user2 = new GenericData.Record(SCHEMA); - user2.put("name", "Alice"); - user2.put("favorite_number", 128); - - final GenericRecord user3 = new GenericData.Record(SCHEMA); - user3.put("name", "Ted"); - user3.put("favorite_color", "white"); - - return new GenericRecord[] { user1, user2, user3 }; - } - - private void generateAvroFile(final AvroGeneratedUser[] elements, - final File avroFile) throws IOException { - final DatumWriter<AvroGeneratedUser> userDatumWriter = - new SpecificDatumWriter<>(AvroGeneratedUser.class); - try (DataFileWriter<AvroGeneratedUser> dataFileWriter = - new DataFileWriter<>(userDatumWriter)) { - dataFileWriter.create(elements[0].getSchema(), avroFile); - for (final AvroGeneratedUser user : elements) { - dataFileWriter.append(user); - } - } - } - - private <T> void runTestRead(@Nullable final String applyName, - final AvroIO.Read<T> readBuilder, - final String expectedName, - final T[] expectedOutput) throws Exception { - - final File avroFile = tmpFolder.newFile("file.avro"); - generateAvroFile(generateAvroObjects(), avroFile); - final AvroIO.Read<T> read = readBuilder.from(avroFile.getPath()); - final PCollection<T> output = - applyName == null ? 
pipeline.apply(read) : pipeline.apply(applyName, read); - - PAssert.that(output).containsInAnyOrder(expectedOutput); - - pipeline.run(); - - assertEquals(expectedName, output.getName()); - } - - @Parameterized.Parameters(name = "{2}_with_{4}") - public static Iterable<Object[]> data() throws IOException { - - final String generatedClass = "GeneratedClass"; - final String fromSchema = "SchemaObject"; - final String fromSchemaString = "SchemaString"; - - return - ImmutableList.<Object[]>builder() - .add( - - // test read using generated class - new Object[] { - null, - AvroIO.read(AvroGeneratedUser.class), - "AvroIO.Read/Read.out", - generateAvroObjects(), - generatedClass - }, - new Object[] { - "MyRead", - AvroIO.read(AvroGeneratedUser.class), - "MyRead/Read.out", - generateAvroObjects(), - generatedClass - }, - - // test read using schema object - new Object[] { - null, - AvroIO.readGenericRecords(SCHEMA), - "AvroIO.Read/Read.out", - generateAvroGenericRecords(), - fromSchema - }, - new Object[] { - "MyRead", - AvroIO.readGenericRecords(SCHEMA), - "MyRead/Read.out", - generateAvroGenericRecords(), - fromSchema - }, - - // test read using schema string - new Object[] { - null, - AvroIO.readGenericRecords(SCHEMA_STRING), - "AvroIO.Read/Read.out", - generateAvroGenericRecords(), - fromSchemaString - }, - new Object[] { - "MyRead", - AvroIO.readGenericRecords(SCHEMA_STRING), - "MyRead/Read.out", - generateAvroGenericRecords(), - fromSchemaString - }) - .build(); - } - - @SuppressWarnings("DefaultAnnotationParam") - @Parameterized.Parameter(0) - public String transformName; - - @Parameterized.Parameter(1) - public AvroIO.Read readTransform; - - @Parameterized.Parameter(2) - public String expectedReadTransformName; - - @Parameterized.Parameter(3) - public Object[] expectedOutput; - - @Parameterized.Parameter(4) - public String testAlias; - - @Test - @Category(NeedsRunner.class) - public void testRead() throws Exception { - runTestRead(transformName, readTransform, 
expectedReadTransformName, expectedOutput); - } - } - - /** - * Tests for AvroIO Write transforms, using classes generated from {@code user.avsc}. - */ - @RunWith(Parameterized.class) - public static class AvroIOWriteTransformTest extends AvroIOTransformTest { - - private static final String WRITE_TRANSFORM_NAME = "AvroIO.Write"; - - private List<AvroGeneratedUser> readAvroFile(final File avroFile) throws IOException { - final DatumReader<AvroGeneratedUser> userDatumReader = - new SpecificDatumReader<>(AvroGeneratedUser.class); - final List<AvroGeneratedUser> users = new ArrayList<>(); - try (DataFileReader<AvroGeneratedUser> dataFileReader = - new DataFileReader<>(avroFile, userDatumReader)) { - while (dataFileReader.hasNext()) { - users.add(dataFileReader.next()); - } - } - return users; - } - - @Parameterized.Parameters(name = "{0}_with_{1}") - public static Iterable<Object[]> data() throws IOException { - - final String generatedClass = "GeneratedClass"; - final String fromSchema = "SchemaObject"; - final String fromSchemaString = "SchemaString"; - - return - ImmutableList.<Object[]>builder() - .add( - new Object[] { - AvroIO.write(AvroGeneratedUser.class), - generatedClass - }, - new Object[] { - AvroIO.writeGenericRecords(SCHEMA), - fromSchema - }, - - new Object[] { - AvroIO.writeGenericRecords(SCHEMA_STRING), - fromSchemaString - }) - .build(); - } - - @SuppressWarnings("DefaultAnnotationParam") - @Parameterized.Parameter(0) - public AvroIO.Write writeTransform; - - @Parameterized.Parameter(1) - public String testAlias; - - private <T> void runTestWrite(final AvroIO.Write<T> writeBuilder) - throws Exception { - - final File avroFile = tmpFolder.newFile("file.avro"); - final AvroGeneratedUser[] users = generateAvroObjects(); - final AvroIO.Write<T> write = writeBuilder.to(avroFile.getPath()); - - @SuppressWarnings("unchecked") final - PCollection<T> input = - pipeline.apply(Create.of(Arrays.asList((T[]) users)) - .withCoder((Coder<T>) 
AvroCoder.of(AvroGeneratedUser.class))); - input.apply(write.withoutSharding()); - - pipeline.run(); - - assertEquals(WRITE_TRANSFORM_NAME, write.getName()); - assertThat(readAvroFile(avroFile), containsInAnyOrder(users)); - } - - @Test - @Category(NeedsRunner.class) - public void testWrite() throws Exception { - runTestWrite(writeTransform); - } - - // TODO: for Write only, test withSuffix, withNumShards, - // withShardNameTemplate and withoutSharding. - } -}
