Thanks Reuven for the excellent summary, and thanks to everyone who worked on the Schema/SQL improvements. This is great for usability. I really like the idea of making the user experience simpler, e.g. by automatically inferring Coders. Some questions:
- Any plans to add similar improvements for the Python/Go SDKs?
- I suppose that deconstructing the element objects based on annotations happens in flight. Have you thought about a way to eventually push this into the previous transform (e.g. the IO) via some sort of push-down predicate? (I suppose this applies to jsonpath filters too, but that should be more complex.)
- Do you think it makes sense to have ways for IOs to provide transforms to convert from/to Rows? I remember there was some work on this for the SQL. I am wondering how we can make the Schema/SQL experience even more friendly.
- What is the current intermediate representation? If I remember well, weren't Coders in part a way of hacking around the issues of determinism in Java serialization? So if we use Java serialization (generated via ByteBuddy), wouldn't we have similar issues?
- Have you envisioned other ways to serialize apart from the generation via ByteBuddy? E.g. to make Row compatible with formats supported in multiple languages, e.g. protobuf, or some Arrow-like thing that we can just submit without reserialization and can be decoded back (this would be great for portability).
- Any discussions on Row ↔ JSON conversion? Sounds like a trivial / common case too.

Regards,
Ismael

On Thu, Aug 30, 2018 at 4:51 PM Reuven Lax <[email protected]> wrote:
>
> Andrew - the @Experimental tag simply means that we are free to change the interfaces without waiting for the next major Beam version. Once we are happy to freeze these interfaces, we can drop the tag.
>
> On Wed, Aug 29, 2018 at 1:48 PM Andrew Pilloud <[email protected]> wrote:
>>
>> The work you've done to generalize and expand Schemas has significantly simplified what we need to do for SQL; I hope they are valuable to everyone else too. What work remains before we can drop the Experimental designation?
>>
>> Andrew
>>
>> On Wed, Aug 29, 2018 at 5:31 AM Eugene Kirpichov <[email protected]> wrote:
>>>
>>> Wow, this is really coming together, congratulations and thanks for the great work!
>>>
>>> On Wed, Aug 29, 2018 at 1:40 AM Reuven Lax <[email protected]> wrote:
>>>>
>>>> I wanted to send a quick note to the community about the current status of schema-aware PCollections in Beam. As some might remember, we had a good discussion last year about the design of these schemas, involving many folks from different parts of the community. I sent a summary earlier this year explaining how schemas have been integrated into the DoFn framework. Much has happened since then, and here are some of the highlights.
>>>>
>>>> First, I want to emphasize that all the schema-aware classes are currently marked @Experimental. Nothing is set in stone yet, so if you have questions about any decisions made, please start a discussion!
>>>>
>>>> SQL
>>>>
>>>> The first big milestone for schemas was porting all of BeamSQL to use the framework, which was done in pr/5956. This was a lot of work and exposed many bugs in the schema implementation, but it now provides great evidence that schemas work!
>>>>
>>>> Schema inference
>>>>
>>>> Beam can automatically infer schemas from Java POJOs (objects with public fields) or JavaBean objects (objects with getter/setter methods). Often you can do this by simply annotating the class.
>>>> For example:
>>>>
>>>> @DefaultSchema(JavaFieldSchema.class)
>>>> public class UserEvent {
>>>>   public String userId;
>>>>   public LatLong location;
>>>>   public String countryCode;
>>>>   public long transactionCost;
>>>>   public double transactionDuration;
>>>>   public List<String> traceMessages;
>>>> }
>>>>
>>>> @DefaultSchema(JavaFieldSchema.class)
>>>> public class LatLong {
>>>>   public double latitude;
>>>>   public double longitude;
>>>> }
>>>>
>>>> Beam will automatically infer schemas for these classes! So if you have a PCollection<UserEvent>, it will automatically get the following schema:
>>>>
>>>> UserEvent:
>>>>   userId: STRING
>>>>   location: ROW(LatLong)
>>>>   countryCode: STRING
>>>>   transactionCost: INT64
>>>>   transactionDuration: DOUBLE
>>>>   traceMessages: ARRAY[STRING]
>>>>
>>>> LatLong:
>>>>   latitude: DOUBLE
>>>>   longitude: DOUBLE
>>>>
>>>> Now it's not always possible to annotate the class like this (you may not own the class definition), so you can also explicitly register this using Pipeline.getSchemaRegistry().registerPOJO, and the same for JavaBeans.
>>>>
>>>> Coders
>>>>
>>>> Beam has a built-in coder for any schema-aware PCollection, largely removing the need for users to care about coders. We generate low-level bytecode (using ByteBuddy) to implement the coder for each schema, so these coders are quite performant. This also provides a better default coder for Java POJO objects. In the past users were recommended to use AvroCoder for POJOs, which many have found inefficient. Now there's a more efficient solution.
>>>>
>>>> Utility Transforms
>>>>
>>>> Schemas are already useful for implementers of extensions such as SQL, but the goal was to use them to make Beam itself easier to use. To this end, I've been implementing a library of transforms that allow for easy manipulation of schema PCollections. So far Filter and Select are merged, Group is about to go out for review (it needs some more javadoc and unit tests), and Join is being developed but doesn't yet have a final interface.
>>>>
>>>> Filter
>>>>
>>>> Given a PCollection<LatLong>, I want to keep only those in an area of southern Manhattan. Well, this is easy!
>>>>
>>>> PCollection<LatLong> manhattanEvents = allEvents.apply(Filter
>>>>     .whereFieldName("latitude", lat -> lat < 40.720 && lat > 40.699)
>>>>     .whereFieldName("longitude", lng -> lng < -73.969 && lng > -74.747));
>>>>
>>>> Schemas along with lambdas allow us to write this transform declaratively. The Filter transform also allows you to register filter functions that operate on multiple fields at the same time.
>>>>
>>>> Select
>>>>
>>>> Let's say that I don't need all the fields in a row. For instance, I'm only interested in the userId and traceMessages, and don't care about the location. In that case I can write the following:
>>>>
>>>> PCollection<Row> selected = allEvents.apply(Select.fieldNames("userId", "traceMessages"));
>>>>
>>>> BTW, Beam also keeps track of which fields are accessed by a transform. In the future we can automatically insert Selects in front of subgraphs to drop fields that are not referenced in that subgraph.
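[A minimal sketch, not from the original mail, of what consuming that PCollection<Row> could look like. The ParDo and the "summaries" output are hypothetical; the Row getters are @Experimental, so details may change.]

    import org.apache.beam.sdk.transforms.DoFn;
    import org.apache.beam.sdk.transforms.ParDo;
    import org.apache.beam.sdk.values.PCollection;
    import org.apache.beam.sdk.values.Row;

    // Hypothetical consumer of the PCollection<Row> produced by Select above.
    // Each Row carries only the projected fields ("userId" and "traceMessages"),
    // which can be read back by name via the typed getters on Row.
    PCollection<String> summaries = selected.apply(ParDo.of(new DoFn<Row, String>() {
      @ProcessElement
      public void processElement(@Element Row row, OutputReceiver<String> out) {
        String userId = row.getString("userId");
        int numMessages = row.getArray("traceMessages").size();
        out.output(userId + ": " + numMessages + " trace messages");
      }
    }));
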
>>>> Group
>>>>
>>>> Group is one of the more advanced transforms. In its most basic form, it provides a convenient way to group by key:
>>>>
>>>> PCollection<KV<Row, Iterable<UserEvent>>> byUserAndCountry =
>>>>     allEvents.apply(Group.byFieldNames("userId", "countryCode"));
>>>>
>>>> Notice how much more concise this is than using GroupByKey directly!
>>>>
>>>> The Group transform really starts to shine, however, when you start specifying aggregations. You can aggregate any field (or fields) and build up an output schema based on these aggregations. For example:
>>>>
>>>> PCollection<KV<Row, Row>> aggregated = allEvents.apply(
>>>>     Group.byFieldNames("userId", "countryCode")
>>>>         .aggregateField("cost", Sum.ofLongs(), "total_cost")
>>>>         .aggregateField("cost", Top.<Long>largestFn(10), "top_purchases")
>>>>         .aggregateField("transactionDuration", ApproximateQuantilesCombineFn.create(21), "durationHistogram"));
>>>>
>>>> This will individually aggregate the specified fields of the input items (grouped by user and country), and generate an output schema for these aggregations. In this case, the output schema will be the following:
>>>>
>>>> AggregatedSchema:
>>>>   total_cost: INT64
>>>>   top_purchases: ARRAY[INT64]
>>>>   durationHistogram: ARRAY[DOUBLE]
>>>>
>>>> There are some more utility transforms I've written that are worth looking at, such as Convert (which can convert between user types that share a schema) and Unnest (which flattens nested schemas). There are also some others, such as Pivot, that we should consider writing.
>>>>
>>>> There is still a lot to do. All the todo items are reflected in JIRA, but here are some examples of current gaps:
>>>>
>>>> Support for read-only POJOs (those with final fields) and JavaBeans (objects without setters).
>>>> Automatic schema inference from more Java types: protocol buffers, Avro, AutoValue, etc.
>>>> Integration with sources (BigQueryIO, JdbcIO, AvroIO, etc.).
>>>> Support for JsonPath expressions so users can better express nested fields, e.g. expressions of the form Select.fields("field1.field2", "field3.*", "field4[0].field5");
>>>> Schemas still need to be defined in our portability layer so they can be used cross-language.
>>>>
>>>> If anyone is interested in helping close these gaps, you'll be helping make Beam a better, more usable system!
>>>>
>>>> Reuven
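[To make the Convert transform mentioned above a little more concrete, a minimal sketch, not from the original mail: the PartnerEvent class is a hypothetical second POJO that happens to share UserEvent's schema, and Convert is @Experimental, so the exact entry point may differ.]

    import java.util.List;
    import org.apache.beam.sdk.schemas.JavaFieldSchema;
    import org.apache.beam.sdk.schemas.annotations.DefaultSchema;
    import org.apache.beam.sdk.schemas.transforms.Convert;
    import org.apache.beam.sdk.values.PCollection;

    // Hypothetical second type with the same fields (and hence the same
    // inferred schema) as UserEvent from the example above.
    @DefaultSchema(JavaFieldSchema.class)
    public class PartnerEvent {
      public String userId;
      public LatLong location;
      public String countryCode;
      public long transactionCost;
      public double transactionDuration;
      public List<String> traceMessages;
    }

    // Because the two types share a schema, Beam can convert between them
    // without hand-written mapping code.
    PCollection<PartnerEvent> partnerEvents = allEvents.apply(Convert.to(PartnerEvent.class));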
