Adds AvroIO.readGenericRecords()

Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/ff7a1d42
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/ff7a1d42
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/ff7a1d42

Branch: refs/heads/master
Commit: ff7a1d42f2902bebdf998d3f00b2b268ba150058
Parents: 1499d25
Author: Eugene Kirpichov <[email protected]>
Authored: Fri Apr 28 18:36:20 2017 -0700
Committer: Eugene Kirpichov <[email protected]>
Committed: Mon May 1 18:43:38 2017 -0700

----------------------------------------------------------------------
 .../beam/runners/spark/io/AvroPipelineTest.java |  2 +-
 .../java/org/apache/beam/sdk/io/AvroIO.java     | 31 ++++++++------------
 .../java/org/apache/beam/sdk/io/AvroIOTest.java |  8 ++---
 .../apache/beam/sdk/io/AvroIOTransformTest.java |  8 ++---
 4 files changed, 19 insertions(+), 30 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/ff7a1d42/runners/spark/src/test/java/org/apache/beam/runners/spark/io/AvroPipelineTest.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/io/AvroPipelineTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/io/AvroPipelineTest.java
index e3a44d2..62db14f 100644
--- a/runners/spark/src/test/java/org/apache/beam/runners/spark/io/AvroPipelineTest.java
+++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/io/AvroPipelineTest.java
@@ -74,7 +74,7 @@ public class AvroPipelineTest {
 
     Pipeline p = pipelineRule.createPipeline();
     PCollection<GenericRecord> input = p.apply(
-        AvroIO.read().from(inputFile.getAbsolutePath()).withSchema(schema));
+        AvroIO.readGenericRecords(schema).from(inputFile.getAbsolutePath()));
     input.apply(AvroIO.Write.to(outputDir.getAbsolutePath()).withSchema(schema));
     p.run().waitUntilFinish();
 

http://git-wip-us.apache.org/repos/asf/beam/blob/ff7a1d42/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroIO.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroIO.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroIO.java
index abde9cb..ed172d1 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroIO.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroIO.java
@@ -133,6 +133,18 @@ public class AvroIO {
     return new Read<>();
   }
 
+  /** Reads Avro file(s) containing records of the specified schema. */
+  public static Read<GenericRecord> readGenericRecords(Schema schema) {
+    return new Read<>(null, null, GenericRecord.class, schema);
+  }
+
+  /**
+   * Like {@link #readGenericRecords(Schema)} but the schema is specified as a JSON-encoded string.
+   */
+  public static Read<GenericRecord> readGenericRecords(String schema) {
+    return readGenericRecords(new Schema.Parser().parse(schema));
+  }
+
   /** Implementation of {@link #read}. */
   public static class Read<T> extends PTransform<PBegin, PCollection<T>> {
     /** The filepattern to read from. */
@@ -178,25 +190,6 @@ public class AvroIO {
       return new Read<>(name, filepattern, type, ReflectData.get().getSchema(type));
     }
 
-    /**
-     * Returns a new {@link PTransform} that's like this one but
-     * that reads Avro file(s) containing records of the specified schema.
-     */
-    public Read<GenericRecord> withSchema(Schema schema) {
-      return new Read<>(name, filepattern, GenericRecord.class, schema);
-    }
-
-    /**
-     * Returns a new {@link PTransform} that's like this one but
-     * that reads Avro file(s) containing records of the specified schema
-     * in a JSON-encoded string form.
-     *
-     * <p>Does not modify this object.
-     */
-    public Read<GenericRecord> withSchema(String schema) {
-      return withSchema((new Schema.Parser()).parse(schema));
-    }
-
     @Override
     public PCollection<T> expand(PBegin input) {
       if (filepattern == null) {

http://git-wip-us.apache.org/repos/asf/beam/blob/ff7a1d42/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroIOTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroIOTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroIOTest.java
index 6d842b3..2144b0d 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroIOTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroIOTest.java
@@ -282,10 +282,6 @@ public class AvroIOTest {
     p.run();
   }
 
-  private TimestampedValue<GenericClass> newValue(GenericClass element, Duration duration) {
-    return TimestampedValue.of(element, new Instant(0).plus(duration));
-  }
-
   private static class WindowedFilenamePolicy extends FilenamePolicy {
     String outputFilePrefix;
 
@@ -550,8 +546,8 @@ public class AvroIOTest {
   public void testPrimitiveReadDisplayData() {
     DisplayDataEvaluator evaluator = DisplayDataEvaluator.create();
 
-    AvroIO.Read<?> read = AvroIO.read().from("foo.*")
-        .withSchema(Schema.create(Schema.Type.STRING));
+    AvroIO.Read<?> read =
+        AvroIO.readGenericRecords(Schema.create(Schema.Type.STRING)).from("foo.*");
 
     Set<DisplayData> displayData = evaluator.displayDataForPrimitiveSourceTransforms(read);
     assertThat("AvroIO.Read should include the file pattern in its primitive transform",

http://git-wip-us.apache.org/repos/asf/beam/blob/ff7a1d42/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroIOTransformTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroIOTransformTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroIOTransformTest.java
index 06b9841..b974663 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroIOTransformTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroIOTransformTest.java
@@ -185,14 +185,14 @@ public class AvroIOTransformTest {
                   // test read using schema object
                   new Object[] {
                       null,
-                      AvroIO.read().withSchema(SCHEMA),
+                      AvroIO.readGenericRecords(SCHEMA),
                       "AvroIO.Read/Read.out",
                       generateAvroGenericRecords(),
                       fromSchema
                   },
                   new Object[] {
                       "MyRead",
-                      AvroIO.read().withSchema(SCHEMA),
+                      AvroIO.readGenericRecords(SCHEMA),
                       "MyRead/Read.out",
                       generateAvroGenericRecords(),
                       fromSchema
@@ -201,14 +201,14 @@ public class AvroIOTransformTest {
                   // test read using schema string
                   new Object[] {
                       null,
-                      AvroIO.read().withSchema(SCHEMA_STRING),
+                      AvroIO.readGenericRecords(SCHEMA_STRING),
                       "AvroIO.Read/Read.out",
                       generateAvroGenericRecords(),
                       fromSchemaString
                   },
                   new Object[] {
                       "MyRead",
-                      AvroIO.read().withSchema(SCHEMA_STRING),
+                      AvroIO.readGenericRecords(SCHEMA_STRING),
                       "MyRead/Read.out",
                       generateAvroGenericRecords(),
                       fromSchemaString

Reply via email to