Repository: beam
Updated Branches:
  refs/heads/master 49c392790 -> f7aa41aa1


[BEAM-2682] Deletes AvroIOTransformTest

Instead, merges the little test coverage it provided into AvroIOTest.


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/8242abdd
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/8242abdd
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/8242abdd

Branch: refs/heads/master
Commit: 8242abdd433ea961deb672c5c5d00a21b4777d56
Parents: 49c3927
Author: Eugene Kirpichov <[email protected]>
Authored: Mon Oct 16 14:48:17 2017 -0700
Committer: Eugene Kirpichov <[email protected]>
Committed: Fri Oct 20 11:49:17 2017 -0700

----------------------------------------------------------------------
 .../java/org/apache/beam/sdk/io/AvroIO.java     |   3 +
 .../java/org/apache/beam/sdk/io/AvroIOTest.java | 237 ++++++++++----
 .../apache/beam/sdk/io/AvroIOTransformTest.java | 324 -------------------
 3 files changed, 173 insertions(+), 391 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/8242abdd/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroIO.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroIO.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroIO.java
index 1474759..2cc0f52 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroIO.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroIO.java
@@ -1156,6 +1156,9 @@ public class AvroIO {
             getFormatFunction() == null,
             "A format function should not be specified "
                 + "with DynamicDestinations. Use DynamicDestinations.formatRecord instead");
+      } else {
+        checkArgument(
+            getSchema() != null, "Unless using DynamicDestinations, .withSchema() is required.");
       }
 
       ValueProvider<ResourceId> tempDirectory = getTempDirectory();

http://git-wip-us.apache.org/repos/asf/beam/blob/8242abdd/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroIOTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroIOTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroIOTest.java
index 3976392..239c9f4 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroIOTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroIOTest.java
@@ -40,6 +40,7 @@ import com.google.common.collect.Multimap;
 import java.io.File;
 import java.io.FileInputStream;
 import java.io.IOException;
+import java.io.Serializable;
 import java.nio.file.Files;
 import java.nio.file.Path;
 import java.util.ArrayList;
@@ -103,17 +104,17 @@ import org.junit.runners.JUnit4;
 
 /** Tests for AvroIO Read and Write transforms. */
 @RunWith(JUnit4.class)
