Better-organized javadocs for TextIO and AvroIO
Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/84eb7f3a Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/84eb7f3a Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/84eb7f3a Branch: refs/heads/master Commit: 84eb7f3ae431b467828a76e305123601d4ee333a Parents: 184f7a9 Author: Eugene Kirpichov <ekirpic...@gmail.com> Authored: Wed Aug 16 14:29:52 2017 -0700 Committer: Eugene Kirpichov <ekirpic...@gmail.com> Committed: Wed Aug 30 11:55:18 2017 -0700 ---------------------------------------------------------------------- .../java/org/apache/beam/sdk/io/AvroIO.java | 83 +++++++++++++------- .../java/org/apache/beam/sdk/io/TextIO.java | 30 ++++--- 2 files changed, 75 insertions(+), 38 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/beam/blob/84eb7f3a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroIO.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroIO.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroIO.java index 9e0422e..d4a7cbb 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroIO.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroIO.java @@ -57,13 +57,20 @@ import org.apache.beam.sdk.values.TypeDescriptors; /** * {@link PTransform}s for reading and writing Avro files. * + * <h2>Reading Avro files</h2> + * * <p>To read a {@link PCollection} from one or more Avro files with the same schema known at - * pipeline construction time, use {@code AvroIO.read()}, using {@link AvroIO.Read#from} to specify - * the filename or filepattern to read from. Alternatively, if the filepatterns to be read are - * themselves in a {@link PCollection}, apply {@link #readAll}. 
+ * pipeline construction time, use {@link #read}, using {@link AvroIO.Read#from} to specify the + * filename or filepattern to read from. If the filepatterns to be read are themselves in a {@link + * PCollection}, apply {@link #readAll}. If the schema is unknown at pipeline construction time, use + * {@link #parseGenericRecords} or {@link #parseAllGenericRecords}. + * + * <p>Many configuration options below apply to several or all of these transforms. * * <p>See {@link FileSystems} for information on supported file systems and filepatterns. * + * <h3>Reading records of a known schema</h3> + * * <p>To read specific records, such as Avro-generated classes, use {@link #read(Class)}. To read * {@link GenericRecord GenericRecords}, use {@link #readGenericRecords(Schema)} which takes a * {@link Schema} object, or {@link #readGenericRecords(String)} which takes an Avro schema in a @@ -71,26 +78,34 @@ import org.apache.beam.sdk.values.TypeDescriptors; * schema. Likewise, to read a {@link PCollection} of filepatterns, apply {@link * #readAllGenericRecords}. * - * <p>To read records from files whose schema is unknown at pipeline construction time or differs - * between files, use {@link #parseGenericRecords} - in this case, you will need to specify a - * parsing function for converting each {@link GenericRecord} into a value of your custom type. - * Likewise, to read a {@link PCollection} of filepatterns with unknown schema, use {@link - * #parseAllGenericRecords}. 
- * * <p>For example: * * <pre>{@code * Pipeline p = ...; * - * // A simple Read of a local file (only runs locally): + * // Read Avro-generated classes from files on GCS * PCollection<AvroAutoGenClass> records = - * p.apply(AvroIO.read(AvroAutoGenClass.class).from("/path/to/file.avro")); + * p.apply(AvroIO.read(AvroAutoGenClass.class).from("gs://my_bucket/path/to/records-*.avro")); * - * // A Read from a GCS file (runs locally and using remote execution): + * // Read GenericRecords of the given schema from files on GCS * Schema schema = new Schema.Parser().parse(new File("schema.avsc")); * PCollection<GenericRecord> records = * p.apply(AvroIO.readGenericRecords(schema) * .from("gs://my_bucket/path/to/records-*.avro")); + * }</pre> + * + * <h3>Reading records of an unknown schema</h3> + * + * <p>To read records from files whose schema is unknown at pipeline construction time or differs + * between files, use {@link #parseGenericRecords} - in this case, you will need to specify a + * parsing function for converting each {@link GenericRecord} into a value of your custom type. + * Likewise, to read a {@link PCollection} of filepatterns with unknown schema, use {@link + * #parseAllGenericRecords}. + * + * <p>For example: + * + * <pre>{@code + * Pipeline p = ...; * * PCollection<Foo> records = * p.apply(AvroIO.parseGenericRecords(new SerializableFunction<GenericRecord, Foo>() { @@ -101,12 +116,7 @@ import org.apache.beam.sdk.values.TypeDescriptors; * })); * }</pre> * - * <p>If it is known that the filepattern will match a very large number of files (e.g. tens of - * thousands or more), use {@link Read#withHintMatchesManyFiles} or {@link - * Parse#withHintMatchesManyFiles} for better performance and scalability. Note that it may decrease - * performance if the filepattern matches only a small number of files.
- * - * <p>Reading from a {@link PCollection} of filepatterns: + * <h3>Reading from a {@link PCollection} of filepatterns</h3> * * <pre>{@code * Pipeline p = ...; @@ -120,6 +130,15 @@ import org.apache.beam.sdk.values.TypeDescriptors; * filepatterns.apply(AvroIO.parseAllGenericRecords(new SerializableFunction...); * }</pre> * + * <h3>Reading a very large number of files</h3> + * + * <p>If it is known that the filepattern will match a very large number of files (e.g. tens of + * thousands or more), use {@link Read#withHintMatchesManyFiles} for better performance and + * scalability. Note that it may decrease performance if the filepattern matches only a small number + * of files. + * + * <h2>Writing Avro files</h2> + * * <p>To write a {@link PCollection} to one or more Avro files, use {@link AvroIO.Write}, using * {@code AvroIO.write().to(String)} to specify the output filename prefix. The default {@link * DefaultFilenamePolicy} will use this prefix, in conjunction with a {@link ShardNameTemplate} (set @@ -128,13 +147,11 @@ import org.apache.beam.sdk.values.TypeDescriptors; * default write filename policy using {@link Write#to(FileBasedSink.FilenamePolicy)} to specify a * custom file naming policy. * - * <p>By default, all input is put into the global window before writing. If per-window writes are - * desired - for example, when using a streaming runner - {@link AvroIO.Write#withWindowedWrites()} - * will cause windowing and triggering to be preserved. When producing windowed writes with a - * streaming runner that supports triggers, the number of output shards must be set explicitly using - * {@link AvroIO.Write#withNumShards(int)}; some runners may set this for you to a runner-chosen - * value, so you may need not set it yourself. A {@link FileBasedSink.FilenamePolicy} must be set, - * and unique windows and triggers must produce unique filenames. 
+ * <p>By default, {@link AvroIO.Write} produces output files that are compressed using the {@link + * org.apache.avro.file.Codec CodecFactory.deflateCodec(6)}. This default can be changed or + * overridden using {@link AvroIO.Write#withCodec}. + * + * <h3>Writing specific or generic records</h3> + * * <p>To write specific records, such as Avro-generated classes, use {@link #write(Class)}. To write * {@link GenericRecord GenericRecords}, use either {@link #writeGenericRecords(Schema)} which takes * @@ -157,6 +174,18 @@ import org.apache.beam.sdk.values.TypeDescriptors; * .withSuffix(".avro")); * }</pre> * + * <h3>Writing windowed or unbounded data</h3> + * + * <p>By default, all input is put into the global window before writing. If per-window writes are + * desired - for example, when using a streaming runner - {@link AvroIO.Write#withWindowedWrites()} + * will cause windowing and triggering to be preserved. When producing windowed writes with a + * streaming runner that supports triggers, the number of output shards must be set explicitly using + * {@link AvroIO.Write#withNumShards(int)}; some runners may set this for you to a runner-chosen + * value, so you may not need to set it yourself. A {@link FileBasedSink.FilenamePolicy} must be set, + * and unique windows and triggers must produce unique filenames. + * + * <h3>Writing data to multiple destinations</h3> + * + * <p>The following shows a more-complex example of AvroIO.Write usage, generating dynamic file + * destinations as well as a dynamic Avro schema per file. In this example, a PCollection of user + * events (e.g. actions on a website) is written out to Avro files.
Each event contains the user id @@ -201,10 +230,6 @@ import org.apache.beam.sdk.values.TypeDescriptors; * events.apply("WriteAvros", AvroIO.<Integer>writeCustomTypeToGenericRecords() * .to(new UserDynamicAvros())); * }</pre> - * - * <p>By default, {@link AvroIO.Write} produces output files that are compressed using the {@link - * org.apache.avro.file.Codec CodecFactory.deflateCodec(6)}. This default can be changed or - * overridden using {@link AvroIO.Write#withCodec}. */ public class AvroIO { /** http://git-wip-us.apache.org/repos/asf/beam/blob/84eb7f3a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java index 835008f..442e4d9 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java @@ -56,6 +56,8 @@ import org.joda.time.Duration; /** * {@link PTransform}s for reading and writing text files. * + * <h2>Reading text files</h2> + * * <p>To read a {@link PCollection} from one or more text files, use {@code TextIO.read()} to * instantiate a transform and use {@link TextIO.Read#from(String)} to specify the path of the * file(s) to be read. Alternatively, if the filenames to be read are themselves in a {@link @@ -64,6 +66,8 @@ import org.joda.time.Duration; * <p>{@link #read} returns a {@link PCollection} of {@link String Strings}, each corresponding to * one line of an input UTF-8 text file (split into lines delimited by '\n', '\r', or '\r\n'). * + * <h3>Filepattern expansion and watching</h3> + * * <p>By default, the filepatterns are expanded only once. {@link Read#watchForNewFiles} and {@link * ReadAll#watchForNewFiles} allow streaming of new files matching the filepattern(s). 
* @@ -81,11 +85,6 @@ import org.joda.time.Duration; * PCollection<String> lines = p.apply(TextIO.read().from("/local/path/to/file.txt")); * }</pre> * - * <p>If it is known that the filepattern will match a very large number of files (e.g. tens of - * thousands or more), use {@link Read#withHintMatchesManyFiles} for better performance and - * scalability. Note that it may decrease performance if the filepattern matches only a small number - * of files. - * * <p>Example 2: reading a PCollection of filenames. * * <pre>{@code @@ -113,6 +112,15 @@ import org.joda.time.Duration; * afterTimeSinceNewOutput(Duration.standardHours(1)))); * }</pre> * + * <h3>Reading a very large number of files</h3> + * + * <p>If it is known that the filepattern will match a very large number of files (e.g. tens of + * thousands or more), use {@link Read#withHintMatchesManyFiles} for better performance and + * scalability. Note that it may decrease performance if the filepattern matches only a small number + * of files. + * + * <h2>Writing text files</h2> + * * <p>To write a {@link PCollection} to one or more text files, use {@code TextIO.write()}, using * {@link TextIO.Write#to(String)} to specify the output prefix of the files to write. * @@ -130,6 +138,13 @@ import org.joda.time.Duration; * .withWritableByteChannelFactory(FileBasedSink.CompressionType.GZIP)); * }</pre> * + * <p>Any existing files with the same names as generated output files will be overwritten. + * + * <p>If you want better control over how filenames are generated than the default policy allows, a + * custom {@link FilenamePolicy} can also be set using {@link TextIO.Write#to(FilenamePolicy)}. + * + * <h3>Writing windowed or unbounded data</h3> + * * <p>By default, all input is put into the global window before writing. If per-window writes are * desired - for example, when using a streaming runner - {@link TextIO.Write#withWindowedWrites()} * will cause windowing and triggering to be preserved. 
When producing windowed writes with a @@ -140,8 +155,7 @@ import org.joda.time.Duration; * for the window and the pane; W is expanded into the window text, and P into the pane; the default * template will include both the window and the pane in the filename. * - * <p>If you want better control over how filenames are generated than the default policy allows, a - * custom {@link FilenamePolicy} can also be set using {@link TextIO.Write#to(FilenamePolicy)}. + * <h3>Writing data to multiple destinations</h3> * * <p>TextIO also supports dynamic, value-dependent file destinations. The most general form of this * is done via {@link TextIO.Write#to(DynamicDestinations)}. A {@link DynamicDestinations} class @@ -166,8 +180,6 @@ import org.joda.time.Duration; * }), * new Params().withBaseFilename(baseDirectory + "/empty"); * }</pre> - * - * <p>Any existing files with the same names as generated output files will be overwritten. */ public class TextIO { /**
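The write sections of these javadocs mention that the default DefaultFilenamePolicy combines the output prefix with a ShardNameTemplate, in which runs of 'S' and 'N' are replaced by the zero-padded shard number and shard count (e.g. "-SSSSS-of-NNNNN"). The standalone sketch below illustrates that documented template rule; it is not Beam's actual DefaultFilenamePolicy code, and the class name here is invented.

```java
// Illustrative sketch of ShardNameTemplate-style expansion: each run of
// 'S' becomes the zero-padded shard number, each run of 'N' the
// zero-padded total shard count. Not Beam's real implementation.
public class ShardNameTemplateSketch {
  public static String expand(String template, int shardNumber, int numShards) {
    StringBuilder out = new StringBuilder();
    int i = 0;
    while (i < template.length()) {
      char c = template.charAt(i);
      if (c == 'S' || c == 'N') {
        // Measure the run length to determine the zero-padding width.
        int start = i;
        while (i < template.length() && template.charAt(i) == c) {
          i++;
        }
        int width = i - start;
        int value = (c == 'S') ? shardNumber : numShards;
        out.append(String.format("%0" + width + "d", value));
      } else {
        out.append(c);
        i++;
      }
    }
    return out.toString();
  }

  public static void main(String[] args) {
    // "-SSSSS-of-NNNNN" is the template shown in Beam's ShardNameTemplate docs.
    System.out.println(expand("-SSSSS-of-NNNNN", 3, 10)); // -00003-of-00010
  }
}
```

With this rule, a prefix such as "gs://bucket/out" plus the template above yields filenames like "gs://bucket/out-00003-of-00010".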
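The TextIO javadoc above states that read() produces one String per line, with lines delimited by '\n', '\r', or '\r\n'. The sketch below applies that delimiter rule to an in-memory string; it illustrates the documented behavior only and is not TextIO's actual reader, which operates on byte channels.

```java
import java.util.ArrayList;
import java.util.List;

// Illustrative sketch of TextIO's documented line-splitting rule:
// lines are delimited by '\n', '\r', or the pair "\r\n".
public class TextLineSplitSketch {
  public static List<String> splitLines(String content) {
    List<String> lines = new ArrayList<>();
    StringBuilder current = new StringBuilder();
    for (int i = 0; i < content.length(); i++) {
      char c = content.charAt(i);
      if (c == '\n') {
        lines.add(current.toString());
        current.setLength(0);
      } else if (c == '\r') {
        lines.add(current.toString());
        current.setLength(0);
        // Treat "\r\n" as a single delimiter, not two.
        if (i + 1 < content.length() && content.charAt(i + 1) == '\n') {
          i++;
        }
      } else {
        current.append(c);
      }
    }
    // A final line without a trailing delimiter is still a record.
    if (current.length() > 0) {
      lines.add(current.toString());
    }
    return lines;
  }

  public static void main(String[] args) {
    System.out.println(splitLines("a\r\nb\rc\nd")); // [a, b, c, d]
  }
}
```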
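Both javadocs point to FileSystems for supported filepatterns, such as the "gs://my_bucket/path/to/records-*.avro" pattern used in the read examples. For a feel of how such wildcard patterns match filenames, the JDK's own glob support can be used; note this demonstrates the JDK's java.nio.file.PathMatcher, not Beam's FileSystems class, whose matching semantics may differ in detail.

```java
import java.nio.file.FileSystems;
import java.nio.file.PathMatcher;
import java.nio.file.Paths;

// Demonstrates glob-style filename matching with the JDK's PathMatcher,
// as a rough analogue of the filepatterns in the AvroIO/TextIO examples.
public class FilepatternGlobDemo {
  public static boolean matches(String glob, String filename) {
    PathMatcher matcher =
        FileSystems.getDefault().getPathMatcher("glob:" + glob);
    return matcher.matches(Paths.get(filename));
  }

  public static void main(String[] args) {
    System.out.println(matches("records-*.avro", "records-00001-of-00010.avro")); // true
    System.out.println(matches("records-*.avro", "schema.avsc")); // false
  }
}
```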