Thanks Reuven for the excellent summary, and thanks to everyone who
worked on the Schema/SQL improvements. This is great for usability. I
really like the idea of making the user experience simpler, e.g. by
automatically inferring Coders. Some questions:

- Any plans to add similar improvements to the Python/Go SDKs?

- I suppose that deconstructing the element objects based on
annotations happens in flight. Have you thought about a way to
eventually push this down into the previous transform (e.g. the IO) via
some sort of predicate push-down? (I suppose this would also apply to
JsonPath filters, though it would be more complex.)

- Do you think it makes sense to have ways for IOs to provide
transforms to convert from/to Rows? I remember there was some work on
this for SQL. I am wondering how we can make the Schema/SQL
experience even more friendly.

- What is the current intermediate representation? If I remember
correctly, Coders were in part a way of working around the determinism
issues of Java serialization. So if we use Java serialization
(generated via ByteBuddy), wouldn’t we have similar issues?

- Have you envisioned other ways to serialize apart from the generation
via ByteBuddy? E.g. to make Row compatible with formats supported in
multiple languages, such as protobuf, or some Arrow-like thing that we can
just submit without reserialization and that can be decoded back (this
would be great for portability).

- Any discussions on Row ↔ JSON conversion? It sounds like a trivial and
common case too.
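
To make that last question concrete, here is a rough sketch of the kind
of conversion I mean. It is plain Java with no Beam dependency: a
LinkedHashMap stands in for a Row (nested rows are nested maps, array
fields are Lists), and RowJsonSketch, toJson and quote are names I made
up for illustration, not existing Beam APIs. It only covers the
Row to JSON direction.

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.StringJoiner;

// Hypothetical sketch: a LinkedHashMap stands in for a Beam Row so that
// field order is preserved. This is NOT the Beam Row API.
final class RowJsonSketch {

  // Serializes a row-like value to JSON. Supported values: null, String,
  // Number, Boolean, List (array field) and Map (nested row).
  // Note: NaN and infinite doubles are not valid JSON and are not handled.
  static String toJson(Object value) {
    if (value == null) {
      return "null";
    }
    if (value instanceof String) {
      return quote((String) value);
    }
    if (value instanceof Number || value instanceof Boolean) {
      return value.toString();
    }
    if (value instanceof List) {
      StringJoiner array = new StringJoiner(",", "[", "]");
      for (Object element : (List<?>) value) {
        array.add(toJson(element));
      }
      return array.toString();
    }
    if (value instanceof Map) {
      StringJoiner object = new StringJoiner(",", "{", "}");
      for (Map.Entry<?, ?> field : ((Map<?, ?>) value).entrySet()) {
        object.add(quote(String.valueOf(field.getKey())) + ":" + toJson(field.getValue()));
      }
      return object.toString();
    }
    throw new IllegalArgumentException("Unsupported field type: " + value.getClass());
  }

  // Wraps a string in double quotes and escapes the characters that JSON
  // requires to be escaped inside string literals.
  static String quote(String s) {
    StringBuilder out = new StringBuilder("\"");
    for (char c : s.toCharArray()) {
      switch (c) {
        case '"': out.append("\\\""); break;
        case '\\': out.append("\\\\"); break;
        case '\n': out.append("\\n"); break;
        case '\r': out.append("\\r"); break;
        case '\t': out.append("\\t"); break;
        default:
          if (c < 0x20) {
            out.append(String.format("\\u%04x", (int) c));
          } else {
            out.append(c);
          }
      }
    }
    return out.append('"').toString();
  }

  public static void main(String[] args) {
    // Mirrors a few fields of the UserEvent / LatLong example from the summary.
    Map<String, Object> location = new LinkedHashMap<>();
    location.put("latitude", 40.7);
    location.put("longitude", -74.0);

    Map<String, Object> event = new LinkedHashMap<>();
    event.put("userId", "user-1");
    event.put("location", location);
    event.put("traceMessages", Arrays.asList("start", "done"));

    System.out.println(toJson(event));
  }
}
```

Going the other way (JSON to Row) would presumably also need the schema,
since JSON numbers alone do not say whether a field is INT64 or DOUBLE,
which is part of why I think a shared utility would be worth having.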

Regards,
Ismael
On Thu, Aug 30, 2018 at 4:51 PM Reuven Lax <[email protected]> wrote:
>
> Andrew - the @Experimental tag simply means that we are free to change the 
> interfaces without waiting for the next major Beam version. Once we are happy 
> to freeze these interfaces, we can drop the tag.
>
> On Wed, Aug 29, 2018 at 1:48 PM Andrew Pilloud <[email protected]> wrote:
>>
>> The work you've done to generalize and expand Schemas has significantly
>> simplified what we need to do for SQL. I hope they are valuable to everyone
>> else too. What work remains before we can drop the Experimental designation?
>>
>> Andrew
>>
>> On Wed, Aug 29, 2018 at 5:31 AM Eugene Kirpichov <[email protected]> 
>> wrote:
>>>
>>> Wow, this is really coming together, congratulations and thanks for the 
>>> great work!
>>>
>>> On Wed, Aug 29, 2018 at 1:40 AM Reuven Lax <[email protected]> wrote:
>>>>
>>>> I wanted to send a quick note to the community about the current status of 
>>>> schema-aware PCollections in Beam. As some might remember we had a good 
>>>> discussion last year about the design of these schemas, involving many 
>>>> folks from different parts of the community. I sent a summary earlier this 
>>>> year explaining how schemas have been integrated into the DoFn framework.
>>>> Much has happened since then, and here are some of the highlights.
>>>>
>>>>
>>>> First, I want to emphasize that all the schema-aware classes are currently 
>>>> marked @Experimental. Nothing is set in stone yet, so if you have 
>>>> questions about any decisions made, please start a discussion!
>>>>
>>>>
>>>> SQL
>>>>
>>>> The first big milestone for schemas was porting all of BeamSQL to use the 
>>>> framework, which was done in pr/5956. This was a lot of work, exposed many 
>>>> bugs in the schema implementation, but now provides great evidence that 
>>>> schemas work!
>>>>
>>>>
>>>> Schema inference
>>>>
>>>> Beam can automatically infer schemas from Java POJOs (objects with public 
>>>> fields) or JavaBean objects (objects with getter/setter methods). Often 
>>>> you can do this by simply annotating the class. For example:
>>>>
>>>>
>>>> @DefaultSchema(JavaFieldSchema.class)
>>>>
>>>> public class UserEvent {
>>>>
>>>>  public String userId;
>>>>
>>>>  public LatLong location;
>>>>
>>>>  public String countryCode;
>>>>
>>>>  public long transactionCost;
>>>>
>>>>  public double transactionDuration;
>>>>
>>>>  public List<String> traceMessages;
>>>>
>>>> }
>>>>
>>>>
>>>> @DefaultSchema(JavaFieldSchema.class)
>>>>
>>>> public class LatLong {
>>>>
>>>>  public double latitude;
>>>>
>>>>  public double longitude;
>>>>
>>>> }
>>>>
>>>>
>>>> Beam will automatically infer schemas for these classes! So if you have a 
>>>> PCollection<UserEvent>, it will automatically get the following schema:
>>>>
>>>>
>>>> UserEvent:
>>>>
>>>>  userId: STRING
>>>>
>>>>  location: ROW(LatLong)
>>>>
>>>>  countryCode: STRING
>>>>
>>>>  transactionCost: INT64
>>>>
>>>>  transactionDuration: DOUBLE
>>>>
>>>>  traceMessages: ARRAY[STRING]
>>>>
>>>>
>>>> LatLong:
>>>>
>>>>  latitude: DOUBLE
>>>>
>>>>  longitude: DOUBLE
>>>>
>>>>
>>>> Now it’s not always possible to annotate the class like this (you may not 
>>>> own the class definition), so you can also explicitly register this using 
>>>> Pipeline:getSchemaRegistry:registerPOJO, and the same for JavaBeans.
>>>>
>>>>
>>>> Coders
>>>>
>>>> Beam has a built-in coder for any schema-aware PCollection, largely 
>>>> removing the need for users to care about coders. We generate low-level 
>>>> bytecode (using ByteBuddy) to implement the coder for each schema, so 
>>>> these coders are quite performant. This provides a better default coder 
>>>> for Java POJO objects as well. In the past users were recommended to use 
>>>> AvroCoder for POJOs, which many have found inefficient. Now there’s a
>>>> more-efficient solution.
>>>>
>>>>
>>>> Utility Transforms
>>>>
>>>> Schemas are already useful for implementers of extensions such as SQL, but 
>>>> the goal was to use them to make Beam itself easier to use. To this end, 
>>>> I’ve been implementing a library of transforms that allow for easy 
>>>> manipulation of schema PCollections. So far Filter and Select are merged, 
>>>> Group is about to go out for review (it needs some more javadoc and unit 
>>>> tests), and Join is being developed but doesn’t yet have a final interface.
>>>>
>>>>
>>>> Filter
>>>>
>>>> Given a PCollection<LatLong>, I want to keep only those in an area of 
>>>> southern Manhattan. Well, this is easy!
>>>>
>>>>
>>>> PCollection<LatLong> manhattanEvents = allEvents.apply(Filter
>>>>
>>>>  .whereFieldName("latitude", lat -> lat < 40.720 && lat > 40.699)
>>>>
>>>>  .whereFieldName("longitude", lng -> lng < -73.969 && lng > -74.747));
>>>>
>>>>
>>>> Schemas along with lambdas allow us to write this transform
>>>> declaratively. The Filter transform also allows you to register filter 
>>>> functions that operate on multiple fields at the same time.
>>>>
>>>>
>>>> Select
>>>>
>>>> Let’s say that I don’t need all the fields in a row. For instance, I’m 
>>>> only interested in the userId and traceMessages, and don’t care about the 
>>>> location. In that case I can write the following:
>>>>
>>>>
>>>> PCollection<Row> selected = allEvents.apply(Select.fieldNames("userId",
>>>> "traceMessages"));
>>>>
>>>>
>>>> BTW, Beam also keeps track of which fields are accessed by a transform. In
>>>> the future we can automatically insert Selects in front of subgraphs to 
>>>> drop fields that are not referenced in that subgraph.
>>>>
>>>>
>>>> Group
>>>>
>>>> Group is one of the more advanced transforms. In its most basic form, it 
>>>> provides a convenient way to group by key:
>>>>
>>>>
>>>> PCollection<KV<Row, Iterable<UserEvent>>> byUserAndCountry =
>>>>
>>>>    allEvents.apply(Group.byFieldNames("userId", "countryCode"));
>>>>
>>>>
>>>> Notice how much more concise this is than using GroupByKey directly!
>>>>
>>>>
>>>> The Group transform really starts to shine however when you start 
>>>> specifying aggregations. You can aggregate any field (or fields) and build 
>>>> up an output schema based on these aggregations. For example:
>>>>
>>>>
>>>> PCollection<KV<Row, Row>> aggregated = allEvents.apply(
>>>>
>>>>    Group.byFieldNames("userId", "countryCode")
>>>>
>>>>        .aggregateField("transactionCost", Sum.ofLongs(), "total_cost")
>>>>
>>>>        .aggregateField("transactionCost", Top.<Long>largestFn(10), "top_purchases")
>>>>
>>>>        .aggregateField("transactionDuration", ApproximateQuantilesCombineFn.create(21),
>>>>
>>>>              "durationHistogram"));
>>>>
>>>>
>>>> This will individually aggregate the specified fields of the input items 
>>>> (by user and country), and generate an output schema for these 
>>>> aggregations. In this case, the output schema will be the following:
>>>>
>>>>
>>>> AggregatedSchema:
>>>>
>>>>    total_cost: INT64
>>>>
>>>>    top_purchases: ARRAY[INT64]
>>>>
>>>>    durationHistogram: ARRAY[DOUBLE]
>>>>
>>>>
>>>> There are some more utility transforms I've written that are worth looking 
>>>> at such as Convert (which can convert between user types that share a 
>>>> schema) and Unnest (flattens nested schemas). There are also some others 
>>>> such as Pivot that we should consider writing.
>>>>
>>>>
>>>> There is still a lot to do. All the todo items are reflected in JIRA, 
>>>> however here are some examples of current gaps:
>>>>
>>>>
>>>> Support for read-only POJOs (those with final fields) and JavaBeans
>>>> (objects without setters).
>>>>
>>>> Automatic schema inference from more Java types: protocol buffers, avro, 
>>>> AutoValue, etc.
>>>>
>>>> Integration with sources (BigQueryIO, JdbcIO, AvroIO, etc.)
>>>>
>>>> Support for JsonPath expressions so users can better express nested 
>>>> fields. E.g. support expressions of the form 
>>>> Select.fields("field1.field2", "field3.*", "field4[0].field5");
>>>>
>>>> Schemas still need to be defined in our portability layer so they can be 
>>>> used cross language.
>>>>
>>>>
>>>> If anyone is interested in helping close these gaps, you'll be helping 
>>>> make Beam a better, more-usable system!
>>>>
>>>> Reuven
>>>>
