Adds AvroIO.readGenericRecords() and removes AvroIO.Read.withSchema()
Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/ff7a1d42 Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/ff7a1d42 Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/ff7a1d42 Branch: refs/heads/master Commit: ff7a1d42f2902bebdf998d3f00b2b268ba150058 Parents: 1499d25 Author: Eugene Kirpichov <[email protected]> Authored: Fri Apr 28 18:36:20 2017 -0700 Committer: Eugene Kirpichov <[email protected]> Committed: Mon May 1 18:43:38 2017 -0700 ---------------------------------------------------------------------- .../beam/runners/spark/io/AvroPipelineTest.java | 2 +- .../java/org/apache/beam/sdk/io/AvroIO.java | 31 ++++++++------------ .../java/org/apache/beam/sdk/io/AvroIOTest.java | 8 ++--- .../apache/beam/sdk/io/AvroIOTransformTest.java | 8 ++--- 4 files changed, 19 insertions(+), 30 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/beam/blob/ff7a1d42/runners/spark/src/test/java/org/apache/beam/runners/spark/io/AvroPipelineTest.java ---------------------------------------------------------------------- diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/io/AvroPipelineTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/io/AvroPipelineTest.java index e3a44d2..62db14f 100644 --- a/runners/spark/src/test/java/org/apache/beam/runners/spark/io/AvroPipelineTest.java +++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/io/AvroPipelineTest.java @@ -74,7 +74,7 @@ public class AvroPipelineTest { Pipeline p = pipelineRule.createPipeline(); PCollection<GenericRecord> input = p.apply( - AvroIO.read().from(inputFile.getAbsolutePath()).withSchema(schema)); + AvroIO.readGenericRecords(schema).from(inputFile.getAbsolutePath())); input.apply(AvroIO.Write.to(outputDir.getAbsolutePath()).withSchema(schema)); p.run().waitUntilFinish(); 
http://git-wip-us.apache.org/repos/asf/beam/blob/ff7a1d42/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroIO.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroIO.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroIO.java index abde9cb..ed172d1 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroIO.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroIO.java @@ -133,6 +133,18 @@ public class AvroIO { return new Read<>(); } + /** Reads Avro file(s) containing records of the specified schema. */ + public static Read<GenericRecord> readGenericRecords(Schema schema) { + return new Read<>(null, null, GenericRecord.class, schema); + } + + /** + * Like {@link #readGenericRecords(Schema)} but the schema is specified as a JSON-encoded string. + */ + public static Read<GenericRecord> readGenericRecords(String schema) { + return readGenericRecords(new Schema.Parser().parse(schema)); + } + /** Implementation of {@link #read}. */ public static class Read<T> extends PTransform<PBegin, PCollection<T>> { /** The filepattern to read from. */ @@ -178,25 +190,6 @@ public class AvroIO { return new Read<>(name, filepattern, type, ReflectData.get().getSchema(type)); } - /** - * Returns a new {@link PTransform} that's like this one but - * that reads Avro file(s) containing records of the specified schema. - */ - public Read<GenericRecord> withSchema(Schema schema) { - return new Read<>(name, filepattern, GenericRecord.class, schema); - } - - /** - * Returns a new {@link PTransform} that's like this one but - * that reads Avro file(s) containing records of the specified schema - * in a JSON-encoded string form. - * - * <p>Does not modify this object. 
- */ - public Read<GenericRecord> withSchema(String schema) { - return withSchema((new Schema.Parser()).parse(schema)); - } - @Override public PCollection<T> expand(PBegin input) { if (filepattern == null) { http://git-wip-us.apache.org/repos/asf/beam/blob/ff7a1d42/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroIOTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroIOTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroIOTest.java index 6d842b3..2144b0d 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroIOTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroIOTest.java @@ -282,10 +282,6 @@ public class AvroIOTest { p.run(); } - private TimestampedValue<GenericClass> newValue(GenericClass element, Duration duration) { - return TimestampedValue.of(element, new Instant(0).plus(duration)); - } - private static class WindowedFilenamePolicy extends FilenamePolicy { String outputFilePrefix; @@ -550,8 +546,8 @@ public class AvroIOTest { public void testPrimitiveReadDisplayData() { DisplayDataEvaluator evaluator = DisplayDataEvaluator.create(); - AvroIO.Read<?> read = AvroIO.read().from("foo.*") - .withSchema(Schema.create(Schema.Type.STRING)); + AvroIO.Read<?> read = + AvroIO.readGenericRecords(Schema.create(Schema.Type.STRING)).from("foo.*"); Set<DisplayData> displayData = evaluator.displayDataForPrimitiveSourceTransforms(read); assertThat("AvroIO.Read should include the file pattern in its primitive transform", http://git-wip-us.apache.org/repos/asf/beam/blob/ff7a1d42/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroIOTransformTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroIOTransformTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroIOTransformTest.java index 06b9841..b974663 100644 --- 
a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroIOTransformTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroIOTransformTest.java @@ -185,14 +185,14 @@ public class AvroIOTransformTest { // test read using schema object new Object[] { null, - AvroIO.read().withSchema(SCHEMA), + AvroIO.readGenericRecords(SCHEMA), "AvroIO.Read/Read.out", generateAvroGenericRecords(), fromSchema }, new Object[] { "MyRead", - AvroIO.read().withSchema(SCHEMA), + AvroIO.readGenericRecords(SCHEMA), "MyRead/Read.out", generateAvroGenericRecords(), fromSchema @@ -201,14 +201,14 @@ public class AvroIOTransformTest { // test read using schema string new Object[] { null, - AvroIO.read().withSchema(SCHEMA_STRING), + AvroIO.readGenericRecords(SCHEMA_STRING), "AvroIO.Read/Read.out", generateAvroGenericRecords(), fromSchemaString }, new Object[] { "MyRead", - AvroIO.read().withSchema(SCHEMA_STRING), + AvroIO.readGenericRecords(SCHEMA_STRING), "MyRead/Read.out", generateAvroGenericRecords(), fromSchemaString