-public class AvroIOTest {
+public class AvroIOTest implements Serializable {
 
   @Rule
-  public TestPipeline writePipeline = TestPipeline.create();
+  public transient TestPipeline writePipeline = TestPipeline.create();
 
   @Rule
-  public TestPipeline readPipeline = TestPipeline.create();
+  public transient TestPipeline readPipeline = TestPipeline.create();
 
-  @Rule public TemporaryFolder tmpFolder = new TemporaryFolder();
+  @Rule public transient TemporaryFolder tmpFolder = new TemporaryFolder();
 
-  @Rule public ExpectedException expectedException = ExpectedException.none();
+  @Rule public transient ExpectedException expectedException = 
ExpectedException.none();
 
   @Test
   public void testAvroIOGetName() {
@@ -165,16 +166,141 @@ public class AvroIOTest {
     }
   }
 
+  private static final String SCHEMA_STRING =
+      "{\"namespace\": \"example.avro\",\n"
+          + " \"type\": \"record\",\n"
+          + " \"name\": \"AvroGeneratedUser\",\n"
+          + " \"fields\": [\n"
+          + "     {\"name\": \"name\", \"type\": \"string\"},\n"
+          + "     {\"name\": \"favorite_number\", \"type\": [\"int\", 
\"null\"]},\n"
+          + "     {\"name\": \"favorite_color\", \"type\": [\"string\", 
\"null\"]}\n"
+          + " ]\n"
+          + "}";
+
+  private static final Schema SCHEMA = new 
Schema.Parser().parse(SCHEMA_STRING);
+
   @Test
   @Category(NeedsRunner.class)
-  public void testAvroIOWriteAndReadAndParseASingleFile() throws Throwable {
+  public void testWriteThenReadJavaClass() throws Throwable {
+    List<GenericClass> values =
+        ImmutableList.of(new GenericClass(3, "hi"), new GenericClass(5, 
"bar"));
+    File outputFile = tmpFolder.newFile("output.avro");
+
+    writePipeline
+        .apply(Create.of(values))
+        .apply(
+            AvroIO.write(GenericClass.class)
+                .to(writePipeline.newProvider(outputFile.getAbsolutePath()))
+                .withoutSharding());
+    writePipeline.run();
+
+    PAssert.that(
+            readPipeline.apply(
+                "Read",
+                AvroIO.read(GenericClass.class)
+                    
.from(readPipeline.newProvider(outputFile.getAbsolutePath()))))
+        .containsInAnyOrder(values);
+
+    readPipeline.run();
+  }
+
+  @Test
+  @Category(NeedsRunner.class)
+  public void testWriteThenReadCustomType() throws Throwable {
+    List<Long> values = Arrays.asList(0L, 1L, 2L);
+    File outputFile = tmpFolder.newFile("output.avro");
+
+    writePipeline
+        .apply(Create.of(values))
+        .apply(
+            AvroIO.<Long, GenericClass>writeCustomType()
+                .to(writePipeline.newProvider(outputFile.getAbsolutePath()))
+                .withFormatFunction(new CreateGenericClass())
+                .withSchema(ReflectData.get().getSchema(GenericClass.class))
+                .withoutSharding());
+    writePipeline.run();
+
+    PAssert.that(
+            readPipeline
+                .apply(
+                    "Read",
+                    AvroIO.read(GenericClass.class)
+                        
.from(readPipeline.newProvider(outputFile.getAbsolutePath())))
+                .apply(
+                    MapElements.via(
+                        new SimpleFunction<GenericClass, Long>() {
+                          @Override
+                          public Long apply(GenericClass input) {
+                            return (long) input.intField;
+                          }
+                        })))
+        .containsInAnyOrder(values);
+
+    readPipeline.run();
+  }
+
+  private <T extends GenericRecord> void testWriteThenReadGeneratedClass(
+      AvroIO.Write<T> writeTransform,
+      AvroIO.Read<T> readTransform
+  ) throws Exception {
+    File outputFile = tmpFolder.newFile("output.avro");
+
+    List<T> values =
+        ImmutableList.of(
+            (T) new AvroGeneratedUser("Bob", 256, null),
+            (T) new AvroGeneratedUser("Alice", 128, null),
+            (T) new AvroGeneratedUser("Ted", null, "white"));
+
+    writePipeline
+        .apply(Create.<T>of(values))
+        .apply(
+            writeTransform
+                .to(writePipeline.newProvider(outputFile.getAbsolutePath()))
+                .withoutSharding());
+    writePipeline.run();
+
+    PAssert.that(
+        readPipeline.apply(
+            "Read",
+            readTransform
+                .from(readPipeline.newProvider(outputFile.getAbsolutePath()))))
+        .containsInAnyOrder(values);
+
+    readPipeline.run();
+  }
+
+  @Test
+  @Category(NeedsRunner.class)
+  public void testWriteThenReadGeneratedClassWithClass() throws Throwable {
+    testWriteThenReadGeneratedClass(
+        AvroIO.write(AvroGeneratedUser.class), 
AvroIO.read(AvroGeneratedUser.class));
+  }
+
+  @Test
+  @Category(NeedsRunner.class)
+  public void testWriteThenReadGeneratedClassWithSchema() throws Throwable {
+    testWriteThenReadGeneratedClass(
+        AvroIO.writeGenericRecords(SCHEMA), AvroIO.readGenericRecords(SCHEMA));
+  }
+
+  @Test
+  @Category(NeedsRunner.class)
+  public void testWriteThenReadGeneratedClassWithSchemaString() throws 
Throwable {
+    testWriteThenReadGeneratedClass(
+        AvroIO.writeGenericRecords(SCHEMA.toString()),
+        AvroIO.readGenericRecords(SCHEMA.toString()));
+  }
+
+  @Test
+  @Category(NeedsRunner.class)
+  public void testWriteSingleFileThenReadUsingAllMethods() throws Throwable {
     List<GenericClass> values =
         ImmutableList.of(new GenericClass(3, "hi"), new GenericClass(5, 
"bar"));
     File outputFile = tmpFolder.newFile("output.avro");
 
     writePipeline.apply(Create.of(values))
         
.apply(AvroIO.write(GenericClass.class).to(outputFile.getAbsolutePath()).withoutSharding());
-    writePipeline.run().waitUntilFinish();
+    writePipeline.run();
 
     // Test the same data using all versions of read().
     PCollection<String> path =
@@ -222,32 +348,7 @@ public class AvroIOTest {
 
   @Test
   @Category(NeedsRunner.class)
-  public void testAvroIOWriteAndReadViaValueProvider() throws Throwable {
-    List<GenericClass> values =
-        ImmutableList.of(new GenericClass(3, "hi"), new GenericClass(5, 
"bar"));
-    File outputFile = tmpFolder.newFile("output.avro");
-
-    writePipeline
-        .apply(Create.of(values))
-        .apply(
-            AvroIO.write(GenericClass.class)
-                .to(writePipeline.newProvider(outputFile.getAbsolutePath()))
-                .withoutSharding());
-    writePipeline.run().waitUntilFinish();
-
-    PAssert.that(
-            readPipeline.apply(
-                "Read",
-                AvroIO.read(GenericClass.class)
-                    
.from(readPipeline.newProvider(outputFile.getAbsolutePath()))))
-        .containsInAnyOrder(values);
-
-    readPipeline.run();
-  }
-
-  @Test
-  @Category(NeedsRunner.class)
-  public void testAvroIOWriteAndReadMultipleFilepatterns() throws Throwable {
+  public void testWriteThenReadMultipleFilepatterns() throws Throwable {
     List<GenericClass> firstValues = Lists.newArrayList();
     List<GenericClass> secondValues = Lists.newArrayList();
     for (int i = 0; i < 10; ++i) {
@@ -268,7 +369,7 @@ public class AvroIOTest {
             AvroIO.write(GenericClass.class)
                 .to(tmpFolder.getRoot().getAbsolutePath() + "/second")
                 .withNumShards(3));
-    writePipeline.run().waitUntilFinish();
+    writePipeline.run();
 
     // Test readAll() and parseAllGenericRecords().
     PCollection<String> paths =
@@ -301,7 +402,7 @@ public class AvroIOTest {
 
   @Test
   @Category(NeedsRunner.class)
-  public void testAvroIOContinuouslyWriteAndReadMultipleFilepatterns() throws 
Throwable {
+  public void testContinuouslyWriteAndReadMultipleFilepatterns() throws 
Throwable {
     SimpleFunction<Long, GenericClass> mapFn = new CreateGenericClass();
     List<GenericClass> firstValues = Lists.newArrayList();
     List<GenericClass> secondValues = Lists.newArrayList();
@@ -380,14 +481,13 @@ public class AvroIOTest {
                         
Watch.Growth.<String>afterTimeSinceNewOutput(Duration.standardSeconds(3)))
                     .withDesiredBundleSizeBytes(10)))
         .containsInAnyOrder(Iterables.concat(firstValues, secondValues));
-
     readPipeline.run();
   }
 
   @Test
   @SuppressWarnings("unchecked")
   @Category(NeedsRunner.class)
-  public void testAvroIOCompressedWriteAndReadASingleFile() throws Throwable {
+  public void testCompressedWriteAndReadASingleFile() throws Throwable {
     List<GenericClass> values =
         ImmutableList.of(new GenericClass(3, "hi"), new GenericClass(5, 
"bar"));
     File outputFile = tmpFolder.newFile("output.avro");
@@ -398,22 +498,23 @@ public class AvroIOTest {
                 .to(outputFile.getAbsolutePath())
                 .withoutSharding()
                 .withCodec(CodecFactory.deflateCodec(9)));
-    writePipeline.run().waitUntilFinish();
-
-    PCollection<GenericClass> input =
-        
readPipeline.apply(AvroIO.read(GenericClass.class).from(outputFile.getAbsolutePath()));
+    writePipeline.run();
 
-    PAssert.that(input).containsInAnyOrder(values);
+    PAssert.that(
+            
readPipeline.apply(AvroIO.read(GenericClass.class).from(outputFile.getAbsolutePath())))
+        .containsInAnyOrder(values);
     readPipeline.run();
-    DataFileStream dataFileStream =
-        new DataFileStream(new FileInputStream(outputFile), new 
GenericDatumReader());
-    assertEquals("deflate", dataFileStream.getMetaString("avro.codec"));
+
+    try (DataFileStream dataFileStream =
+        new DataFileStream(new FileInputStream(outputFile), new 
GenericDatumReader())) {
+      assertEquals("deflate", dataFileStream.getMetaString("avro.codec"));
+    }
   }
 
   @Test
   @SuppressWarnings("unchecked")
   @Category(NeedsRunner.class)
-  public void testAvroIONullCodecWriteAndReadASingleFile() throws Throwable {
+  public void testWriteThenReadASingleFileWithNullCodec() throws Throwable {
     List<GenericClass> values =
         ImmutableList.of(new GenericClass(3, "hi"), new GenericClass(5, 
"bar"));
     File outputFile = tmpFolder.newFile("output.avro");
@@ -424,16 +525,17 @@ public class AvroIOTest {
                 .to(outputFile.getAbsolutePath())
                 .withoutSharding()
                 .withCodec(CodecFactory.nullCodec()));
-    writePipeline.run().waitUntilFinish();
-
-    PCollection<GenericClass> input =
-        
readPipeline.apply(AvroIO.read(GenericClass.class).from(outputFile.getAbsolutePath()));
+    writePipeline.run();
 
-    PAssert.that(input).containsInAnyOrder(values);
+    PAssert.that(
+            
readPipeline.apply(AvroIO.read(GenericClass.class).from(outputFile.getAbsolutePath())))
+        .containsInAnyOrder(values);
     readPipeline.run();
-    DataFileStream dataFileStream =
-        new DataFileStream(new FileInputStream(outputFile), new 
GenericDatumReader());
-    assertEquals("null", dataFileStream.getMetaString("avro.codec"));
+
+    try (DataFileStream dataFileStream =
+        new DataFileStream(new FileInputStream(outputFile), new 
GenericDatumReader())) {
+      assertEquals("null", dataFileStream.getMetaString("avro.codec"));
+    }
   }
 
   @DefaultCoder(AvroCoder.class)
@@ -485,22 +587,22 @@ public class AvroIOTest {
    */
   @Test
   @Category(NeedsRunner.class)
-  public void testAvroIOWriteAndReadSchemaUpgrade() throws Throwable {
+  public void testWriteThenReadSchemaUpgrade() throws Throwable {
     List<GenericClass> values =
         ImmutableList.of(new GenericClass(3, "hi"), new GenericClass(5, 
"bar"));
     File outputFile = tmpFolder.newFile("output.avro");
 
     writePipeline.apply(Create.of(values))
         
.apply(AvroIO.write(GenericClass.class).to(outputFile.getAbsolutePath()).withoutSharding());
-    writePipeline.run().waitUntilFinish();
+    writePipeline.run();
 
     List<GenericClassV2> expected =
         ImmutableList.of(new GenericClassV2(3, "hi", null), new 
GenericClassV2(5, "bar", null));
 
-    PCollection<GenericClassV2> input =
-        
readPipeline.apply(AvroIO.read(GenericClassV2.class).from(outputFile.getAbsolutePath()));
-
-    PAssert.that(input).containsInAnyOrder(expected);
+    PAssert.that(
+            readPipeline.apply(
+                
AvroIO.read(GenericClassV2.class).from(outputFile.getAbsolutePath())))
+        .containsInAnyOrder(expected);
     readPipeline.run();
   }
 
@@ -550,11 +652,11 @@ public class AvroIOTest {
     }
   }
 
-  @Rule public TestPipeline windowedAvroWritePipeline = TestPipeline.create();
+  @Rule public transient TestPipeline windowedAvroWritePipeline = 
TestPipeline.create();
 
   @Test
   @Category({ValidatesRunner.class, UsesTestStream.class})
-  public void testWindowedAvroIOWrite() throws Throwable {
+  public void testWriteWindowed() throws Throwable {
     Path baseDir = Files.createTempDirectory(tmpFolder.getRoot().toPath(), 
"testwrite");
     String baseFilename = baseDir.resolve("prefix").toString();
 
@@ -880,11 +982,12 @@ public class AvroIOTest {
                         "bytesValue".getBytes())));
     writePipeline.run();
 
-    DataFileStream dataFileStream =
-        new DataFileStream(new FileInputStream(outputFile), new 
GenericDatumReader());
-    assertEquals("stringValue", dataFileStream.getMetaString("stringKey"));
-    assertEquals(100L, dataFileStream.getMetaLong("longKey"));
-    assertArrayEquals("bytesValue".getBytes(), 
dataFileStream.getMeta("bytesKey"));
+    try (DataFileStream dataFileStream =
+        new DataFileStream(new FileInputStream(outputFile), new 
GenericDatumReader())) {
+      assertEquals("stringValue", dataFileStream.getMetaString("stringKey"));
+      assertEquals(100L, dataFileStream.getMetaLong("longKey"));
+      assertArrayEquals("bytesValue".getBytes(), 
dataFileStream.getMeta("bytesKey"));
+    }
   }
 
   @SuppressWarnings("deprecation") // using AvroCoder#createDatumReader for 
tests.

http://git-wip-us.apache.org/repos/asf/beam/blob/8242abdd/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroIOTransformTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroIOTransformTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroIOTransformTest.java
deleted file mode 100644
index b4f7a79..0000000
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroIOTransformTest.java
+++ /dev/null
@@ -1,324 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.beam.sdk.io;
-
-import static 
org.hamcrest.collection.IsIterableContainingInAnyOrder.containsInAnyOrder;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertThat;
-
-import com.google.common.collect.ImmutableList;
-import java.io.File;
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.List;
-import javax.annotation.Nullable;
-import org.apache.avro.Schema;
-import org.apache.avro.file.DataFileReader;
-import org.apache.avro.file.DataFileWriter;
-import org.apache.avro.generic.GenericData;
-import org.apache.avro.generic.GenericRecord;
-import org.apache.avro.io.DatumReader;
-import org.apache.avro.io.DatumWriter;
-import org.apache.avro.specific.SpecificDatumReader;
-import org.apache.avro.specific.SpecificDatumWriter;
-import org.apache.beam.sdk.coders.AvroCoder;
-import org.apache.beam.sdk.coders.Coder;
-import org.apache.beam.sdk.testing.NeedsRunner;
-import org.apache.beam.sdk.testing.PAssert;
-import org.apache.beam.sdk.testing.TestPipeline;
-import org.apache.beam.sdk.transforms.Create;
-import org.apache.beam.sdk.values.PCollection;
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.experimental.categories.Category;
-import org.junit.rules.TemporaryFolder;
-import org.junit.runner.RunWith;
-import org.junit.runners.Parameterized;
-import org.junit.runners.Suite;
-
-/**
- * A test suite for {@link AvroIO.Write} and {@link AvroIO.Read} transforms.
- */
-@RunWith(Suite.class)
-@Suite.SuiteClasses({
-    AvroIOTransformTest.AvroIOReadTransformTest.class,
-    AvroIOTransformTest.AvroIOWriteTransformTest.class
-})
-public class AvroIOTransformTest {
-
-  // TODO: Stop requiring local files
-
-  @Rule
-  public final transient TestPipeline pipeline = TestPipeline.create();
-
-  @Rule
-  public final TemporaryFolder tmpFolder = new TemporaryFolder();
-
-  private static final Schema.Parser parser = new Schema.Parser();
-
-  private static final String SCHEMA_STRING =
-      "{\"namespace\": \"example.avro\",\n"
-          + " \"type\": \"record\",\n"
-          + " \"name\": \"AvroGeneratedUser\",\n"
-          + " \"fields\": [\n"
-          + "     {\"name\": \"name\", \"type\": \"string\"},\n"
-          + "     {\"name\": \"favorite_number\", \"type\": [\"int\", 
\"null\"]},\n"
-          + "     {\"name\": \"favorite_color\", \"type\": [\"string\", 
\"null\"]}\n"
-          + " ]\n"
-          + "}";
-
-  private static final Schema SCHEMA = parser.parse(SCHEMA_STRING);
-
-  private static AvroGeneratedUser[] generateAvroObjects() {
-    final AvroGeneratedUser user1 = new AvroGeneratedUser();
-    user1.setName("Bob");
-    user1.setFavoriteNumber(256);
-
-    final AvroGeneratedUser user2 = new AvroGeneratedUser();
-    user2.setName("Alice");
-    user2.setFavoriteNumber(128);
-
-    final AvroGeneratedUser user3 = new AvroGeneratedUser();
-    user3.setName("Ted");
-    user3.setFavoriteColor("white");
-
-    return new AvroGeneratedUser[] { user1, user2, user3 };
-  }
-
-  /**
-   * Tests for AvroIO Read transforms, using classes generated from {@code 
user.avsc}.
-   */
-  @RunWith(Parameterized.class)
-  public static class AvroIOReadTransformTest extends AvroIOTransformTest {
-
-    private static GenericRecord[] generateAvroGenericRecords() {
-      final GenericRecord user1 = new GenericData.Record(SCHEMA);
-      user1.put("name", "Bob");
-      user1.put("favorite_number", 256);
-
-      final GenericRecord user2 = new GenericData.Record(SCHEMA);
-      user2.put("name", "Alice");
-      user2.put("favorite_number", 128);
-
-      final GenericRecord user3 = new GenericData.Record(SCHEMA);
-      user3.put("name", "Ted");
-      user3.put("favorite_color", "white");
-
-      return new GenericRecord[] { user1, user2, user3 };
-    }
-
-    private void generateAvroFile(final AvroGeneratedUser[] elements,
-                                  final File avroFile) throws IOException {
-      final DatumWriter<AvroGeneratedUser> userDatumWriter =
-          new SpecificDatumWriter<>(AvroGeneratedUser.class);
-      try (DataFileWriter<AvroGeneratedUser> dataFileWriter =
-          new DataFileWriter<>(userDatumWriter)) {
-        dataFileWriter.create(elements[0].getSchema(), avroFile);
-        for (final AvroGeneratedUser user : elements) {
-          dataFileWriter.append(user);
-        }
-      }
-    }
-
-    private <T> void runTestRead(@Nullable final String applyName,
-                                 final AvroIO.Read<T> readBuilder,
-                                 final String expectedName,
-                                 final T[] expectedOutput) throws Exception {
-
-      final File avroFile = tmpFolder.newFile("file.avro");
-      generateAvroFile(generateAvroObjects(), avroFile);
-      final AvroIO.Read<T> read = readBuilder.from(avroFile.getPath());
-      final PCollection<T> output =
-          applyName == null ? pipeline.apply(read) : pipeline.apply(applyName, 
read);
-
-      PAssert.that(output).containsInAnyOrder(expectedOutput);
-
-      pipeline.run();
-
-      assertEquals(expectedName, output.getName());
-    }
-
-    @Parameterized.Parameters(name = "{2}_with_{4}")
-    public static Iterable<Object[]> data() throws IOException {
-
-      final String generatedClass = "GeneratedClass";
-      final String fromSchema = "SchemaObject";
-      final String fromSchemaString = "SchemaString";
-
-      return
-          ImmutableList.<Object[]>builder()
-              .add(
-
-                  // test read using generated class
-                  new Object[] {
-                      null,
-                      AvroIO.read(AvroGeneratedUser.class),
-                      "AvroIO.Read/Read.out",
-                      generateAvroObjects(),
-                      generatedClass
-                  },
-                  new Object[] {
-                      "MyRead",
-                      AvroIO.read(AvroGeneratedUser.class),
-                      "MyRead/Read.out",
-                      generateAvroObjects(),
-                      generatedClass
-                  },
-
-                  // test read using schema object
-                  new Object[] {
-                      null,
-                      AvroIO.readGenericRecords(SCHEMA),
-                      "AvroIO.Read/Read.out",
-                      generateAvroGenericRecords(),
-                      fromSchema
-                  },
-                  new Object[] {
-                      "MyRead",
-                      AvroIO.readGenericRecords(SCHEMA),
-                      "MyRead/Read.out",
-                      generateAvroGenericRecords(),
-                      fromSchema
-                  },
-
-                  // test read using schema string
-                  new Object[] {
-                      null,
-                      AvroIO.readGenericRecords(SCHEMA_STRING),
-                      "AvroIO.Read/Read.out",
-                      generateAvroGenericRecords(),
-                      fromSchemaString
-                  },
-                  new Object[] {
-                      "MyRead",
-                      AvroIO.readGenericRecords(SCHEMA_STRING),
-                      "MyRead/Read.out",
-                      generateAvroGenericRecords(),
-                      fromSchemaString
-                  })
-              .build();
-    }
-
-    @SuppressWarnings("DefaultAnnotationParam")
-    @Parameterized.Parameter(0)
-    public String transformName;
-
-    @Parameterized.Parameter(1)
-    public AvroIO.Read readTransform;
-
-    @Parameterized.Parameter(2)
-    public String expectedReadTransformName;
-
-    @Parameterized.Parameter(3)
-    public Object[] expectedOutput;
-
-    @Parameterized.Parameter(4)
-    public String testAlias;
-
-    @Test
-    @Category(NeedsRunner.class)
-    public void testRead() throws Exception {
-      runTestRead(transformName, readTransform, expectedReadTransformName, 
expectedOutput);
-    }
-  }
-
-  /**
-   * Tests for AvroIO Write transforms, using classes generated from {@code 
user.avsc}.
-   */
-  @RunWith(Parameterized.class)
-  public static class AvroIOWriteTransformTest extends AvroIOTransformTest {
-
-    private static final String WRITE_TRANSFORM_NAME = "AvroIO.Write";
-
-    private List<AvroGeneratedUser> readAvroFile(final File avroFile) throws 
IOException {
-      final DatumReader<AvroGeneratedUser> userDatumReader =
-          new SpecificDatumReader<>(AvroGeneratedUser.class);
-      final List<AvroGeneratedUser> users = new ArrayList<>();
-      try (DataFileReader<AvroGeneratedUser> dataFileReader =
-          new DataFileReader<>(avroFile, userDatumReader)) {
-        while (dataFileReader.hasNext()) {
-          users.add(dataFileReader.next());
-        }
-      }
-      return users;
-    }
-
-    @Parameterized.Parameters(name = "{0}_with_{1}")
-    public static Iterable<Object[]> data() throws IOException {
-
-      final String generatedClass = "GeneratedClass";
-      final String fromSchema = "SchemaObject";
-      final String fromSchemaString = "SchemaString";
-
-      return
-          ImmutableList.<Object[]>builder()
-              .add(
-                  new Object[] {
-                      AvroIO.write(AvroGeneratedUser.class),
-                      generatedClass
-                  },
-                  new Object[] {
-                      AvroIO.writeGenericRecords(SCHEMA),
-                      fromSchema
-                  },
-
-                  new Object[] {
-                      AvroIO.writeGenericRecords(SCHEMA_STRING),
-                      fromSchemaString
-                  })
-              .build();
-    }
-
-    @SuppressWarnings("DefaultAnnotationParam")
-    @Parameterized.Parameter(0)
-    public AvroIO.Write writeTransform;
-
-    @Parameterized.Parameter(1)
-    public String testAlias;
-
-    private <T> void runTestWrite(final AvroIO.Write<T> writeBuilder)
-        throws Exception {
-
-      final File avroFile = tmpFolder.newFile("file.avro");
-      final AvroGeneratedUser[] users = generateAvroObjects();
-      final AvroIO.Write<T> write = writeBuilder.to(avroFile.getPath());
-
-      @SuppressWarnings("unchecked") final
-      PCollection<T> input =
-          pipeline.apply(Create.of(Arrays.asList((T[]) users))
-                               .withCoder((Coder<T>) 
AvroCoder.of(AvroGeneratedUser.class)));
-      input.apply(write.withoutSharding());
-
-      pipeline.run();
-
-      assertEquals(WRITE_TRANSFORM_NAME, write.getName());
-      assertThat(readAvroFile(avroFile), containsInAnyOrder(users));
-    }
-
-    @Test
-    @Category(NeedsRunner.class)
-    public void testWrite() throws Exception {
-      runTestWrite(writeTransform);
-    }
-
-    // TODO: for Write only, test withSuffix, withNumShards,
-    // withShardNameTemplate and withoutSharding.
-  }
-}

Reply via email to