Repository: beam
Updated Branches:
  refs/heads/master 6d443bc39 -> 034565c68


Scattered minor improvements per review comments


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/caf2faeb
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/caf2faeb
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/caf2faeb

Branch: refs/heads/master
Commit: caf2faeb5e0b173f4e40f4af70c14d1d5d4244e4
Parents: 27d7462
Author: Eugene Kirpichov <[email protected]>
Authored: Mon May 1 17:00:46 2017 -0700
Committer: Eugene Kirpichov <[email protected]>
Committed: Mon May 1 18:43:38 2017 -0700

----------------------------------------------------------------------
 .../java/org/apache/beam/sdk/io/AvroIO.java     | 81 +++++++++-----------
 .../java/org/apache/beam/sdk/io/AvroSink.java   | 14 +---
 .../java/org/apache/beam/sdk/io/AvroSource.java |  4 +-
 .../beam/sdk/testing/SourceTestUtils.java       |  5 +-
 .../java/org/apache/beam/sdk/io/AvroIOTest.java | 44 ++++++-----
 5 files changed, 69 insertions(+), 79 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/caf2faeb/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroIO.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroIO.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroIO.java
index 6b66a98..755cdb9 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroIO.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroIO.java
@@ -33,6 +33,7 @@ import org.apache.avro.reflect.ReflectData;
 import org.apache.beam.sdk.coders.AvroCoder;
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.coders.VoidCoder;
+import org.apache.beam.sdk.io.FileBasedSink.FilenamePolicy;
 import org.apache.beam.sdk.io.Read.Bounded;
 import org.apache.beam.sdk.runners.PipelineRunner;
 import org.apache.beam.sdk.transforms.PTransform;
