Repository: beam Updated Branches: refs/heads/master 6d443bc39 -> 034565c68
Scattered minor improvements per review comments Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/caf2faeb Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/caf2faeb Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/caf2faeb Branch: refs/heads/master Commit: caf2faeb5e0b173f4e40f4af70c14d1d5d4244e4 Parents: 27d7462 Author: Eugene Kirpichov <[email protected]> Authored: Mon May 1 17:00:46 2017 -0700 Committer: Eugene Kirpichov <[email protected]> Committed: Mon May 1 18:43:38 2017 -0700 ---------------------------------------------------------------------- .../java/org/apache/beam/sdk/io/AvroIO.java | 81 +++++++++----------- .../java/org/apache/beam/sdk/io/AvroSink.java | 14 +--- .../java/org/apache/beam/sdk/io/AvroSource.java | 4 +- .../beam/sdk/testing/SourceTestUtils.java | 5 +- .../java/org/apache/beam/sdk/io/AvroIOTest.java | 44 ++++++----- 5 files changed, 69 insertions(+), 79 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/beam/blob/caf2faeb/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroIO.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroIO.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroIO.java index 6b66a98..755cdb9 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroIO.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroIO.java @@ -33,6 +33,7 @@ import org.apache.avro.reflect.ReflectData; import org.apache.beam.sdk.coders.AvroCoder; import org.apache.beam.sdk.coders.Coder; import org.apache.beam.sdk.coders.VoidCoder; +import org.apache.beam.sdk.io.FileBasedSink.FilenamePolicy; import org.apache.beam.sdk.io.Read.Bounded; import org.apache.beam.sdk.runners.PipelineRunner; import org.apache.beam.sdk.transforms.PTransform; @@ -45,11 +46,9 @@ import 
org.apache.beam.sdk.values.PDone; /** * {@link PTransform}s for reading and writing Avro files. * - * <p>To read a {@link PCollection} from one or more Avro files, use - * {@code AvroIO.read()}, specifying {@link AvroIO.Read#from} to specify - * the path of the file(s) to read from (e.g., a local filename or - * filename pattern if running locally, or a Google Cloud Storage - * filename or filename pattern of the form {@code "gs://<bucket>/<filepath>"}). + * <p>To read a {@link PCollection} from one or more Avro files, use {@code AvroIO.read()}, + * calling {@link AvroIO.Read#from} to specify the filename or filepattern to read from. + * See {@link FileSystems} for information on supported file systems and filepatterns. * * <p>To read specific records, such as Avro-generated classes, use {@link #read(Class)}. * To read {@link GenericRecord GenericRecords}, use {@link #readGenericRecords(Schema)} which takes @@ -72,13 +71,12 @@ import org.apache.beam.sdk.values.PDone; * .from("gs://my_bucket/path/to/records-*.avro")); * } </pre> * - * <p>To write a {@link PCollection} to one or more Avro files, use - * {@link AvroIO.Write}, specifying {@code AvroIO.write().to(String)} to specify - * the path of the file to write to (e.g., a local filename or sharded - * filename pattern if running locally, or a Google Cloud Storage - * filename or sharded filename pattern of the form - * {@code "gs://<bucket>/<filepath>"}). {@code AvroIO.write().to(FileBasedSink.FilenamePolicy)} - * can also be used to specify a custom file naming policy. + * <p>To write a {@link PCollection} to one or more Avro files, use {@link AvroIO.Write}, calling + * {@code AvroIO.write().to(String)} to specify the filename or sharded filepattern to write to. + * See {@link FileSystems} for information on supported file systems and {@link ShardNameTemplate} + * for information on naming of output files.
You can also use {@code AvroIO.write()} with + * {@link Write#to(FileBasedSink.FilenamePolicy)} to + * specify a custom file naming policy. * * <p>By default, all input is put into the global window before writing. If per-window writes are * desired - for example, when using a streaming runner - @@ -140,7 +138,8 @@ public class AvroIO { } /** - * Like {@link #readGenericRecords(Schema)} but the schema is specified as a JSON-encoded string. + * Reads Avro file(s) containing records of the specified schema. The schema is specified as a + * JSON-encoded string. */ public static Read<GenericRecord> readGenericRecords(String schema) { return readGenericRecords(new Schema.Parser().parse(schema)); @@ -165,6 +164,13 @@ public class AvroIO { .build(); } + /** + * Writes Avro records of the specified schema. The schema is specified as a JSON-encoded string. + */ + public static Write<GenericRecord> writeGenericRecords(String schema) { + return writeGenericRecords(new Schema.Parser().parse(schema)); + } + private static <T> Write.Builder<T> defaultWriteBuilder() { return new AutoValue_AvroIO_Write.Builder<T>() .setFilenameSuffix("") @@ -175,13 +181,6 @@ public class AvroIO { .setWindowedWrites(false); } - /** - * Like {@link #writeGenericRecords(Schema)} but the schema is specified as a JSON-encoded string. - */ - public static Write<GenericRecord> writeGenericRecords(String schema) { - return writeGenericRecords(new Schema.Parser().parse(schema)); - } - /** Implementation of {@link #read}. */ @AutoValue public abstract static class Read<T> extends PTransform<PBegin, PCollection<T>> { @@ -200,15 +199,7 @@ public class AvroIO { abstract Read<T> build(); } - /** - * Reads from the file(s) with the given name or pattern. This can be a local filename - * or filename pattern (if running locally), or a Google Cloud - * Storage filename or filename pattern of the form - * {@code "gs://<bucket>/<filepath>"} (if running locally or - * using remote execution). 
Standard - * <a href="http://docs.oracle.com/javase/tutorial/essential/io/find.html">Java - * Filesystem glob patterns</a> ("*", "?", "[..]") are supported. - */ + /** Reads from the given filename or filepattern. */ public Read<T> from(String filepattern) { return toBuilder().setFilepattern(filepattern).build(); } @@ -275,7 +266,7 @@ public class AvroIO { abstract Class<T> getRecordClass(); @Nullable abstract Schema getSchema(); abstract boolean getWindowedWrites(); - @Nullable abstract FileBasedSink.FilenamePolicy getFilenamePolicy(); + @Nullable abstract FilenamePolicy getFilenamePolicy(); /** * The codec used to encode the blocks in the Avro file. String value drawn from those in * https://avro.apache.org/docs/1.7.7/api/java/org/apache/avro/file/CodecFactory.html @@ -295,7 +286,7 @@ public class AvroIO { abstract Builder<T> setRecordClass(Class<T> recordClass); abstract Builder<T> setSchema(Schema schema); abstract Builder<T> setWindowedWrites(boolean windowedWrites); - abstract Builder<T> setFilenamePolicy(FileBasedSink.FilenamePolicy filenamePolicy); + abstract Builder<T> setFilenamePolicy(FilenamePolicy filenamePolicy); abstract Builder<T> setCodec(SerializableAvroCodecFactory codec); abstract Builder<T> setMetadata(ImmutableMap<String, Object> metadata); @@ -303,10 +294,8 @@ public class AvroIO { } /** - * Writes to the file(s) with the given prefix. This can be a local filename - * (if running locally), or a Google Cloud Storage filename of - * the form {@code "gs://<bucket>/<filepath>"} - * (if running locally or using remote execution). + * Writes to the file(s) with the given prefix. See {@link FileSystems} for information on + * supported file systems. * * <p>The files written will begin with this prefix, followed by * a shard identifier (see {@link #withNumShards}, and end @@ -318,7 +307,7 @@ public class AvroIO { } /** Writes to the file(s) specified by the provided {@link FileBasedSink.FilenamePolicy}. 
*/ - public Write<T> to(FileBasedSink.FilenamePolicy filenamePolicy) { + public Write<T> to(FilenamePolicy filenamePolicy) { return toBuilder().setFilenamePolicy(filenamePolicy).build(); } @@ -333,7 +322,8 @@ public class AvroIO { } /** - * Uses the provided shard count. + * Uses the provided shard count. See {@link ShardNameTemplate} for a description of shard + * templates. * * <p>Constraining the number of shards is likely to reduce * the performance of a pipeline. Setting this value is not recommended @@ -341,19 +331,13 @@ public class AvroIO { * * @param numShards the number of shards to use, or 0 to let the system * decide. - * @see ShardNameTemplate */ public Write<T> withNumShards(int numShards) { checkArgument(numShards >= 0); return toBuilder().setNumShards(numShards).build(); } - /** - * Returns a new {@link PTransform} that's like this one but - * that uses the given shard name template. - * - * @see ShardNameTemplate - */ + /** Uses the given {@link ShardNameTemplate} for naming output files. */ public Write<T> withShardNameTemplate(String shardTemplate) { return toBuilder().setShardTemplate(shardTemplate).build(); } @@ -361,12 +345,19 @@ public class AvroIO { /** * Forces a single file as output. * - * <p>This is a shortcut for {@code .withNumShards(1).withShardNameTemplate("")} + * <p>This is equivalent to {@code .withNumShards(1).withShardNameTemplate("")}. */ public Write<T> withoutSharding() { return withNumShards(1).withShardNameTemplate(""); } + /** + * Preserves windowing of input elements and writes them to files based on the element's window. + * + * <p>Requires use of {@link #to(FileBasedSink.FilenamePolicy)}. Filenames will be generated + * using {@link FilenamePolicy#windowedFilename(FileBasedSink.FilenamePolicy.WindowedContext)}. + * See also {@link WriteFiles#withWindowedWrites()}.
+ */ public Write<T> withWindowedWrites() { return toBuilder().setWindowedWrites(true).build(); } http://git-wip-us.apache.org/repos/asf/beam/blob/caf2faeb/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroSink.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroSink.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroSink.java index 16f233c..46bb4f3 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroSink.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroSink.java @@ -30,9 +30,7 @@ import org.apache.beam.sdk.coders.AvroCoder; import org.apache.beam.sdk.options.PipelineOptions; import org.apache.beam.sdk.util.MimeTypes; -/** - * A {@link FileBasedSink} for Avro files. - */ +/** A {@link FileBasedSink} for Avro files. */ class AvroSink<T> extends FileBasedSink<T> { private final AvroCoder<T> coder; private final SerializableAvroCodecFactory codec; @@ -67,10 +65,7 @@ class AvroSink<T> extends FileBasedSink<T> { return new AvroWriteOperation<>(this, coder, codec, metadata); } - /** - * A {@link FileBasedWriteOperation - * FileBasedWriteOperation} for Avro files. - */ + /** A {@link FileBasedWriteOperation FileBasedWriteOperation} for Avro files. */ private static class AvroWriteOperation<T> extends FileBasedWriteOperation<T> { private final AvroCoder<T> coder; private final SerializableAvroCodecFactory codec; @@ -92,10 +87,7 @@ class AvroSink<T> extends FileBasedSink<T> { } } - /** - * A {@link FileBasedWriter FileBasedWriter} - * for Avro files. - */ + /** A {@link FileBasedWriter FileBasedWriter} for Avro files. 
*/ private static class AvroWriter<T> extends FileBasedWriter<T> { private final AvroCoder<T> coder; private DataFileWriter<T> dataFileWriter; http://git-wip-us.apache.org/repos/asf/beam/blob/caf2faeb/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroSource.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroSource.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroSource.java index 58e6555..96d21c6 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroSource.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroSource.java @@ -62,7 +62,9 @@ import org.apache.commons.compress.utils.CountingInputStream; // CHECKSTYLE.OFF: JavadocStyle /** - * A {@link FileBasedSource} for reading Avro files. + * Do not use in pipelines directly: most users should use {@link AvroIO.Read}. + * + * <p>A {@link FileBasedSource} for reading Avro files. * * <p>To read a {@link PCollection} of objects from one or more Avro files, use * {@link AvroSource#from} to specify the path(s) of the files to read. The {@link AvroSource} that http://git-wip-us.apache.org/repos/asf/beam/blob/caf2faeb/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/SourceTestUtils.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/SourceTestUtils.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/SourceTestUtils.java index fd7ae85..cde0b94 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/SourceTestUtils.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/SourceTestUtils.java @@ -72,9 +72,8 @@ import org.slf4j.LoggerFactory; * as a heavy-weight stress test including concurrency. We strongly recommend to * use both. 
* </ul> - * For example usages, see the unit tests of classes such as - * {@link org.apache.beam.sdk.io.AvroSource} or - * {@link org.apache.beam.sdk.io.TextIO TextIO.TextSource}. + * For example usages, see the unit tests of classes such as {@code AvroSource} or + * {@code TextSource}. * * <p>Like {@link PAssert}, requires JUnit and Hamcrest to be present in the classpath. */ http://git-wip-us.apache.org/repos/asf/beam/blob/caf2faeb/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroIOTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroIOTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroIOTest.java index e421b96..d14d9b2 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroIOTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroIOTest.java @@ -45,6 +45,7 @@ import org.apache.avro.file.CodecFactory; import org.apache.avro.file.DataFileReader; import org.apache.avro.file.DataFileStream; import org.apache.avro.generic.GenericDatumReader; +import org.apache.avro.generic.GenericRecord; import org.apache.avro.reflect.Nullable; import org.apache.avro.reflect.ReflectData; import org.apache.avro.reflect.ReflectDatumReader; @@ -143,8 +144,9 @@ public class AvroIOTest { File outputFile = tmpFolder.newFile("output.avro"); p.apply(Create.of(values)) - .apply(AvroIO.write(GenericClass.class).to(outputFile.getAbsolutePath()) - .withoutSharding()); + .apply(AvroIO.write(GenericClass.class) + .to(outputFile.getAbsolutePath()) + .withoutSharding()); p.run(); PCollection<GenericClass> input = @@ -165,9 +167,10 @@ public class AvroIOTest { File outputFile = tmpFolder.newFile("output.avro"); p.apply(Create.of(values)) - .apply(AvroIO.write(GenericClass.class).to(outputFile.getAbsolutePath()) - .withoutSharding() - .withCodec(CodecFactory.deflateCodec(9))); + .apply(AvroIO.write(GenericClass.class) + .to(outputFile.getAbsolutePath()) 
+ .withoutSharding() + .withCodec(CodecFactory.deflateCodec(9))); p.run(); PCollection<GenericClass> input = p @@ -190,9 +193,10 @@ public class AvroIOTest { File outputFile = tmpFolder.newFile("output.avro"); p.apply(Create.of(values)) - .apply(AvroIO.write(GenericClass.class).to(outputFile.getAbsolutePath()) - .withoutSharding() - .withCodec(CodecFactory.nullCodec())); + .apply(AvroIO.write(GenericClass.class) + .to(outputFile.getAbsolutePath()) + .withoutSharding() + .withCodec(CodecFactory.nullCodec())); p.run(); PCollection<GenericClass> input = p @@ -256,7 +260,8 @@ public class AvroIOTest { File outputFile = tmpFolder.newFile("output.avro"); p.apply(Create.of(values)) - .apply(AvroIO.write(GenericClass.class).to(outputFile.getAbsolutePath()) + .apply(AvroIO.write(GenericClass.class) + .to(outputFile.getAbsolutePath()) .withoutSharding()); p.run(); @@ -362,7 +367,8 @@ public class AvroIOTest { windowedAvroWritePipeline .apply(values) .apply(Window.<GenericClass>into(FixedWindows.of(Duration.standardMinutes(1)))) - .apply(AvroIO.write(GenericClass.class).to(new WindowedFilenamePolicy(outputFilePrefix)) + .apply(AvroIO.write(GenericClass.class) + .to(new WindowedFilenamePolicy(outputFilePrefix)) .withWindowedWrites() .withNumShards(2)); windowedAvroWritePipeline.run(); @@ -403,7 +409,7 @@ public class AvroIOTest { @Test public void testWriteWithCustomCodec() throws Exception { - AvroIO.Write<?> write = AvroIO.write(String.class) + AvroIO.Write<String> write = AvroIO.write(String.class) .to("gs://bucket/foo/baz") .withCodec(CodecFactory.snappyCodec()); assertEquals(SNAPPY_CODEC, write.getCodec().toString()); @@ -442,7 +448,8 @@ public class AvroIOTest { File outputFile = tmpFolder.newFile("output.avro"); p.apply(Create.of(values)) - .apply(AvroIO.write(GenericClass.class).to(outputFile.getAbsolutePath()) + .apply(AvroIO.write(GenericClass.class) + .to(outputFile.getAbsolutePath()) .withoutSharding() .withMetadata(ImmutableMap.<String, Object>of( "stringKey", 
"stringValue", @@ -463,8 +470,7 @@ public class AvroIOTest { File baseOutputFile = new File(tmpFolder.getRoot(), "prefix"); String outputFilePrefix = baseOutputFile.getAbsolutePath(); - AvroIO.Write<String> write = - AvroIO.write(String.class).to(outputFilePrefix); + AvroIO.Write<String> write = AvroIO.write(String.class).to(outputFilePrefix); if (numShards > 1) { System.out.println("NumShards " + numShards); write = write.withNumShards(numShards); @@ -524,7 +530,7 @@ public class AvroIOTest { @Test public void testReadDisplayData() { - AvroIO.Read<?> read = AvroIO.read(String.class).from("foo.*"); + AvroIO.Read<String> read = AvroIO.read(String.class).from("foo.*"); DisplayData displayData = DisplayData.from(read); assertThat(displayData, hasDisplayItem("filePattern", "foo.*")); @@ -535,7 +541,7 @@ public class AvroIOTest { public void testPrimitiveReadDisplayData() { DisplayDataEvaluator evaluator = DisplayDataEvaluator.create(); - AvroIO.Read<?> read = + AvroIO.Read<GenericRecord> read = AvroIO.readGenericRecords(Schema.create(Schema.Type.STRING)).from("foo.*"); Set<DisplayData> displayData = evaluator.displayDataForPrimitiveSourceTransforms(read); @@ -545,7 +551,7 @@ public class AvroIOTest { @Test public void testWriteDisplayData() { - AvroIO.Write<?> write = AvroIO.write(GenericClass.class) + AvroIO.Write<GenericClass> write = AvroIO.write(GenericClass.class) .to("foo") .withShardNameTemplate("-SS-of-NN-") .withSuffix("bar") @@ -572,8 +578,8 @@ public class AvroIOTest { DisplayDataEvaluator evaluator = DisplayDataEvaluator.create(options); - AvroIO.Write<?> write = AvroIO.writeGenericRecords(Schema.create(Schema.Type.STRING)) - .to(outputPath); + AvroIO.Write<GenericRecord> write = + AvroIO.writeGenericRecords(Schema.create(Schema.Type.STRING)).to(outputPath); Set<DisplayData> displayData = evaluator.displayDataForPrimitiveTransforms(write); assertThat("AvroIO.Write should include the file pattern in its primitive transform",
