Moves AvroIO.write().withSchema into write()

Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/27d74622
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/27d74622
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/27d74622

Branch: refs/heads/master
Commit: 27d74622e877d017aa70feef0ee4cd26a4bece7a
Parents: e0d7475
Author: Eugene Kirpichov <[email protected]>
Authored: Fri Apr 28 19:25:45 2017 -0700
Committer: Eugene Kirpichov <[email protected]>
Committed: Mon May 1 18:43:38 2017 -0700

----------------------------------------------------------------------
 .../java/org/apache/beam/sdk/io/AvroIO.java     | 49 +++++++++-----------
 .../java/org/apache/beam/sdk/io/AvroIOTest.java | 42 +++++++----------
 .../apache/beam/sdk/io/AvroIOTransformTest.java |  2 +-
 3 files changed, 40 insertions(+), 53 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/27d74622/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroIO.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroIO.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroIO.java
index 8cdd4e7..6b66a98 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroIO.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroIO.java
@@ -89,26 +89,23 @@ import org.apache.beam.sdk.values.PDone;
  * {@link FileBasedSink.FilenamePolicy} must be set, and unique windows and triggers must produce
  * unique filenames.
  *
- * <p>It is required to specify {@link AvroIO.Write#withSchema}. To
- * write specific records, such as Avro-generated classes, provide an
- * Avro-generated class type. To write {@link GenericRecord GenericRecords}, provide either
- * a {@link Schema} object or a schema in a JSON-encoded string form.
- * An exception will be thrown if a record doesn't match the specified
- * schema.
+ * <p>To write specific records, such as Avro-generated classes, use {@link #write(Class)}.
+ * To write {@link GenericRecord GenericRecords}, use either {@link #writeGenericRecords(Schema)}
+ * which takes a {@link Schema} object, or {@link #writeGenericRecords(String)} which takes a schema
+ * in a JSON-encoded string form. An exception will be thrown if a record doesn't match the
+ * specified schema.
  *
  * <p>For example:
  * <pre> {@code
  * // A simple Write to a local file (only runs locally):
  * PCollection<AvroAutoGenClass> records = ...;
- * records.apply(AvroIO.write().to("/path/to/file.avro")
- *                           .withSchema(AvroAutoGenClass.class));
+ * records.apply(AvroIO.write(AvroAutoGenClass.class).to("/path/to/file.avro"));
  *
  * // A Write to a sharded GCS file (runs locally and using remote execution):
  * Schema schema = new Schema.Parser().parse(new File("schema.avsc"));
  * PCollection<GenericRecord> records = ...;
- * records.apply("WriteToAvro", AvroIO.write()
+ * records.apply("WriteToAvro", AvroIO.writeGenericRecords(schema)
  *     .to("gs://my_bucket/path/to/numbers")
- *     .withSchema(schema)
  *     .withSuffix(".avro"));
  * } </pre>
  *
@@ -153,26 +150,31 @@ public class AvroIO {
    * Writes a {@link PCollection} to an Avro file (or multiple Avro files matching a sharding
    * pattern).
    */
-  public static <T> Write<T> write() {
-    return new AutoValue_AvroIO_Write.Builder<T>()
-        .setFilenameSuffix("")
-        .setNumShards(0)
-        .setShardTemplate(Write.DEFAULT_SHARD_TEMPLATE)
-        .setCodec(Write.DEFAULT_CODEC)
-        .setMetadata(ImmutableMap.<String, Object>of())
-        .setWindowedWrites(false)
+  public static <T> Write<T> write(Class<T> recordClass) {
+    return AvroIO.<T>defaultWriteBuilder()
+        .setRecordClass(recordClass)
+        .setSchema(ReflectData.get().getSchema(recordClass))
         .build();
   }
 
   /** Writes Avro records of the specified schema. */
   public static Write<GenericRecord> writeGenericRecords(Schema schema) {
-    return AvroIO.<GenericRecord>write()
-        .toBuilder()
+    return AvroIO.<GenericRecord>defaultWriteBuilder()
         .setRecordClass(GenericRecord.class)
         .setSchema(schema)
         .build();
   }
 
+  private static <T> Write.Builder<T> defaultWriteBuilder() {
+    return new AutoValue_AvroIO_Write.Builder<T>()
+        .setFilenameSuffix("")
+        .setNumShards(0)
+        .setShardTemplate(Write.DEFAULT_SHARD_TEMPLATE)
+        .setCodec(Write.DEFAULT_CODEC)
+        .setMetadata(ImmutableMap.<String, Object>of())
+        .setWindowedWrites(false);
+  }
+
   /**
    * Like {@link #writeGenericRecords(Schema)} but the schema is specified as a JSON-encoded string.
    */
@@ -369,13 +371,6 @@ public class AvroIO {
       return toBuilder().setWindowedWrites(true).build();
     }
 
-    /**
-     * Writes to Avro file(s) containing records whose type is the specified Avro-generated class.
-     */
-    public Write<T> withSchema(Class<T> type) {
-      return 
toBuilder().setRecordClass(type).setSchema(ReflectData.get().getSchema(type)).build();
-    }
-
     /** Writes to Avro file(s) compressed using specified codec. */
     public Write<T> withCodec(CodecFactory codec) {
       return toBuilder().setCodec(new 
SerializableAvroCodecFactory(codec)).build();

http://git-wip-us.apache.org/repos/asf/beam/blob/27d74622/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroIOTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroIOTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroIOTest.java
index 4abd3e0..e421b96 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroIOTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroIOTest.java
@@ -45,7 +45,6 @@ import org.apache.avro.file.CodecFactory;
 import org.apache.avro.file.DataFileReader;
 import org.apache.avro.file.DataFileStream;
 import org.apache.avro.generic.GenericDatumReader;
-import org.apache.avro.generic.GenericRecord;
 import org.apache.avro.reflect.Nullable;
 import org.apache.avro.reflect.ReflectData;
 import org.apache.avro.reflect.ReflectDatumReader;
@@ -103,7 +102,7 @@ public class AvroIOTest {
   @Test
   public void testAvroIOGetName() {
     assertEquals("AvroIO.Read", 
AvroIO.read(String.class).from("gs://bucket/foo*/baz").getName());
-    assertEquals("AvroIO.Write", 
AvroIO.write().to("gs://bucket/foo/baz").getName());
+    assertEquals("AvroIO.Write", 
AvroIO.write(String.class).to("gs://bucket/foo/baz").getName());
   }
 
   @DefaultCoder(AvroCoder.class)
@@ -144,9 +143,8 @@ public class AvroIOTest {
     File outputFile = tmpFolder.newFile("output.avro");
 
     p.apply(Create.of(values))
-      .apply(AvroIO.<GenericClass>write().to(outputFile.getAbsolutePath())
-          .withoutSharding()
-          .withSchema(GenericClass.class));
+      .apply(AvroIO.write(GenericClass.class).to(outputFile.getAbsolutePath())
+          .withoutSharding());
     p.run();
 
     PCollection<GenericClass> input =
@@ -167,10 +165,9 @@ public class AvroIOTest {
     File outputFile = tmpFolder.newFile("output.avro");
 
     p.apply(Create.of(values))
-        .apply(AvroIO.<GenericClass>write().to(outputFile.getAbsolutePath())
+        
.apply(AvroIO.write(GenericClass.class).to(outputFile.getAbsolutePath())
             .withoutSharding()
-            .withCodec(CodecFactory.deflateCodec(9))
-            .withSchema(GenericClass.class));
+            .withCodec(CodecFactory.deflateCodec(9)));
     p.run();
 
     PCollection<GenericClass> input = p
@@ -193,9 +190,8 @@ public class AvroIOTest {
     File outputFile = tmpFolder.newFile("output.avro");
 
     p.apply(Create.of(values))
-        .apply(AvroIO.<GenericClass>write().to(outputFile.getAbsolutePath())
+        
.apply(AvroIO.write(GenericClass.class).to(outputFile.getAbsolutePath())
             .withoutSharding()
-            .withSchema(GenericClass.class)
             .withCodec(CodecFactory.nullCodec()));
     p.run();
 
@@ -260,9 +256,8 @@ public class AvroIOTest {
     File outputFile = tmpFolder.newFile("output.avro");
 
     p.apply(Create.of(values))
-      .apply(AvroIO.<GenericClass>write().to(outputFile.getAbsolutePath())
-          .withoutSharding()
-          .withSchema(GenericClass.class));
+      .apply(AvroIO.write(GenericClass.class).to(outputFile.getAbsolutePath())
+          .withoutSharding());
     p.run();
 
     List<GenericClassV2> expected = ImmutableList.of(new GenericClassV2(3, 
"hi", null),
@@ -367,10 +362,9 @@ public class AvroIOTest {
     windowedAvroWritePipeline
         .apply(values)
         
.apply(Window.<GenericClass>into(FixedWindows.of(Duration.standardMinutes(1))))
-        .apply(AvroIO.<GenericClass>write().to(new 
WindowedFilenamePolicy(outputFilePrefix))
+        .apply(AvroIO.write(GenericClass.class).to(new 
WindowedFilenamePolicy(outputFilePrefix))
             .withWindowedWrites()
-            .withNumShards(2)
-            .withSchema(GenericClass.class));
+            .withNumShards(2));
     windowedAvroWritePipeline.run();
 
     // Validate that the data written matches the expected elements in the expected order
@@ -402,14 +396,14 @@ public class AvroIOTest {
 
   @Test
   public void testWriteWithDefaultCodec() throws Exception {
-    AvroIO.Write<GenericRecord> write = AvroIO.<GenericRecord>write()
+    AvroIO.Write<String> write = AvroIO.write(String.class)
         .to("gs://bucket/foo/baz");
     assertEquals(CodecFactory.deflateCodec(6).toString(), 
write.getCodec().toString());
   }
 
   @Test
   public void testWriteWithCustomCodec() throws Exception {
-    AvroIO.Write<GenericRecord> write = AvroIO.<GenericRecord>write()
+    AvroIO.Write<?> write = AvroIO.write(String.class)
         .to("gs://bucket/foo/baz")
         .withCodec(CodecFactory.snappyCodec());
     assertEquals(SNAPPY_CODEC, write.getCodec().toString());
@@ -418,7 +412,7 @@ public class AvroIOTest {
   @Test
   @SuppressWarnings("unchecked")
   public void testWriteWithSerDeCustomDeflateCodec() throws Exception {
-    AvroIO.Write<GenericRecord> write = AvroIO.<GenericRecord>write()
+    AvroIO.Write<String> write = AvroIO.write(String.class)
         .to("gs://bucket/foo/baz")
         .withCodec(CodecFactory.deflateCodec(9));
 
@@ -430,7 +424,7 @@ public class AvroIOTest {
   @Test
   @SuppressWarnings("unchecked")
   public void testWriteWithSerDeCustomXZCodec() throws Exception {
-    AvroIO.Write<GenericRecord> write = AvroIO.<GenericRecord>write()
+    AvroIO.Write<String> write = AvroIO.write(String.class)
         .to("gs://bucket/foo/baz")
         .withCodec(CodecFactory.xzCodec(9));
 
@@ -448,9 +442,8 @@ public class AvroIOTest {
     File outputFile = tmpFolder.newFile("output.avro");
 
     p.apply(Create.of(values))
-        .apply(AvroIO.<GenericClass>write().to(outputFile.getAbsolutePath())
+        
.apply(AvroIO.write(GenericClass.class).to(outputFile.getAbsolutePath())
             .withoutSharding()
-            .withSchema(GenericClass.class)
             .withMetadata(ImmutableMap.<String, Object>of(
                 "stringKey", "stringValue",
                 "longKey", 100L,
@@ -471,7 +464,7 @@ public class AvroIOTest {
     String outputFilePrefix = baseOutputFile.getAbsolutePath();
 
     AvroIO.Write<String> write =
-        AvroIO.<String>write().to(outputFilePrefix).withSchema(String.class);
+        AvroIO.write(String.class).to(outputFilePrefix);
     if (numShards > 1) {
       System.out.println("NumShards " + numShards);
       write = write.withNumShards(numShards);
@@ -552,11 +545,10 @@ public class AvroIOTest {
 
   @Test
   public void testWriteDisplayData() {
-    AvroIO.Write<?> write = AvroIO.<GenericClass>write()
+    AvroIO.Write<?> write = AvroIO.write(GenericClass.class)
         .to("foo")
         .withShardNameTemplate("-SS-of-NN-")
         .withSuffix("bar")
-        .withSchema(GenericClass.class)
         .withNumShards(100)
         .withCodec(CodecFactory.snappyCodec());
 

http://git-wip-us.apache.org/repos/asf/beam/blob/27d74622/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroIOTransformTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroIOTransformTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroIOTransformTest.java
index fb57d5c..b4f7a79 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroIOTransformTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroIOTransformTest.java
@@ -271,7 +271,7 @@ public class AvroIOTransformTest {
           ImmutableList.<Object[]>builder()
               .add(
                   new Object[] {
-                      
AvroIO.<AvroGeneratedUser>write().withSchema(AvroGeneratedUser.class),
+                      AvroIO.write(AvroGeneratedUser.class),
                       generatedClass
                   },
                   new Object[] {

Reply via email to