Adds ValueProvider support to AvroIO.Read
Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/e80c83b2 Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/e80c83b2 Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/e80c83b2 Branch: refs/heads/master Commit: e80c83b2a0a1cf55aa8a452a02a76c9dc13697cc Parents: 71196ec Author: Eugene Kirpichov <[email protected]> Authored: Fri Jul 21 12:38:17 2017 -0700 Committer: Eugene Kirpichov <[email protected]> Committed: Tue Jul 25 17:36:49 2017 -0700 ---------------------------------------------------------------------- .../java/org/apache/beam/sdk/io/AvroIO.java | 49 ++++++++------------ .../java/org/apache/beam/sdk/io/AvroSource.java | 24 +++++++--- .../apache/beam/sdk/io/BlockBasedSource.java | 6 +++ 3 files changed, 42 insertions(+), 37 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/beam/blob/e80c83b2/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroIO.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroIO.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroIO.java index 89cadbd..d308c85 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroIO.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroIO.java @@ -18,6 +18,7 @@ package org.apache.beam.sdk.io; import static com.google.common.base.Preconditions.checkArgument; +import static com.google.common.base.Preconditions.checkNotNull; import com.google.auto.value.AutoValue; import com.google.common.collect.ImmutableMap; @@ -36,7 +37,6 @@ import org.apache.beam.sdk.coders.Coder; import org.apache.beam.sdk.coders.VoidCoder; import org.apache.beam.sdk.io.FileBasedSink.DynamicDestinations; import org.apache.beam.sdk.io.FileBasedSink.FilenamePolicy; -import org.apache.beam.sdk.io.Read.Bounded; import org.apache.beam.sdk.io.fs.ResourceId; import org.apache.beam.sdk.options.ValueProvider; import org.apache.beam.sdk.options.ValueProvider.NestedValueProvider; @@ -185,10 +185,10 @@ public class AvroIO { .setWindowedWrites(false); } - /** Implementation of {@link #read}. */ + /** Implementation of {@link #read} and {@link #readGenericRecords}. */ @AutoValue public abstract static class Read<T> extends PTransform<PBegin, PCollection<T>> { - @Nullable abstract String getFilepattern(); + @Nullable abstract ValueProvider<String> getFilepattern(); @Nullable abstract Class<T> getRecordClass(); @Nullable abstract Schema getSchema(); @@ -196,7 +196,7 @@ public class AvroIO { @AutoValue.Builder abstract static class Builder<T> { - abstract Builder<T> setFilepattern(String filepattern); + abstract Builder<T> setFilepattern(ValueProvider<String> filepattern); abstract Builder<T> setRecordClass(Class<T> recordClass); abstract Builder<T> setSchema(Schema schema); @@ -204,45 +204,34 @@ public class AvroIO { } /** Reads from the given filename or filepattern. */ - public Read<T> from(String filepattern) { + public Read<T> from(ValueProvider<String> filepattern) { return toBuilder().setFilepattern(filepattern).build(); } + /** Like {@link #from(ValueProvider)}. */ + public Read<T> from(String filepattern) { + return from(StaticValueProvider.of(filepattern)); + } + @Override public PCollection<T> expand(PBegin input) { - if (getFilepattern() == null) { - throw new IllegalStateException( - "need to set the filepattern of an AvroIO.Read transform"); - } - if (getSchema() == null) { - throw new IllegalStateException("need to set the schema of an AvroIO.Read transform"); - } + checkNotNull(getFilepattern(), "filepattern"); + checkNotNull(getSchema(), "schema"); @SuppressWarnings("unchecked") - Bounded<T> read = + AvroSource<T> source = getRecordClass() == GenericRecord.class - ? (Bounded<T>) org.apache.beam.sdk.io.Read.from( - AvroSource.from(getFilepattern()).withSchema(getSchema())) - : org.apache.beam.sdk.io.Read.from( - AvroSource.from(getFilepattern()).withSchema(getRecordClass())); - - PCollection<T> pcol = input.getPipeline().apply("Read", read); - // Honor the default output coder that would have been used by this PTransform. - pcol.setCoder(getDefaultOutputCoder()); - return pcol; + ? (AvroSource<T>) AvroSource.from(getFilepattern()).withSchema(getSchema()) + : AvroSource.from(getFilepattern()).withSchema(getRecordClass()); + + return input.getPipeline().apply("Read", org.apache.beam.sdk.io.Read.from(source)); } @Override public void populateDisplayData(DisplayData.Builder builder) { super.populateDisplayData(builder); - builder - .addIfNotNull(DisplayData.item("filePattern", getFilepattern()) - .withLabel("Input File Pattern")); - } - - @Override - protected Coder<T> getDefaultOutputCoder() { - return AvroCoder.of(getRecordClass(), getSchema()); + builder.addIfNotNull( + DisplayData.item("filePattern", getFilepattern()).withLabel("Input File Pattern")); } } http://git-wip-us.apache.org/repos/asf/beam/blob/e80c83b2/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroSource.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroSource.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroSource.java index 7cd97a8..a98d870 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroSource.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroSource.java @@ -56,6 +56,7 @@ import org.apache.beam.sdk.coders.AvroCoder; import org.apache.beam.sdk.io.fs.MatchResult.Metadata; import org.apache.beam.sdk.io.fs.ResourceId; import org.apache.beam.sdk.options.PipelineOptions; +import org.apache.beam.sdk.options.ValueProvider; import org.apache.beam.sdk.values.PCollection; import org.apache.commons.compress.compressors.bzip2.BZip2CompressorInputStream; import org.apache.commons.compress.compressors.snappy.SnappyCompressorInputStream; @@ -140,26 +141,31 @@ public class AvroSource<T> extends BlockBasedSource<T> { * Reads from the given file name or pattern ("glob"). The returned source can be further * configured by calling {@link #withSchema} to return a type other than {@link GenericRecord}. */ - public static AvroSource<GenericRecord> from(String fileNameOrPattern) { + public static AvroSource<GenericRecord> from(ValueProvider<String> fileNameOrPattern) { return new AvroSource<>(fileNameOrPattern, DEFAULT_MIN_BUNDLE_SIZE, null, GenericRecord.class); } + /** Like {@link #from(ValueProvider)}. */ + public static AvroSource<GenericRecord> from(String fileNameOrPattern) { + return from(ValueProvider.StaticValueProvider.of(fileNameOrPattern)); + } + /** Reads files containing records that conform to the given schema. */ public AvroSource<GenericRecord> withSchema(String schema) { return new AvroSource<>( - getFileOrPatternSpec(), getMinBundleSize(), schema, GenericRecord.class); + getFileOrPatternSpecProvider(), getMinBundleSize(), schema, GenericRecord.class); } /** Like {@link #withSchema(String)}. */ public AvroSource<GenericRecord> withSchema(Schema schema) { return new AvroSource<>( - getFileOrPatternSpec(), getMinBundleSize(), schema.toString(), GenericRecord.class); + getFileOrPatternSpecProvider(), getMinBundleSize(), schema.toString(), GenericRecord.class); } /** Reads files containing records of the given class. */ public <X> AvroSource<X> withSchema(Class<X> clazz) { return new AvroSource<>( - getFileOrPatternSpec(), + getFileOrPatternSpecProvider(), getMinBundleSize(), ReflectData.get().getSchema(clazz).toString(), clazz); @@ -170,12 +176,16 @@ public class AvroSource<T> extends BlockBasedSource<T> { * minBundleSize} and its use. */ public AvroSource<T> withMinBundleSize(long minBundleSize) { - return new AvroSource<>(getFileOrPatternSpec(), minBundleSize, readerSchemaString, type); + return new AvroSource<>( + getFileOrPatternSpecProvider(), minBundleSize, readerSchemaString, type); } /** Constructor for FILEPATTERN mode. */ private AvroSource( - String fileNameOrPattern, long minBundleSize, String readerSchemaString, Class<T> type) { + ValueProvider<String> fileNameOrPattern, + long minBundleSize, + String readerSchemaString, + Class<T> type) { super(fileNameOrPattern, minBundleSize); this.readerSchemaString = internSchemaString(readerSchemaString); this.type = type; @@ -378,7 +388,7 @@ public class AvroSource<T> extends BlockBasedSource<T> { type); case FILEPATTERN: return new AvroSource<>( - getFileOrPatternSpec(), getMinBundleSize(), readerSchemaString, type); + getFileOrPatternSpecProvider(), getMinBundleSize(), readerSchemaString, type); default: throw new InvalidObjectException( String.format("Unknown mode %s for AvroSource %s", getMode(), this)); http://git-wip-us.apache.org/repos/asf/beam/blob/e80c83b2/sdks/java/core/src/main/java/org/apache/beam/sdk/io/BlockBasedSource.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/BlockBasedSource.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/BlockBasedSource.java index cf6671e..25e8483 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/BlockBasedSource.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/BlockBasedSource.java @@ -23,6 +23,7 @@ import javax.annotation.Nullable; import org.apache.beam.sdk.annotations.Experimental; import org.apache.beam.sdk.io.fs.MatchResult.Metadata; import org.apache.beam.sdk.options.PipelineOptions; +import org.apache.beam.sdk.options.ValueProvider; import org.apache.beam.sdk.options.ValueProvider.StaticValueProvider; /** @@ -69,6 +70,11 @@ public abstract class BlockBasedSource<T> extends FileBasedSource<T> { super(StaticValueProvider.of(fileOrPatternSpec), minBundleSize); } + /** Like {@link #BlockBasedSource(String, long)}. */ + public BlockBasedSource(ValueProvider<String> fileOrPatternSpec, long minBundleSize) { + super(fileOrPatternSpec, minBundleSize); + } + /** * Creates a {@code BlockBasedSource} for a single file. Subclasses must call this constructor * when implementing {@link BlockBasedSource#createForSubrangeOfFile}. See documentation in
