[BEAM-2677] AvroIO.parseGenericRecords - schemaless AvroIO.read
Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/ebd00411 Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/ebd00411 Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/ebd00411 Branch: refs/heads/master Commit: ebd004119c387787d0e0fcd0487e1b2754c7dbc5 Parents: 62c922b Author: Eugene Kirpichov <[email protected]> Authored: Mon Jul 24 15:07:15 2017 -0700 Committer: Eugene Kirpichov <[email protected]> Committed: Fri Jul 28 10:25:07 2017 -0700 ---------------------------------------------------------------------- .../java/org/apache/beam/sdk/io/AvroIO.java | 199 ++++++++++++++++++- .../java/org/apache/beam/sdk/io/AvroSource.java | 166 ++++++++++++---- .../java/org/apache/beam/sdk/io/AvroIOTest.java | 89 ++++++--- .../org/apache/beam/sdk/io/AvroSourceTest.java | 30 ++- .../sdk/io/gcp/bigquery/BigQuerySourceBase.java | 4 +- 5 files changed, 406 insertions(+), 82 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/beam/blob/ebd00411/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroIO.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroIO.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroIO.java index 018b84f..27c9073 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroIO.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroIO.java @@ -35,7 +35,9 @@ import org.apache.avro.reflect.ReflectData; import org.apache.beam.sdk.annotations.Experimental; import org.apache.beam.sdk.annotations.Experimental.Kind; import org.apache.beam.sdk.coders.AvroCoder; +import org.apache.beam.sdk.coders.CannotProvideCoderException; import org.apache.beam.sdk.coders.Coder; +import org.apache.beam.sdk.coders.CoderRegistry; import org.apache.beam.sdk.coders.StringUtf8Coder; import org.apache.beam.sdk.coders.VoidCoder; import org.apache.beam.sdk.io.FileBasedSink.DynamicDestinations; @@ -53,13 +55,16 @@ import org.apache.beam.sdk.transforms.display.HasDisplayData; import org.apache.beam.sdk.values.PBegin; import org.apache.beam.sdk.values.PCollection; import org.apache.beam.sdk.values.PDone; +import org.apache.beam.sdk.values.TypeDescriptor; +import org.apache.beam.sdk.values.TypeDescriptors; /** * {@link PTransform}s for reading and writing Avro files. * - * <p>To read a {@link PCollection} from one or more Avro files, use {@code AvroIO.read()}, using - * {@link AvroIO.Read#from} to specify the filename or filepattern to read from. Alternatively, if - * the filepatterns to be read are themselves in a {@link PCollection}, apply {@link #readAll}. + * <p>To read a {@link PCollection} from one or more Avro files with the same schema known at + * pipeline construction time, use {@code AvroIO.read()}, using {@link AvroIO.Read#from} to specify + * the filename or filepattern to read from. Alternatively, if the filepatterns to be read are + * themselves in a {@link PCollection}, apply {@link #readAll}. * * <p>See {@link FileSystems} for information on supported file systems and filepatterns. * @@ -70,6 +75,12 @@ import org.apache.beam.sdk.values.PDone; * schema. Likewise, to read a {@link PCollection} of filepatterns, apply {@link * #readAllGenericRecords}. 
* + * <p>To read records from files whose schema is unknown at pipeline construction time or differs + * between files, use {@link #parseGenericRecords} - in this case, you will need to specify a + * parsing function for converting each {@link GenericRecord} into a value of your custom type. + * Likewise, to read a {@link PCollection} of filepatterns with unknown schema, use {@link + * #parseAllGenericRecords}. + * * <p>For example: * * <pre>{@code @@ -84,12 +95,20 @@ import org.apache.beam.sdk.values.PDone; * PCollection<GenericRecord> records = * p.apply(AvroIO.readGenericRecords(schema) * .from("gs://my_bucket/path/to/records-*.avro")); + * + * PCollection<Foo> records = + * p.apply(AvroIO.parseGenericRecords(new SerializableFunction<GenericRecord, Foo>() { + * public Foo apply(GenericRecord record) { + * // If needed, access the schema of the record using record.getSchema() + * return ...; + * } + * })); * }</pre> * * <p>If it is known that the filepattern will match a very large number of files (e.g. tens of - * thousands or more), use {@link Read#withHintMatchesManyFiles} for better performance and - * scalability. Note that it may decrease performance if the filepattern matches only a small - * number of files. + * thousands or more), use {@link Read#withHintMatchesManyFiles} or {@link + * Parse#withHintMatchesManyFiles} for better performance and scalability. Note that it may decrease + * performance if the filepattern matches only a small number of files. * * <p>Reading from a {@link PCollection} of filepatterns: * @@ -101,6 +120,8 @@ import org.apache.beam.sdk.values.PDone; * filepatterns.apply(AvroIO.read(AvroAutoGenClass.class)); * PCollection<GenericRecord> genericRecords = * filepatterns.apply(AvroIO.readGenericRecords(schema)); + * PCollection<Foo> records = + * filepatterns.apply(AvroIO.parseAllGenericRecords(new SerializableFunction...); * }</pre> * * <p>To write a {@link PCollection} to one or more Avro files, use {@link AvroIO.Write}, using @@ -208,6 +229,29 @@ public class AvroIO { } /** + * Reads Avro file(s) containing records of an unspecified schema and converting each record to a + * custom type. + */ + public static <T> Parse<T> parseGenericRecords(SerializableFunction<GenericRecord, T> parseFn) { + return new AutoValue_AvroIO_Parse.Builder<T>() + .setParseFn(parseFn) + .setHintMatchesManyFiles(false) + .build(); + } + + /** + * Like {@link #parseGenericRecords(SerializableFunction)}, but reads each filepattern in the + * input {@link PCollection}. + */ + public static <T> ParseAll<T> parseAllGenericRecords( + SerializableFunction<GenericRecord, T> parseFn) { + return new AutoValue_AvroIO_ParseAll.Builder<T>() + .setParseFn(parseFn) + .setDesiredBundleSizeBytes(64 * 1024 * 1024L) + .build(); + } + + /** * Writes a {@link PCollection} to an Avro file (or multiple Avro files matching a sharding * pattern). */ @@ -387,6 +431,149 @@ public class AvroIO { ///////////////////////////////////////////////////////////////////////////// + /** Implementation of {@link #parseGenericRecords}. 
*/ + @AutoValue + public abstract static class Parse<T> extends PTransform<PBegin, PCollection<T>> { + @Nullable abstract ValueProvider<String> getFilepattern(); + abstract SerializableFunction<GenericRecord, T> getParseFn(); + @Nullable abstract Coder<T> getCoder(); + abstract boolean getHintMatchesManyFiles(); + + abstract Builder<T> toBuilder(); + + @AutoValue.Builder + abstract static class Builder<T> { + abstract Builder<T> setFilepattern(ValueProvider<String> filepattern); + abstract Builder<T> setParseFn(SerializableFunction<GenericRecord, T> parseFn); + abstract Builder<T> setCoder(Coder<T> coder); + abstract Builder<T> setHintMatchesManyFiles(boolean hintMatchesManyFiles); + + abstract Parse<T> build(); + } + + /** Reads from the given filename or filepattern. */ + public Parse<T> from(String filepattern) { + return from(StaticValueProvider.of(filepattern)); + } + + /** Like {@link #from(String)}. */ + public Parse<T> from(ValueProvider<String> filepattern) { + return toBuilder().setFilepattern(filepattern).build(); + } + + /** Sets a coder for the result of the parse function. */ + public Parse<T> withCoder(Coder<T> coder) { + return toBuilder().setCoder(coder).build(); + } + + /** Like {@link Read#withHintMatchesManyFiles()}. */ + public Parse<T> withHintMatchesManyFiles() { + return toBuilder().setHintMatchesManyFiles(true).build(); + } + + @Override + public PCollection<T> expand(PBegin input) { + checkNotNull(getFilepattern(), "filepattern"); + Coder<T> coder = inferCoder(getCoder(), getParseFn(), input.getPipeline().getCoderRegistry()); + if (getHintMatchesManyFiles()) { + return input + .apply(Create.ofProvider(getFilepattern(), StringUtf8Coder.of())) + .apply(parseAllGenericRecords(getParseFn()).withCoder(getCoder())); + } + return input.apply( + org.apache.beam.sdk.io.Read.from( + AvroSource.from(getFilepattern()).withParseFn(getParseFn(), coder))); + } + + private static <T> Coder<T> inferCoder( + @Nullable Coder<T> explicitCoder, + SerializableFunction<GenericRecord, T> parseFn, + CoderRegistry coderRegistry) { + if (explicitCoder != null) { + return explicitCoder; + } + // If a coder was not specified explicitly, infer it from parse fn. + TypeDescriptor<T> descriptor = TypeDescriptors.outputOf(parseFn); + String message = + "Unable to infer coder for output of parseFn. Specify it explicitly using withCoder()."; + checkArgument(descriptor != null, message); + try { + return coderRegistry.getCoder(descriptor); + } catch (CannotProvideCoderException e) { + throw new IllegalArgumentException(message, e); + } + } + + @Override + public void populateDisplayData(DisplayData.Builder builder) { + super.populateDisplayData(builder); + builder + .addIfNotNull( + DisplayData.item("filePattern", getFilepattern()).withLabel("Input File Pattern")) + .add(DisplayData.item("parseFn", getParseFn().getClass()).withLabel("Parse function")); + } + } + + ///////////////////////////////////////////////////////////////////////////// + + /** Implementation of {@link #parseAllGenericRecords}. 
*/ + @AutoValue + public abstract static class ParseAll<T> extends PTransform<PCollection<String>, PCollection<T>> { + abstract SerializableFunction<GenericRecord, T> getParseFn(); + @Nullable abstract Coder<T> getCoder(); + abstract long getDesiredBundleSizeBytes(); + + abstract Builder<T> toBuilder(); + + @AutoValue.Builder + abstract static class Builder<T> { + abstract Builder<T> setParseFn(SerializableFunction<GenericRecord, T> parseFn); + abstract Builder<T> setCoder(Coder<T> coder); + abstract Builder<T> setDesiredBundleSizeBytes(long desiredBundleSizeBytes); + + abstract ParseAll<T> build(); + } + + /** Specifies the coder for the result of the {@code parseFn}. */ + public ParseAll<T> withCoder(Coder<T> coder) { + return toBuilder().setCoder(coder).build(); + } + + @VisibleForTesting + ParseAll<T> withDesiredBundleSizeBytes(long desiredBundleSizeBytes) { + return toBuilder().setDesiredBundleSizeBytes(desiredBundleSizeBytes).build(); + } + + @Override + public PCollection<T> expand(PCollection<String> input) { + final Coder<T> coder = + Parse.inferCoder(getCoder(), getParseFn(), input.getPipeline().getCoderRegistry()); + SerializableFunction<String, FileBasedSource<T>> createSource = + new SerializableFunction<String, FileBasedSource<T>>() { + @Override + public FileBasedSource<T> apply(String input) { + return AvroSource.from(input).withParseFn(getParseFn(), coder); + } + }; + return input + .apply( + "Parse all via FileBasedSource", + new ReadAllViaFileBasedSource<>( + SerializableFunctions.<String, Boolean>constant(true) /* isSplittable */, + getDesiredBundleSizeBytes(), + createSource)) + .setCoder(coder); + } + + @Override + public void populateDisplayData(DisplayData.Builder builder) { + super.populateDisplayData(builder); + builder.add(DisplayData.item("parseFn", getParseFn().getClass()).withLabel("Parse function")); + } + } + + ///////////////////////////////////////////////////////////////////////////// + /** Implementation of {@link #write}. 
*/ @AutoValue public abstract static class Write<T> extends PTransform<PCollection<T>, PDone> { http://git-wip-us.apache.org/repos/asf/beam/blob/ebd00411/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroSource.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroSource.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroSource.java index a98d870..d277503 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroSource.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroSource.java @@ -17,6 +17,7 @@ */ package org.apache.beam.sdk.io; +import static com.google.common.base.Preconditions.checkArgument; import static com.google.common.base.Preconditions.checkNotNull; import static com.google.common.base.Preconditions.checkState; @@ -27,8 +28,10 @@ import java.io.EOFException; import java.io.IOException; import java.io.InputStream; import java.io.InvalidObjectException; +import java.io.ObjectInputStream; import java.io.ObjectStreamException; import java.io.PushbackInputStream; +import java.io.Serializable; import java.nio.ByteBuffer; import java.nio.channels.Channels; import java.nio.channels.ReadableByteChannel; @@ -53,10 +56,12 @@ import org.apache.avro.reflect.ReflectDatumReader; import org.apache.beam.sdk.PipelineRunner; import org.apache.beam.sdk.annotations.Experimental; import org.apache.beam.sdk.coders.AvroCoder; +import org.apache.beam.sdk.coders.Coder; import org.apache.beam.sdk.io.fs.MatchResult.Metadata; import org.apache.beam.sdk.io.fs.ResourceId; import org.apache.beam.sdk.options.PipelineOptions; import org.apache.beam.sdk.options.ValueProvider; +import org.apache.beam.sdk.transforms.SerializableFunction; import org.apache.beam.sdk.values.PCollection; import org.apache.commons.compress.compressors.bzip2.BZip2CompressorInputStream; import org.apache.commons.compress.compressors.snappy.SnappyCompressorInputStream; @@ -130,19 +135,84 @@ public class AvroSource<T> extends BlockBasedSource<T> { // The default sync interval is 64k. private static final long DEFAULT_MIN_BUNDLE_SIZE = 2 * DataFileConstants.DEFAULT_SYNC_INTERVAL; - // The type of the records contained in the file. - private final Class<T> type; + // Use cases of AvroSource are: + // 1) AvroSource<GenericRecord> Reading GenericRecord records with a specified schema. + // 2) AvroSource<Foo> Reading records of a generated Avro class Foo. + // 3) AvroSource<T> Reading GenericRecord records with an unspecified schema + // and converting them to type T. + // | Case 1 | Case 2 | Case 3 | + // type | GenericRecord | Foo | GenericRecord | + // readerSchemaString | non-null | non-null | null | + // parseFn | null | null | non-null | + // outputCoder | null | null | non-null | + private static class Mode<T> implements Serializable { + private final Class<?> type; + + // The JSON schema used to decode records. + @Nullable + private String readerSchemaString; + + @Nullable + private final SerializableFunction<GenericRecord, T> parseFn; + + @Nullable + private final Coder<T> outputCoder; + + private Mode( + Class<?> type, + @Nullable String readerSchemaString, + @Nullable SerializableFunction<GenericRecord, T> parseFn, + @Nullable Coder<T> outputCoder) { + this.type = type; + this.readerSchemaString = internSchemaString(readerSchemaString); + this.parseFn = parseFn; + this.outputCoder = outputCoder; + } - // The JSON schema used to decode records. 
- @Nullable - private final String readerSchemaString; + private void readObject(ObjectInputStream is) throws IOException, ClassNotFoundException { + is.defaultReadObject(); + readerSchemaString = internSchemaString(readerSchemaString); + } + + private Coder<T> getOutputCoder() { + if (parseFn == null) { + return AvroCoder.of((Class<T>) type, internOrParseSchemaString(readerSchemaString)); + } else { + return outputCoder; + } + } + + private void validate() { + if (parseFn == null) { + checkArgument( + readerSchemaString != null, + "schema must be specified using withSchema() when not using a parse fn"); + } + } + } + + private static Mode<GenericRecord> readGenericRecordsWithSchema(String schema) { + return new Mode<>(GenericRecord.class, schema, null, null); + } + private static <T> Mode<T> readGeneratedClasses(Class<T> clazz) { + return new Mode<>(clazz, ReflectData.get().getSchema(clazz).toString(), null, null); + } + private static <T> Mode<T> parseGenericRecords( + SerializableFunction<GenericRecord, T> parseFn, Coder<T> outputCoder) { + return new Mode<>(GenericRecord.class, null, parseFn, outputCoder); + } + + private final Mode<T> mode; /** - * Reads from the given file name or pattern ("glob"). The returned source can be further + * Reads from the given file name or pattern ("glob"). The returned source needs to be further * configured by calling {@link #withSchema} to return a type other than {@link GenericRecord}. */ public static AvroSource<GenericRecord> from(ValueProvider<String> fileNameOrPattern) { - return new AvroSource<>(fileNameOrPattern, DEFAULT_MIN_BUNDLE_SIZE, null, GenericRecord.class); + return new AvroSource<>( + fileNameOrPattern, + DEFAULT_MIN_BUNDLE_SIZE, + readGenericRecordsWithSchema(null /* will need to be specified in withSchema */)); } /** Like {@link #from(ValueProvider)}. */ @@ -152,23 +222,40 @@ public class AvroSource<T> extends BlockBasedSource<T> { /** Reads files containing records that conform to the given schema. */ public AvroSource<GenericRecord> withSchema(String schema) { + checkNotNull(schema, "schema"); return new AvroSource<>( - getFileOrPatternSpecProvider(), getMinBundleSize(), schema, GenericRecord.class); + getFileOrPatternSpecProvider(), + getMinBundleSize(), + readGenericRecordsWithSchema(schema)); } /** Like {@link #withSchema(String)}. */ public AvroSource<GenericRecord> withSchema(Schema schema) { - return new AvroSource<>( - getFileOrPatternSpecProvider(), getMinBundleSize(), schema.toString(), GenericRecord.class); + checkNotNull(schema, "schema"); + return withSchema(schema.toString()); } /** Reads files containing records of the given class. */ public <X> AvroSource<X> withSchema(Class<X> clazz) { + checkNotNull(clazz, "clazz"); + return new AvroSource<>( + getFileOrPatternSpecProvider(), + getMinBundleSize(), + readGeneratedClasses(clazz)); + } + + /** + * Reads {@link GenericRecord} of unspecified schema and maps them to instances of a custom type + * using the given {@code parseFn} and encoded using the given coder. + */ + public <X> AvroSource<X> withParseFn( + SerializableFunction<GenericRecord, X> parseFn, Coder<X> coder) { + checkNotNull(parseFn, "parseFn"); + checkNotNull(parseFn, "coder"); return new AvroSource<>( getFileOrPatternSpecProvider(), getMinBundleSize(), - ReflectData.get().getSchema(clazz).toString(), - clazz); + parseGenericRecords(parseFn, coder)); } /** @@ -176,19 +263,16 @@ public class AvroSource<T> extends BlockBasedSource<T> { * minBundleSize} and its use. 
*/ public AvroSource<T> withMinBundleSize(long minBundleSize) { - return new AvroSource<>( - getFileOrPatternSpecProvider(), minBundleSize, readerSchemaString, type); + return new AvroSource<>(getFileOrPatternSpecProvider(), minBundleSize, mode); } /** Constructor for FILEPATTERN mode. */ private AvroSource( ValueProvider<String> fileNameOrPattern, long minBundleSize, - String readerSchemaString, - Class<T> type) { + Mode<T> mode) { super(fileNameOrPattern, minBundleSize); - this.readerSchemaString = internSchemaString(readerSchemaString); - this.type = type; + this.mode = mode; } /** Constructor for SINGLE_FILE_OR_SUBRANGE mode. */ @@ -197,18 +281,15 @@ public class AvroSource<T> extends BlockBasedSource<T> { long minBundleSize, long startOffset, long endOffset, - String readerSchemaString, - Class<T> type) { + Mode<T> mode) { super(metadata, minBundleSize, startOffset, endOffset); - this.readerSchemaString = internSchemaString(readerSchemaString); - this.type = type; + this.mode = mode; } @Override public void validate() { - // AvroSource objects do not need to be configured with more than a file pattern. Overridden to - // make this explicit. super.validate(); + mode.validate(); } /** @@ -225,7 +306,7 @@ public class AvroSource<T> extends BlockBasedSource<T> { @Override public BlockBasedSource<T> createForSubrangeOfFile(Metadata fileMetadata, long start, long end) { - return new AvroSource<>(fileMetadata, getMinBundleSize(), start, end, readerSchemaString, type); + return new AvroSource<>(fileMetadata, getMinBundleSize(), start, end, mode); } @Override @@ -234,14 +315,14 @@ public class AvroSource<T> extends BlockBasedSource<T> { } @Override - public AvroCoder<T> getDefaultOutputCoder() { - return AvroCoder.of(type, internOrParseSchemaString(readerSchemaString)); + public Coder<T> getDefaultOutputCoder() { + return mode.getOutputCoder(); } @VisibleForTesting @Nullable String getReaderSchemaString() { - return readerSchemaString; + return mode.readerSchemaString; } /** Avro file metadata. */ @@ -380,15 +461,9 @@ public class AvroSource<T> extends BlockBasedSource<T> { switch (getMode()) { case SINGLE_FILE_OR_SUBRANGE: return new AvroSource<>( - getSingleFileMetadata(), - getMinBundleSize(), - getStartOffset(), - getEndOffset(), - readerSchemaString, - type); + getSingleFileMetadata(), getMinBundleSize(), getStartOffset(), getEndOffset(), mode); case FILEPATTERN: - return new AvroSource<>( - getFileOrPatternSpecProvider(), getMinBundleSize(), readerSchemaString, type); + return new AvroSource<>(getFileOrPatternSpecProvider(), getMinBundleSize(), mode); default: throw new InvalidObjectException( String.format("Unknown mode %s for AvroSource %s", getMode(), this)); @@ -402,6 +477,8 @@ public class AvroSource<T> extends BlockBasedSource<T> { */ @Experimental(Experimental.Kind.SOURCE_SINK) static class AvroBlock<T> extends Block<T> { + private final Mode<T> mode; + // The number of records in the block. private final long numRecords; @@ -412,7 +489,7 @@ public class AvroSource<T> extends BlockBasedSource<T> { private long currentRecordIndex = 0; // A DatumReader to read records from the block. - private final DatumReader<T> reader; + private final DatumReader<?> reader; // A BinaryDecoder used by the reader to decode records. private final BinaryDecoder decoder; @@ -455,19 +532,19 @@ public class AvroSource<T> extends BlockBasedSource<T> { AvroBlock( byte[] data, long numRecords, - Class<? 
extends T> type, - String readerSchemaString, + Mode<T> mode, String writerSchemaString, String codec) throws IOException { + this.mode = mode; this.numRecords = numRecords; checkNotNull(writerSchemaString, "writerSchemaString"); Schema writerSchema = internOrParseSchemaString(writerSchemaString); Schema readerSchema = internOrParseSchemaString( - MoreObjects.firstNonNull(readerSchemaString, writerSchemaString)); + MoreObjects.firstNonNull(mode.readerSchemaString, writerSchemaString)); this.reader = - (type == GenericRecord.class) + (mode.type == GenericRecord.class) ? new GenericDatumReader<T>(writerSchema, readerSchema) : new ReflectDatumReader<T>(writerSchema, readerSchema); this.decoder = DecoderFactory.get().binaryDecoder(decodeAsInputStream(data, codec), null); @@ -483,7 +560,9 @@ public class AvroSource<T> extends BlockBasedSource<T> { if (currentRecordIndex >= numRecords) { return false; } - currentRecord = reader.read(null, decoder); + Object record = reader.read(null, decoder); + currentRecord = + (mode.parseFn == null) ? ((T) record) : mode.parseFn.apply((GenericRecord) record); currentRecordIndex++; return true; } @@ -585,8 +664,7 @@ public class AvroSource<T> extends BlockBasedSource<T> { new AvroBlock<>( data, numRecords, - getCurrentSource().type, - getCurrentSource().readerSchemaString, + getCurrentSource().mode, metadata.getSchemaString(), metadata.getCodec()); http://git-wip-us.apache.org/repos/asf/beam/blob/ebd00411/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroIOTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroIOTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroIOTest.java index 90cd824..154ff5a 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroIOTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroIOTest.java @@ -67,6 +67,7 @@ import org.apache.beam.sdk.testing.TestStream; import org.apache.beam.sdk.testing.UsesTestStream; import org.apache.beam.sdk.testing.ValidatesRunner; import org.apache.beam.sdk.transforms.Create; +import org.apache.beam.sdk.transforms.SerializableFunction; import org.apache.beam.sdk.transforms.display.DisplayData; import org.apache.beam.sdk.transforms.display.DisplayDataEvaluator; import org.apache.beam.sdk.transforms.windowing.BoundedWindow; @@ -114,9 +115,9 @@ public class AvroIOTest { public GenericClass() {} - public GenericClass(int intValue, String stringValue) { - this.intField = intValue; - this.stringField = stringValue; + public GenericClass(int intField, String stringField) { + this.intField = intField; + this.stringField = stringField; } @Override @@ -142,9 +143,18 @@ public class AvroIOTest { } } + private static class ParseGenericClass + implements SerializableFunction<GenericRecord, GenericClass> { + @Override + public GenericClass apply(GenericRecord input) { + return new GenericClass( + (int) input.get("intField"), input.get("stringField").toString()); + } + } + @Test @Category(NeedsRunner.class) - public void testAvroIOWriteAndReadASingleFile() throws Throwable { + public void testAvroIOWriteAndReadAndParseASingleFile() throws Throwable { List<GenericClass> values = ImmutableList.of(new GenericClass(3, "hi"), new GenericClass(5, "bar")); File outputFile = tmpFolder.newFile("output.avro"); @@ -153,23 +163,45 @@ public class AvroIOTest { .apply(AvroIO.write(GenericClass.class).to(outputFile.getAbsolutePath()).withoutSharding()); writePipeline.run().waitUntilFinish(); 
- // Test the same data via read(), read().withHintMatchesManyFiles(), and readAll() + // Test the same data using all versions of read(). + PCollection<String> path = + readPipeline.apply("Create path", Create.of(outputFile.getAbsolutePath())); PAssert.that( - readPipeline.apply( - "Read", AvroIO.read(GenericClass.class).from(outputFile.getAbsolutePath()))) + readPipeline.apply( + "Read", AvroIO.read(GenericClass.class).from(outputFile.getAbsolutePath()))) .containsInAnyOrder(values); PAssert.that( - readPipeline.apply( - "Read withHintMatchesManyFiles", - AvroIO.read(GenericClass.class) - .from(outputFile.getAbsolutePath()) - .withHintMatchesManyFiles())) + readPipeline.apply( + "Read withHintMatchesManyFiles", + AvroIO.read(GenericClass.class) + .from(outputFile.getAbsolutePath()) + .withHintMatchesManyFiles())) .containsInAnyOrder(values); PAssert.that( - "ReadAll", - readPipeline - .apply(Create.of(outputFile.getAbsolutePath())) - .apply(AvroIO.readAll(GenericClass.class).withDesiredBundleSizeBytes(10))) + path.apply( + "ReadAll", AvroIO.readAll(GenericClass.class).withDesiredBundleSizeBytes(10))) + .containsInAnyOrder(values); + PAssert.that( + readPipeline.apply( + "Parse", + AvroIO.parseGenericRecords(new ParseGenericClass()) + .from(outputFile.getAbsolutePath()) + .withCoder(AvroCoder.of(GenericClass.class)))) + .containsInAnyOrder(values); + PAssert.that( + readPipeline.apply( + "Parse withHintMatchesManyFiles", + AvroIO.parseGenericRecords(new ParseGenericClass()) + .from(outputFile.getAbsolutePath()) + .withCoder(AvroCoder.of(GenericClass.class)) + .withHintMatchesManyFiles())) + .containsInAnyOrder(values); + PAssert.that( + path.apply( + "ParseAll", + AvroIO.parseAllGenericRecords(new ParseGenericClass()) + .withCoder(AvroCoder.of(GenericClass.class)) + .withDesiredBundleSizeBytes(10))) .containsInAnyOrder(values); readPipeline.run(); @@ -200,7 +232,7 @@ public class AvroIOTest { .withNumShards(3)); writePipeline.run().waitUntilFinish(); - // Test both read() and readAll() + // Test read(), readAll(), and parseAllGenericRecords(). 
PAssert.that( readPipeline.apply( "Read first", @@ -213,15 +245,22 @@ public class AvroIOTest { AvroIO.read(GenericClass.class) .from(tmpFolder.getRoot().getAbsolutePath() + "/second*"))) .containsInAnyOrder(secondValues); + PCollection<String> paths = + readPipeline.apply( + "Create paths", + Create.of( + tmpFolder.getRoot().getAbsolutePath() + "/first*", + tmpFolder.getRoot().getAbsolutePath() + "/second*")); + PAssert.that( + paths.apply( + "Read all", AvroIO.readAll(GenericClass.class).withDesiredBundleSizeBytes(10))) + .containsInAnyOrder(Iterables.concat(firstValues, secondValues)); PAssert.that( - readPipeline - .apply( - "Create paths", - Create.of( - tmpFolder.getRoot().getAbsolutePath() + "/first*", - tmpFolder.getRoot().getAbsolutePath() + "/second*")) - .apply( - "Read all", AvroIO.readAll(GenericClass.class).withDesiredBundleSizeBytes(10))) + paths.apply( + "Parse all", + AvroIO.parseAllGenericRecords(new ParseGenericClass()) + .withCoder(AvroCoder.of(GenericClass.class)) + .withDesiredBundleSizeBytes(10))) .containsInAnyOrder(Iterables.concat(firstValues, secondValues)); readPipeline.run(); http://git-wip-us.apache.org/repos/asf/beam/blob/ebd00411/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroSourceTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroSourceTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroSourceTest.java index bf2ac95..714e029 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroSourceTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroSourceTest.java @@ -59,6 +59,7 @@ import org.apache.beam.sdk.io.fs.MatchResult.Metadata; import org.apache.beam.sdk.options.PipelineOptions; import org.apache.beam.sdk.options.PipelineOptionsFactory; import org.apache.beam.sdk.testing.SourceTestUtils; +import org.apache.beam.sdk.transforms.SerializableFunction; import org.apache.beam.sdk.transforms.display.DisplayData; import org.apache.beam.sdk.util.SerializableUtils; import org.hamcrest.Matchers; @@ -407,11 +408,6 @@ public class AvroSourceTest { source = AvroSource.from(filename).withSchema(schemaString); records = SourceTestUtils.readFromSource(source, null); assertEqualsWithGeneric(expected, records); - - // Create a source with no schema - source = AvroSource.from(filename); - records = SourceTestUtils.readFromSource(source, null); - assertEqualsWithGeneric(expected, records); } @Test @@ -449,6 +445,30 @@ public class AvroSourceTest { assertSame(sourceA.getReaderSchemaString(), sourceC.getReaderSchemaString()); } + @Test + public void testParseFn() throws Exception { + List<Bird> expected = createRandomRecords(100); + String filename = generateTestFile("tmp.avro", expected, SyncBehavior.SYNC_DEFAULT, 0, + AvroCoder.of(Bird.class), DataFileConstants.NULL_CODEC); + + AvroSource<Bird> source = + AvroSource.from(filename) + .withParseFn( + new SerializableFunction<GenericRecord, Bird>() { + @Override + public Bird apply(GenericRecord input) { + return new Bird( + (long) input.get("number"), + input.get("species").toString(), + input.get("quality").toString(), + (long) input.get("quantity")); + } + }, + AvroCoder.of(Bird.class)); + List<Bird> actual = SourceTestUtils.readFromSource(source, null); + assertThat(actual, containsInAnyOrder(expected.toArray())); + } + private void assertEqualsWithGeneric(List<Bird> expected, List<GenericRecord> actual) { assertEquals(expected.size(), actual.size()); for (int i = 0; i < 
expected.size(); i++) { http://git-wip-us.apache.org/repos/asf/beam/blob/ebd00411/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQuerySourceBase.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQuerySourceBase.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQuerySourceBase.java index 2b1eafe..6c118a0 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQuerySourceBase.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQuerySourceBase.java @@ -183,8 +183,8 @@ abstract class BigQuerySourceBase extends BoundedSource<TableRow> { List<BoundedSource<TableRow>> avroSources = Lists.newArrayList(); for (ResourceId file : files) { - avroSources.add(new TransformingSource<>( - AvroSource.from(file.toString()), function, getDefaultOutputCoder())); + avroSources.add( + AvroSource.from(file.toString()).withParseFn(function, getDefaultOutputCoder())); } return ImmutableList.copyOf(avroSources); }
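----------------------------------------------------------------------

A minimal usage sketch of the parse API added by this commit, assuming a hypothetical Foo POJO and Avro fields "id" and "name" (the filepattern is likewise illustrative). The coder for Foo is supplied explicitly via withCoder(); alternatively, Parse attempts to infer it from the parse function's output type.

import java.io.Serializable;
import org.apache.avro.generic.GenericRecord;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.coders.SerializableCoder;
import org.apache.beam.sdk.io.AvroIO;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.SerializableFunction;
import org.apache.beam.sdk.values.PCollection;

public class AvroParseExample {
  // Hypothetical output type of the parse function.
  static class Foo implements Serializable {
    final long id;
    final String name;

    Foo(long id, String name) {
      this.id = id;
      this.name = name;
    }
  }

  public static void main(String[] args) {
    Pipeline p = Pipeline.create(PipelineOptionsFactory.fromArgs(args).create());

    // Read Avro files whose schema is unknown at pipeline construction time,
    // converting each GenericRecord to a Foo. The writer schema of each record
    // is available via record.getSchema() if needed. The field names "id" and
    // "name" are illustrative only.
    PCollection<Foo> records =
        p.apply(
            AvroIO.parseGenericRecords(
                    new SerializableFunction<GenericRecord, Foo>() {
                      @Override
                      public Foo apply(GenericRecord record) {
                        return new Foo(
                            (long) record.get("id"), record.get("name").toString());
                      }
                    })
                .withCoder(SerializableCoder.of(Foo.class))
                .from("gs://my_bucket/path/to/records-*.avro"));

    p.run().waitUntilFinish();
  }
}

The same parse function can also be used at the source level via AvroSource.from(filepattern).withParseFn(parseFn, coder), as exercised by testParseFn in AvroSourceTest; when the filepatterns themselves arrive in a PCollection, AvroIO.parseAllGenericRecords(parseFn) applies the parse function to each matched file.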