The work you've done to generalize and expand Schemas has significantly simplified what we need to do for SQL; I hope schemas prove just as valuable to everyone else. What work remains before we can drop the Experimental designation?
Andrew

On Wed, Aug 29, 2018 at 5:31 AM Eugene Kirpichov <[email protected]> wrote:
> Wow, this is really coming together. Congratulations, and thanks for the
> great work!
>
> On Wed, Aug 29, 2018 at 1:40 AM Reuven Lax <[email protected]> wrote:
>
>> I wanted to send a quick note to the community about the current status
>> of schema-aware PCollections in Beam. As some might remember, we had a
>> good discussion last year about the design of these schemas, involving
>> many folks from different parts of the community. I sent a summary
>> earlier this year explaining how schemas have been integrated into the
>> DoFn framework. Much has happened since then, and here are some of the
>> highlights.
>>
>> First, I want to emphasize that all the schema-aware classes are
>> currently marked @Experimental. Nothing is set in stone yet, so if you
>> have questions about any decisions made, please start a discussion!
>>
>> SQL
>>
>> The first big milestone for schemas was porting all of BeamSQL to use
>> the framework, which was done in pr/5956. This was a lot of work and
>> exposed many bugs in the schema implementation, but it now provides
>> great evidence that schemas work!
>>
>> Schema inference
>>
>> Beam can automatically infer schemas from Java POJOs (objects with
>> public fields) or JavaBean objects (objects with getter/setter methods).
>> Often you can do this by simply annotating the class. For example:
>>
>> @DefaultSchema(JavaFieldSchema.class)
>> public class UserEvent {
>>   public String userId;
>>   public LatLong location;
>>   public String countryCode;
>>   public long transactionCost;
>>   public double transactionDuration;
>>   public List<String> traceMessages;
>> }
>>
>> @DefaultSchema(JavaFieldSchema.class)
>> public class LatLong {
>>   public double latitude;
>>   public double longitude;
>> }
>>
>> Beam will automatically infer schemas for these classes! So if you have
>> a PCollection<UserEvent>, it will automatically get the following schema:
>>
>> UserEvent:
>>   userId: STRING
>>   location: ROW(LatLong)
>>   countryCode: STRING
>>   transactionCost: INT64
>>   transactionDuration: DOUBLE
>>   traceMessages: ARRAY[STRING]
>>
>> LatLong:
>>   latitude: DOUBLE
>>   longitude: DOUBLE
>>
>> Now it's not always possible to annotate the class like this (you may
>> not own the class definition), so you can also explicitly register the
>> class using Pipeline.getSchemaRegistry().registerPOJO, and the same for
>> JavaBeans.
>>
>> Coders
>>
>> Beam has a built-in coder for any schema-aware PCollection, largely
>> removing the need for users to care about coders. We generate low-level
>> bytecode (using ByteBuddy) to implement the coder for each schema, so
>> these coders are quite performant. This provides a better default coder
>> for Java POJO objects as well. In the past users were recommended to use
>> AvroCoder for POJOs, which many have found inefficient. Now there's a
>> more efficient solution.
>>
>> Utility Transforms
>>
>> Schemas are already useful for implementers of extensions such as SQL,
>> but the goal was to use them to make Beam itself easier to use. To this
>> end, I've been implementing a library of transforms that allow for easy
>> manipulation of schema PCollections. So far Filter and Select are
>> merged, Group is about to go out for review (it needs some more javadoc
>> and unit tests), and Join is being developed but doesn't yet have a
>> final interface.
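As a side note for anyone who wants to try the explicit registration
mentioned above: here is a rough, untested sketch of what it might look
like. Only getSchemaRegistry() and registerPOJO come from the description
above; the ThirdPartyEvent class, its fields, and the surrounding example
class are made up for illustration.

    import org.apache.beam.sdk.Pipeline;
    import org.apache.beam.sdk.schemas.NoSuchSchemaException;
    import org.apache.beam.sdk.schemas.Schema;

    public class RegisterSchemaExample {
      // Hypothetical POJO that we imagine comes from a library, so we
      // can't annotate it with @DefaultSchema ourselves.
      public static class ThirdPartyEvent {
        public String userId;
        public long transactionCost;
      }

      public static void main(String[] args) throws NoSuchSchemaException {
        Pipeline p = Pipeline.create();

        // Register the class so Beam infers a schema for it, just as it
        // would for a class annotated with @DefaultSchema(JavaFieldSchema.class).
        p.getSchemaRegistry().registerPOJO(ThirdPartyEvent.class);

        // The inferred schema is now available from the registry, and a
        // PCollection<ThirdPartyEvent> in this pipeline should pick up the
        // generated schema coder rather than needing AvroCoder.
        Schema schema = p.getSchemaRegistry().getSchema(ThirdPartyEvent.class);
        System.out.println(schema);
      }
    }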
>>
>> Filter
>>
>> Given a PCollection<LatLong>, I want to keep only those in an area of
>> southern Manhattan. Well, this is easy!
>>
>> PCollection<LatLong> manhattanEvents = allLocations.apply(Filter
>>     .whereFieldName("latitude", lat -> lat < 40.720 && lat > 40.699)
>>     .whereFieldName("longitude", lng -> lng < -73.969 && lng > -74.747));
>>
>> Schemas along with lambdas allow us to write this transform
>> declaratively. The Filter transform also allows you to register filter
>> functions that operate on multiple fields at the same time.
>>
>> Select
>>
>> Let's say that I don't need all the fields in a row. For instance, I'm
>> only interested in the userId and traceMessages, and don't care about
>> the location. In that case I can write the following:
>>
>> PCollection<Row> selected = allEvents.apply(
>>     Select.fieldNames("userId", "traceMessages"));
>>
>> BTW, Beam also keeps track of which fields are accessed by a transform.
>> In the future we can automatically insert Selects in front of subgraphs
>> to drop fields that are not referenced in that subgraph.
>>
>> Group
>>
>> Group is one of the more advanced transforms. In its most basic form, it
>> provides a convenient way to group by key:
>>
>> PCollection<KV<Row, Iterable<UserEvent>>> byUserAndCountry =
>>     allEvents.apply(Group.byFieldNames("userId", "countryCode"));
>>
>> Notice how much more concise this is than using GroupByKey directly!
>>
>> The Group transform really starts to shine, however, when you start
>> specifying aggregations. You can aggregate any field (or fields) and
>> build up an output schema based on these aggregations. For example:
>>
>> PCollection<KV<Row, Row>> aggregated = allEvents.apply(
>>     Group.byFieldNames("userId", "countryCode")
>>         .aggregateField("transactionCost", Sum.ofLongs(), "total_cost")
>>         .aggregateField("transactionCost", Top.<Long>largestFn(10), "top_purchases")
>>         .aggregateField("transactionDuration",
>>             ApproximateQuantilesCombineFn.create(21), "durationHistogram"));
>>
>> This will individually aggregate the specified fields of the input items
>> (grouped by user and country) and generate an output schema for these
>> aggregations. In this case, the output schema will be the following:
>>
>> AggregatedSchema:
>>   total_cost: INT64
>>   top_purchases: ARRAY[INT64]
>>   durationHistogram: ARRAY[DOUBLE]
>>
>> There are some more utility transforms I've written that are worth
>> looking at, such as Convert (which can convert between user types that
>> share a schema) and Unnest (which flattens nested schemas). There are
>> also some others, such as Pivot, that we should consider writing.
>>
>> There is still a lot to do. All the TODO items are reflected in JIRA;
>> however, here are some examples of current gaps:
>>
>>   - Support for read-only POJOs (those with final fields) and JavaBeans
>>     (objects without setters).
>>   - Automatic schema inference from more Java types: protocol buffers,
>>     Avro, AutoValue, etc.
>>   - Integration with sources (BigQueryIO, JdbcIO, AvroIO, etc.).
>>   - Support for JsonPath expressions so users can better express nested
>>     fields, e.g. expressions of the form Select.fields("field1.field2",
>>     "field3.*", "field4[0].field5").
>>   - Schemas still need to be defined in our portability layer so they
>>     can be used cross-language.
>>
>> If anyone is interested in helping close these gaps, you'll be helping
>> make Beam a better, more usable system!
>>
>> Reuven
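For those who haven't looked at Convert yet, here is a rough sketch of
what using it might look like, based only on the description above of
converting between user types that share a schema (plus the generic Row
type). The UserEventPojo class and the toRows helper are made up for
illustration, and Convert.toRows() is assumed here rather than confirmed
by the thread.

    import org.apache.beam.sdk.schemas.JavaFieldSchema;
    import org.apache.beam.sdk.schemas.annotations.DefaultSchema;
    import org.apache.beam.sdk.schemas.transforms.Convert;
    import org.apache.beam.sdk.values.PCollection;
    import org.apache.beam.sdk.values.Row;

    public class ConvertExample {
      // Hypothetical user type with an inferred schema.
      @DefaultSchema(JavaFieldSchema.class)
      public static class UserEventPojo {
        public String userId;
        public String countryCode;
      }

      // Convert the schema'd user type into generic Rows carrying the same
      // schema. Convert.to(SomeOtherType.class) would work the same way
      // when the target type's inferred schema matches the input's schema.
      static PCollection<Row> toRows(PCollection<UserEventPojo> events) {
        return events.apply(Convert.toRows());
      }
    }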
