Moves AvroIO.write().withSchema(Class) into the factory method AvroIO.write(Class)
Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/27d74622 Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/27d74622 Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/27d74622 Branch: refs/heads/master Commit: 27d74622e877d017aa70feef0ee4cd26a4bece7a Parents: e0d7475 Author: Eugene Kirpichov <[email protected]> Authored: Fri Apr 28 19:25:45 2017 -0700 Committer: Eugene Kirpichov <[email protected]> Committed: Mon May 1 18:43:38 2017 -0700 ---------------------------------------------------------------------- .../java/org/apache/beam/sdk/io/AvroIO.java | 49 +++++++++----------- .../java/org/apache/beam/sdk/io/AvroIOTest.java | 42 +++++++---------- .../apache/beam/sdk/io/AvroIOTransformTest.java | 2 +- 3 files changed, 40 insertions(+), 53 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/beam/blob/27d74622/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroIO.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroIO.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroIO.java index 8cdd4e7..6b66a98 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroIO.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroIO.java @@ -89,26 +89,23 @@ import org.apache.beam.sdk.values.PDone; * {@link FileBasedSink.FilenamePolicy} must be set, and unique windows and triggers must produce * unique filenames. * - * <p>It is required to specify {@link AvroIO.Write#withSchema}. To - * write specific records, such as Avro-generated classes, provide an - * Avro-generated class type. To write {@link GenericRecord GenericRecords}, provide either - * a {@link Schema} object or a schema in a JSON-encoded string form. - * An exception will be thrown if a record doesn't match the specified - * schema. 
+ * <p>To write specific records, such as Avro-generated classes, use {@link #write(Class)}. + * To write {@link GenericRecord GenericRecords}, use either {@link #writeGenericRecords(Schema)} + * which takes a {@link Schema} object, or {@link #writeGenericRecords(String)} which takes a schema + * in a JSON-encoded string form. An exception will be thrown if a record doesn't match the + * specified schema. * * <p>For example: * <pre> {@code * // A simple Write to a local file (only runs locally): * PCollection<AvroAutoGenClass> records = ...; - * records.apply(AvroIO.write().to("/path/to/file.avro") - * .withSchema(AvroAutoGenClass.class)); + * records.apply(AvroIO.write(AvroAutoGenClass.class).to("/path/to/file.avro")); * * // A Write to a sharded GCS file (runs locally and using remote execution): * Schema schema = new Schema.Parser().parse(new File("schema.avsc")); * PCollection<GenericRecord> records = ...; - * records.apply("WriteToAvro", AvroIO.write() + * records.apply("WriteToAvro", AvroIO.writeGenericRecords(schema) * .to("gs://my_bucket/path/to/numbers") - * .withSchema(schema) * .withSuffix(".avro")); * } </pre> * @@ -153,26 +150,31 @@ public class AvroIO { * Writes a {@link PCollection} to an Avro file (or multiple Avro files matching a sharding * pattern). */ - public static <T> Write<T> write() { - return new AutoValue_AvroIO_Write.Builder<T>() - .setFilenameSuffix("") - .setNumShards(0) - .setShardTemplate(Write.DEFAULT_SHARD_TEMPLATE) - .setCodec(Write.DEFAULT_CODEC) - .setMetadata(ImmutableMap.<String, Object>of()) - .setWindowedWrites(false) + public static <T> Write<T> write(Class<T> recordClass) { + return AvroIO.<T>defaultWriteBuilder() + .setRecordClass(recordClass) + .setSchema(ReflectData.get().getSchema(recordClass)) .build(); } /** Writes Avro records of the specified schema. 
*/ public static Write<GenericRecord> writeGenericRecords(Schema schema) { - return AvroIO.<GenericRecord>write() - .toBuilder() + return AvroIO.<GenericRecord>defaultWriteBuilder() .setRecordClass(GenericRecord.class) .setSchema(schema) .build(); } + private static <T> Write.Builder<T> defaultWriteBuilder() { + return new AutoValue_AvroIO_Write.Builder<T>() + .setFilenameSuffix("") + .setNumShards(0) + .setShardTemplate(Write.DEFAULT_SHARD_TEMPLATE) + .setCodec(Write.DEFAULT_CODEC) + .setMetadata(ImmutableMap.<String, Object>of()) + .setWindowedWrites(false); + } + /** * Like {@link #writeGenericRecords(Schema)} but the schema is specified as a JSON-encoded string. */ @@ -369,13 +371,6 @@ public class AvroIO { return toBuilder().setWindowedWrites(true).build(); } - /** - * Writes to Avro file(s) containing records whose type is the specified Avro-generated class. - */ - public Write<T> withSchema(Class<T> type) { - return toBuilder().setRecordClass(type).setSchema(ReflectData.get().getSchema(type)).build(); - } - /** Writes to Avro file(s) compressed using specified codec. 
*/ public Write<T> withCodec(CodecFactory codec) { return toBuilder().setCodec(new SerializableAvroCodecFactory(codec)).build(); http://git-wip-us.apache.org/repos/asf/beam/blob/27d74622/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroIOTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroIOTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroIOTest.java index 4abd3e0..e421b96 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroIOTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroIOTest.java @@ -45,7 +45,6 @@ import org.apache.avro.file.CodecFactory; import org.apache.avro.file.DataFileReader; import org.apache.avro.file.DataFileStream; import org.apache.avro.generic.GenericDatumReader; -import org.apache.avro.generic.GenericRecord; import org.apache.avro.reflect.Nullable; import org.apache.avro.reflect.ReflectData; import org.apache.avro.reflect.ReflectDatumReader; @@ -103,7 +102,7 @@ public class AvroIOTest { @Test public void testAvroIOGetName() { assertEquals("AvroIO.Read", AvroIO.read(String.class).from("gs://bucket/foo*/baz").getName()); - assertEquals("AvroIO.Write", AvroIO.write().to("gs://bucket/foo/baz").getName()); + assertEquals("AvroIO.Write", AvroIO.write(String.class).to("gs://bucket/foo/baz").getName()); } @DefaultCoder(AvroCoder.class) @@ -144,9 +143,8 @@ public class AvroIOTest { File outputFile = tmpFolder.newFile("output.avro"); p.apply(Create.of(values)) - .apply(AvroIO.<GenericClass>write().to(outputFile.getAbsolutePath()) - .withoutSharding() - .withSchema(GenericClass.class)); + .apply(AvroIO.write(GenericClass.class).to(outputFile.getAbsolutePath()) + .withoutSharding()); p.run(); PCollection<GenericClass> input = @@ -167,10 +165,9 @@ public class AvroIOTest { File outputFile = tmpFolder.newFile("output.avro"); p.apply(Create.of(values)) - 
.apply(AvroIO.<GenericClass>write().to(outputFile.getAbsolutePath()) + .apply(AvroIO.write(GenericClass.class).to(outputFile.getAbsolutePath()) .withoutSharding() - .withCodec(CodecFactory.deflateCodec(9)) - .withSchema(GenericClass.class)); + .withCodec(CodecFactory.deflateCodec(9))); p.run(); PCollection<GenericClass> input = p @@ -193,9 +190,8 @@ public class AvroIOTest { File outputFile = tmpFolder.newFile("output.avro"); p.apply(Create.of(values)) - .apply(AvroIO.<GenericClass>write().to(outputFile.getAbsolutePath()) + .apply(AvroIO.write(GenericClass.class).to(outputFile.getAbsolutePath()) .withoutSharding() - .withSchema(GenericClass.class) .withCodec(CodecFactory.nullCodec())); p.run(); @@ -260,9 +256,8 @@ public class AvroIOTest { File outputFile = tmpFolder.newFile("output.avro"); p.apply(Create.of(values)) - .apply(AvroIO.<GenericClass>write().to(outputFile.getAbsolutePath()) - .withoutSharding() - .withSchema(GenericClass.class)); + .apply(AvroIO.write(GenericClass.class).to(outputFile.getAbsolutePath()) + .withoutSharding()); p.run(); List<GenericClassV2> expected = ImmutableList.of(new GenericClassV2(3, "hi", null), @@ -367,10 +362,9 @@ public class AvroIOTest { windowedAvroWritePipeline .apply(values) .apply(Window.<GenericClass>into(FixedWindows.of(Duration.standardMinutes(1)))) - .apply(AvroIO.<GenericClass>write().to(new WindowedFilenamePolicy(outputFilePrefix)) + .apply(AvroIO.write(GenericClass.class).to(new WindowedFilenamePolicy(outputFilePrefix)) .withWindowedWrites() - .withNumShards(2) - .withSchema(GenericClass.class)); + .withNumShards(2)); windowedAvroWritePipeline.run(); // Validate that the data written matches the expected elements in the expected order @@ -402,14 +396,14 @@ public class AvroIOTest { @Test public void testWriteWithDefaultCodec() throws Exception { - AvroIO.Write<GenericRecord> write = AvroIO.<GenericRecord>write() + AvroIO.Write<String> write = AvroIO.write(String.class) .to("gs://bucket/foo/baz"); 
assertEquals(CodecFactory.deflateCodec(6).toString(), write.getCodec().toString()); } @Test public void testWriteWithCustomCodec() throws Exception { - AvroIO.Write<GenericRecord> write = AvroIO.<GenericRecord>write() + AvroIO.Write<?> write = AvroIO.write(String.class) .to("gs://bucket/foo/baz") .withCodec(CodecFactory.snappyCodec()); assertEquals(SNAPPY_CODEC, write.getCodec().toString()); @@ -418,7 +412,7 @@ public class AvroIOTest { @Test @SuppressWarnings("unchecked") public void testWriteWithSerDeCustomDeflateCodec() throws Exception { - AvroIO.Write<GenericRecord> write = AvroIO.<GenericRecord>write() + AvroIO.Write<String> write = AvroIO.write(String.class) .to("gs://bucket/foo/baz") .withCodec(CodecFactory.deflateCodec(9)); @@ -430,7 +424,7 @@ public class AvroIOTest { @Test @SuppressWarnings("unchecked") public void testWriteWithSerDeCustomXZCodec() throws Exception { - AvroIO.Write<GenericRecord> write = AvroIO.<GenericRecord>write() + AvroIO.Write<String> write = AvroIO.write(String.class) .to("gs://bucket/foo/baz") .withCodec(CodecFactory.xzCodec(9)); @@ -448,9 +442,8 @@ public class AvroIOTest { File outputFile = tmpFolder.newFile("output.avro"); p.apply(Create.of(values)) - .apply(AvroIO.<GenericClass>write().to(outputFile.getAbsolutePath()) + .apply(AvroIO.write(GenericClass.class).to(outputFile.getAbsolutePath()) .withoutSharding() - .withSchema(GenericClass.class) .withMetadata(ImmutableMap.<String, Object>of( "stringKey", "stringValue", "longKey", 100L, @@ -471,7 +464,7 @@ public class AvroIOTest { String outputFilePrefix = baseOutputFile.getAbsolutePath(); AvroIO.Write<String> write = - AvroIO.<String>write().to(outputFilePrefix).withSchema(String.class); + AvroIO.write(String.class).to(outputFilePrefix); if (numShards > 1) { System.out.println("NumShards " + numShards); write = write.withNumShards(numShards); @@ -552,11 +545,10 @@ public class AvroIOTest { @Test public void testWriteDisplayData() { - AvroIO.Write<?> write = 
AvroIO.<GenericClass>write() + AvroIO.Write<?> write = AvroIO.write(GenericClass.class) .to("foo") .withShardNameTemplate("-SS-of-NN-") .withSuffix("bar") - .withSchema(GenericClass.class) .withNumShards(100) .withCodec(CodecFactory.snappyCodec()); http://git-wip-us.apache.org/repos/asf/beam/blob/27d74622/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroIOTransformTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroIOTransformTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroIOTransformTest.java index fb57d5c..b4f7a79 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroIOTransformTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroIOTransformTest.java @@ -271,7 +271,7 @@ public class AvroIOTransformTest { ImmutableList.<Object[]>builder() .add( new Object[] { - AvroIO.<AvroGeneratedUser>write().withSchema(AvroGeneratedUser.class), + AvroIO.write(AvroGeneratedUser.class), generatedClass }, new Object[] {