@@ -45,11 +46,9 @@ import org.apache.beam.sdk.values.PDone;
 /**
  * {@link PTransform}s for reading and writing Avro files.
  *
- * <p>To read a {@link PCollection} from one or more Avro files, use
- * {@code AvroIO.read()}, specifying {@link AvroIO.Read#from} to specify
- * the path of the file(s) to read from (e.g., a local filename or
- * filename pattern if running locally, or a Google Cloud Storage
- * filename or filename pattern of the form {@code 
"gs://<bucket>/<filepath>"}).
+ * <p>To read a {@link PCollection} from one or more Avro files, use {@code 
AvroIO.read()},
+ * specifying {@link AvroIO.Read#from} to specify the filename or filepattern 
to read from.
+ * See {@link FileSystems} for information on supported file systems and 
filepatterns.
  *
  * <p>To read specific records, such as Avro-generated classes, use {@link 
#read(Class)}.
  * To read {@link GenericRecord GenericRecords}, use {@link 
#readGenericRecords(Schema)} which takes
@@ -72,13 +71,12 @@ import org.apache.beam.sdk.values.PDone;
  *                .from("gs://my_bucket/path/to/records-*.avro"));
  * } </pre>
  *
- * <p>To write a {@link PCollection} to one or more Avro files, use
- * {@link AvroIO.Write}, specifying {@code AvroIO.write().to(String)} to 
specify
- * the path of the file to write to (e.g., a local filename or sharded
- * filename pattern if running locally, or a Google Cloud Storage
- * filename or sharded filename pattern of the form
- * {@code "gs://<bucket>/<filepath>"}). {@code 
AvroIO.write().to(FileBasedSink.FilenamePolicy)}
- * can also be used to specify a custom file naming policy.
+ * <p>To write a {@link PCollection} to one or more Avro files, use {@link 
AvroIO.Write}, specifying
+ * {@code AvroIO.write().to(String)} to specify the filename or sharded 
filepattern to write to.
+ * See {@link FileSystems} for information on supported file systems and 
{@link ShardNameTemplate}
+ * for information on naming of output files. You can also use {@code 
AvroIO.write()} with
+ * {@link Write#to(FileBasedSink.FilenamePolicy)} to
+ * specify a custom file naming policy.
  *
  * <p>By default, all input is put into the global window before writing. If 
per-window writes are
  * desired - for example, when using a streaming runner -
@@ -140,7 +138,8 @@ public class AvroIO {
   }
 
   /**
-   * Like {@link #readGenericRecords(Schema)} but the schema is specified as a 
JSON-encoded string.
+   * Reads Avro file(s) containing records of the specified schema. The schema 
is specified as a
+   * JSON-encoded string.
    */
   public static Read<GenericRecord> readGenericRecords(String schema) {
     return readGenericRecords(new Schema.Parser().parse(schema));
@@ -165,6 +164,13 @@ public class AvroIO {
         .build();
   }
 
+  /**
+   * Writes Avro records of the specified schema. The schema is specified as a 
JSON-encoded string.
+   */
+  public static Write<GenericRecord> writeGenericRecords(String schema) {
+    return writeGenericRecords(new Schema.Parser().parse(schema));
+  }
+
   private static <T> Write.Builder<T> defaultWriteBuilder() {
     return new AutoValue_AvroIO_Write.Builder<T>()
         .setFilenameSuffix("")
@@ -175,13 +181,6 @@ public class AvroIO {
         .setWindowedWrites(false);
   }
 
-  /**
-   * Like {@link #writeGenericRecords(Schema)} but the schema is specified as 
a JSON-encoded string.
-   */
-  public static Write<GenericRecord> writeGenericRecords(String schema) {
-    return writeGenericRecords(new Schema.Parser().parse(schema));
-  }
-
   /** Implementation of {@link #read}. */
   @AutoValue
   public abstract static class Read<T> extends PTransform<PBegin, 
PCollection<T>> {
@@ -200,15 +199,7 @@ public class AvroIO {
       abstract Read<T> build();
     }
 
-    /**
-     * Reads from the file(s) with the given name or pattern. This can be a 
local filename
-     * or filename pattern (if running locally), or a Google Cloud
-     * Storage filename or filename pattern of the form
-     * {@code "gs://<bucket>/<filepath>"} (if running locally or
-     * using remote execution). Standard
-     * <a 
href="http://docs.oracle.com/javase/tutorial/essential/io/find.html";>Java
-     * Filesystem glob patterns</a> ("*", "?", "[..]") are supported.
-     */
+    /** Reads from the given filename or filepattern. */
     public Read<T> from(String filepattern) {
       return toBuilder().setFilepattern(filepattern).build();
     }
@@ -275,7 +266,7 @@ public class AvroIO {
     abstract Class<T> getRecordClass();
     @Nullable abstract Schema getSchema();
     abstract boolean getWindowedWrites();
-    @Nullable abstract FileBasedSink.FilenamePolicy getFilenamePolicy();
+    @Nullable abstract FilenamePolicy getFilenamePolicy();
     /**
      * The codec used to encode the blocks in the Avro file. String value 
drawn from those in
      * 
https://avro.apache.org/docs/1.7.7/api/java/org/apache/avro/file/CodecFactory.html
@@ -295,7 +286,7 @@ public class AvroIO {
       abstract Builder<T> setRecordClass(Class<T> recordClass);
       abstract Builder<T> setSchema(Schema schema);
       abstract Builder<T> setWindowedWrites(boolean windowedWrites);
-      abstract Builder<T> setFilenamePolicy(FileBasedSink.FilenamePolicy 
filenamePolicy);
+      abstract Builder<T> setFilenamePolicy(FilenamePolicy filenamePolicy);
       abstract Builder<T> setCodec(SerializableAvroCodecFactory codec);
       abstract Builder<T> setMetadata(ImmutableMap<String, Object> metadata);
 
@@ -303,10 +294,8 @@ public class AvroIO {
     }
 
     /**
-     * Writes to the file(s) with the given prefix. This can be a local 
filename
-     * (if running locally), or a Google Cloud Storage filename of
-     * the form {@code "gs://<bucket>/<filepath>"}
-     * (if running locally or using remote execution).
+     * Writes to the file(s) with the given prefix. See {@link FileSystems} 
for information on
+     * supported file systems.
      *
      * <p>The files written will begin with this prefix, followed by
      * a shard identifier (see {@link #withNumShards}, and end
@@ -318,7 +307,7 @@ public class AvroIO {
     }
 
     /** Writes to the file(s) specified by the provided {@link 
FileBasedSink.FilenamePolicy}. */
-    public Write<T> to(FileBasedSink.FilenamePolicy filenamePolicy) {
+    public Write<T> to(FilenamePolicy filenamePolicy) {
       return toBuilder().setFilenamePolicy(filenamePolicy).build();
     }
 
@@ -333,7 +322,8 @@ public class AvroIO {
     }
 
     /**
-     * Uses the provided shard count.
+     * Uses the provided shard count. See {@link ShardNameTemplate} for a 
description of shard
+     * templates.
      *
      * <p>Constraining the number of shards is likely to reduce
      * the performance of a pipeline. Setting this value is not recommended
@@ -341,19 +331,13 @@ public class AvroIO {
      *
      * @param numShards the number of shards to use, or 0 to let the system
      *                  decide.
-     * @see ShardNameTemplate
      */
     public Write<T> withNumShards(int numShards) {
       checkArgument(numShards >= 0);
       return toBuilder().setNumShards(numShards).build();
     }
 
-    /**
-     * Returns a new {@link PTransform} that's like this one but
-     * that uses the given shard name template.
-     *
-     * @see ShardNameTemplate
-     */
+    /** Uses the given {@link ShardNameTemplate} for naming output files. */
     public Write<T> withShardNameTemplate(String shardTemplate) {
       return toBuilder().setShardTemplate(shardTemplate).build();
     }
@@ -361,12 +345,19 @@ public class AvroIO {
     /**
      * Forces a single file as output.
      *
-     * <p>This is a shortcut for {@code 
.withNumShards(1).withShardNameTemplate("")}
+     * <p>This is equivalent to {@code 
.withNumShards(1).withShardNameTemplate("")}
      */
     public Write<T> withoutSharding() {
       return withNumShards(1).withShardNameTemplate("");
     }
 
+    /**
+     * Preserves windowing of input elements and writes them to files based on 
the element's window.
+     *
+     * <p>Requires use of {@link #to(FileBasedSink.FilenamePolicy)}. Filenames 
will be generated
+     * using {@link 
FilenamePolicy#windowedFilename(FileBasedSink.FilenamePolicy.WindowedContext)}.
+     * See also {@link WriteFiles#withWindowedWrites()}.
+     */
     public Write<T> withWindowedWrites() {
       return toBuilder().setWindowedWrites(true).build();
     }

http://git-wip-us.apache.org/repos/asf/beam/blob/caf2faeb/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroSink.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroSink.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroSink.java
index 16f233c..46bb4f3 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroSink.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroSink.java
@@ -30,9 +30,7 @@ import org.apache.beam.sdk.coders.AvroCoder;
 import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.util.MimeTypes;
 
-/**
- * A {@link FileBasedSink} for Avro files.
- */
+/** A {@link FileBasedSink} for Avro files. */
 class AvroSink<T> extends FileBasedSink<T> {
   private final AvroCoder<T> coder;
   private final SerializableAvroCodecFactory codec;
@@ -67,10 +65,7 @@ class AvroSink<T> extends FileBasedSink<T> {
     return new AvroWriteOperation<>(this, coder, codec, metadata);
   }
 
-  /**
-   * A {@link FileBasedWriteOperation
-   * FileBasedWriteOperation} for Avro files.
-   */
+  /** A {@link FileBasedWriteOperation FileBasedWriteOperation} for Avro 
files. */
   private static class AvroWriteOperation<T> extends 
FileBasedWriteOperation<T> {
     private final AvroCoder<T> coder;
     private final SerializableAvroCodecFactory codec;
@@ -92,10 +87,7 @@ class AvroSink<T> extends FileBasedSink<T> {
     }
   }
 
-  /**
-   * A {@link FileBasedWriter FileBasedWriter}
-   * for Avro files.
-   */
+  /** A {@link FileBasedWriter FileBasedWriter} for Avro files. */
   private static class AvroWriter<T> extends FileBasedWriter<T> {
     private final AvroCoder<T> coder;
     private DataFileWriter<T> dataFileWriter;

http://git-wip-us.apache.org/repos/asf/beam/blob/caf2faeb/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroSource.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroSource.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroSource.java
index 58e6555..96d21c6 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroSource.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroSource.java
@@ -62,7 +62,9 @@ import org.apache.commons.compress.utils.CountingInputStream;
 
 // CHECKSTYLE.OFF: JavadocStyle
 /**
- * A {@link FileBasedSource} for reading Avro files.
+ * Do not use in pipelines directly: most users should use {@link AvroIO.Read}.
+ *
+ * <p>A {@link FileBasedSource} for reading Avro files.
  *
  * <p>To read a {@link PCollection} of objects from one or more Avro files, use
  * {@link AvroSource#from} to specify the path(s) of the files to read. The 
{@link AvroSource} that

http://git-wip-us.apache.org/repos/asf/beam/blob/caf2faeb/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/SourceTestUtils.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/SourceTestUtils.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/SourceTestUtils.java
index fd7ae85..cde0b94 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/SourceTestUtils.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/SourceTestUtils.java
@@ -72,9 +72,8 @@ import org.slf4j.LoggerFactory;
  *   as a heavy-weight stress test including concurrency. We strongly 
recommend to
  *   use both.
  * </ul>
- * For example usages, see the unit tests of classes such as
- * {@link org.apache.beam.sdk.io.AvroSource} or
- * {@link org.apache.beam.sdk.io.TextIO TextIO.TextSource}.
+ * For example usages, see the unit tests of classes such as {@code 
AvroSource} or
+ * {@code TextSource}.
  *
  * <p>Like {@link PAssert}, requires JUnit and Hamcrest to be present in the 
classpath.
  */

http://git-wip-us.apache.org/repos/asf/beam/blob/caf2faeb/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroIOTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroIOTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroIOTest.java
index e421b96..d14d9b2 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroIOTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroIOTest.java
@@ -45,6 +45,7 @@ import org.apache.avro.file.CodecFactory;
 import org.apache.avro.file.DataFileReader;
 import org.apache.avro.file.DataFileStream;
 import org.apache.avro.generic.GenericDatumReader;
+import org.apache.avro.generic.GenericRecord;
 import org.apache.avro.reflect.Nullable;
 import org.apache.avro.reflect.ReflectData;
 import org.apache.avro.reflect.ReflectDatumReader;
@@ -143,8 +144,9 @@ public class AvroIOTest {
     File outputFile = tmpFolder.newFile("output.avro");
 
     p.apply(Create.of(values))
-      .apply(AvroIO.write(GenericClass.class).to(outputFile.getAbsolutePath())
-          .withoutSharding());
+     .apply(AvroIO.write(GenericClass.class)
+         .to(outputFile.getAbsolutePath())
+         .withoutSharding());
     p.run();
 
     PCollection<GenericClass> input =
@@ -165,9 +167,10 @@ public class AvroIOTest {
     File outputFile = tmpFolder.newFile("output.avro");
 
     p.apply(Create.of(values))
-        
.apply(AvroIO.write(GenericClass.class).to(outputFile.getAbsolutePath())
-            .withoutSharding()
-            .withCodec(CodecFactory.deflateCodec(9)));
+     .apply(AvroIO.write(GenericClass.class)
+         .to(outputFile.getAbsolutePath())
+         .withoutSharding()
+         .withCodec(CodecFactory.deflateCodec(9)));
     p.run();
 
     PCollection<GenericClass> input = p
@@ -190,9 +193,10 @@ public class AvroIOTest {
     File outputFile = tmpFolder.newFile("output.avro");
 
     p.apply(Create.of(values))
-        
.apply(AvroIO.write(GenericClass.class).to(outputFile.getAbsolutePath())
-            .withoutSharding()
-            .withCodec(CodecFactory.nullCodec()));
+      .apply(AvroIO.write(GenericClass.class)
+          .to(outputFile.getAbsolutePath())
+          .withoutSharding()
+          .withCodec(CodecFactory.nullCodec()));
     p.run();
 
     PCollection<GenericClass> input = p
@@ -256,7 +260,8 @@ public class AvroIOTest {
     File outputFile = tmpFolder.newFile("output.avro");
 
     p.apply(Create.of(values))
-      .apply(AvroIO.write(GenericClass.class).to(outputFile.getAbsolutePath())
+      .apply(AvroIO.write(GenericClass.class)
+          .to(outputFile.getAbsolutePath())
           .withoutSharding());
     p.run();
 
@@ -362,7 +367,8 @@ public class AvroIOTest {
     windowedAvroWritePipeline
         .apply(values)
         
.apply(Window.<GenericClass>into(FixedWindows.of(Duration.standardMinutes(1))))
-        .apply(AvroIO.write(GenericClass.class).to(new 
WindowedFilenamePolicy(outputFilePrefix))
+        .apply(AvroIO.write(GenericClass.class)
+            .to(new WindowedFilenamePolicy(outputFilePrefix))
             .withWindowedWrites()
             .withNumShards(2));
     windowedAvroWritePipeline.run();
@@ -403,7 +409,7 @@ public class AvroIOTest {
 
   @Test
   public void testWriteWithCustomCodec() throws Exception {
-    AvroIO.Write<?> write = AvroIO.write(String.class)
+    AvroIO.Write<String> write = AvroIO.write(String.class)
         .to("gs://bucket/foo/baz")
         .withCodec(CodecFactory.snappyCodec());
     assertEquals(SNAPPY_CODEC, write.getCodec().toString());
@@ -442,7 +448,8 @@ public class AvroIOTest {
     File outputFile = tmpFolder.newFile("output.avro");
 
     p.apply(Create.of(values))
-        
.apply(AvroIO.write(GenericClass.class).to(outputFile.getAbsolutePath())
+        .apply(AvroIO.write(GenericClass.class)
+            .to(outputFile.getAbsolutePath())
             .withoutSharding()
             .withMetadata(ImmutableMap.<String, Object>of(
                 "stringKey", "stringValue",
@@ -463,8 +470,7 @@ public class AvroIOTest {
     File baseOutputFile = new File(tmpFolder.getRoot(), "prefix");
     String outputFilePrefix = baseOutputFile.getAbsolutePath();
 
-    AvroIO.Write<String> write =
-        AvroIO.write(String.class).to(outputFilePrefix);
+    AvroIO.Write<String> write = 
AvroIO.write(String.class).to(outputFilePrefix);
     if (numShards > 1) {
       System.out.println("NumShards " + numShards);
       write = write.withNumShards(numShards);
@@ -524,7 +530,7 @@ public class AvroIOTest {
 
   @Test
   public void testReadDisplayData() {
-    AvroIO.Read<?> read = AvroIO.read(String.class).from("foo.*");
+    AvroIO.Read<String> read = AvroIO.read(String.class).from("foo.*");
 
     DisplayData displayData = DisplayData.from(read);
     assertThat(displayData, hasDisplayItem("filePattern", "foo.*"));
@@ -535,7 +541,7 @@ public class AvroIOTest {
   public void testPrimitiveReadDisplayData() {
     DisplayDataEvaluator evaluator = DisplayDataEvaluator.create();
 
-    AvroIO.Read<?> read =
+    AvroIO.Read<GenericRecord> read =
         
AvroIO.readGenericRecords(Schema.create(Schema.Type.STRING)).from("foo.*");
 
     Set<DisplayData> displayData = 
evaluator.displayDataForPrimitiveSourceTransforms(read);
@@ -545,7 +551,7 @@ public class AvroIOTest {
 
   @Test
   public void testWriteDisplayData() {
-    AvroIO.Write<?> write = AvroIO.write(GenericClass.class)
+    AvroIO.Write<GenericClass> write = AvroIO.write(GenericClass.class)
         .to("foo")
         .withShardNameTemplate("-SS-of-NN-")
         .withSuffix("bar")
@@ -572,8 +578,8 @@ public class AvroIOTest {
 
     DisplayDataEvaluator evaluator = DisplayDataEvaluator.create(options);
 
-    AvroIO.Write<?> write = 
AvroIO.writeGenericRecords(Schema.create(Schema.Type.STRING))
-        .to(outputPath);
+    AvroIO.Write<GenericRecord> write =
+        
AvroIO.writeGenericRecords(Schema.create(Schema.Type.STRING)).to(outputPath);
 
     Set<DisplayData> displayData = 
evaluator.displayDataForPrimitiveTransforms(write);
     assertThat("AvroIO.Write should include the file pattern in its primitive 
transform",

Reply via email to