The work you've done to generalize and expand Schemas has significantly
simplified what we need to do for SQL, I hope they are valuable to everyone
else too. What work remains before we can drop the Experimental designation?

Andrew

On Wed, Aug 29, 2018 at 5:31 AM Eugene Kirpichov <[email protected]>
wrote:

> Wow, this is really coming together, congratulations and thanks for the
> great work!
>
> On Wed, Aug 29, 2018 at 1:40 AM Reuven Lax <[email protected]> wrote:
>
>> I wanted to send a quick note to the community about the current status
>> of schema-aware PCollections in Beam. As some might remember we had a good
>> discussion last year about the design of these schemas, involving many
>> folks from different parts of the community. I sent a summary earlier this
>> year explaining how schemas has been integrated into the DoFn framework.
>> Much has happened since then, and here are some of the highlights.
>>
>> First, I want to emphasize that all the schema-aware classes are
>> currently marked @Experimental. Nothing is set in stone yet, so if you have
>> questions about any decisions made, please start a discussion!
>>
>> SQL
>>
>> The first big milestone for schemas was porting all of BeamSQL to use the
>> framework, which was done in pr/5956. This was a lot of work, exposed many
>> bugs in the schema implementation, but now provides great evidence that
>> schemas work!
>>
>> Schema inference
>>
>> Beam can automatically infer schemas from Java POJOs (objects with public
>> fields) or JavaBean objects (objects with getter/setter methods). Often you
>> can do this by simply annotating the class. For example:
>>
>> @DefaultSchema(JavaFieldSchema.class)
>>
>> public class UserEvent {
>>
>>  public String userId;
>>
>>  public LatLong location;
>>
>>  Public String countryCode;
>>
>>  public long transactionCost;
>>
>>  public double transactionDuration;
>>
>>  public List<String> traceMessages;
>>
>> };
>>
>> @DefaultSchema(JavaFieldSchema.class)
>>
>> public class LatLong {
>>
>>  public double latitude;
>>
>>  public double longitude;
>>
>> }
>>
>> Beam will automatically infer schemas for these classes! So if you have a
>> PCollection<UserEvent>, it will automatically get the following schema:
>>
>> UserEvent:
>>
>>  userId: STRING
>>
>>  location: ROW(LatLong)
>>
>>  countryCode: STRING
>>
>>  transactionCost: INT64
>>
>>  transactionDuration: DOUBLE
>>
>>  traceMessages: ARRAY[STRING]]
>>
>>
>> LatLong:
>>
>>  latitude: DOUBLE
>>
>>  longitude: DOUBLE
>>
>> Now it’s not always possible to annotate the class like this (you may not
>> own the class definition), so you can also explicitly register this using
>> Pipeline:getSchemaRegistry:registerPOJO, and the same for JavaBeans.
>>
>> Coders
>>
>> Beam has a built-in coder for any schema-aware PCollection, largely
>> removing the need for users to care about coders. We generate low-level
>> bytecode (using ByteBuddy) to implement the coder for each schema, so these
>> coders are quite performant. This provides a better default coder for Java
>> POJO objects as well. In the past users were recommended to use AvroCoder
>> for pojos, which many have found inefficient. Now there’s a more-efficient
>> solution.
>>
>> Utility Transforms
>>
>> Schemas are already useful for implementers of extensions such as SQL,
>> but the goal was to use them to make Beam itself easier to use. To this
>> end, I’ve been implementing a library of transforms that allow for easy
>> manipulation of schema PCollections. So far Filter and Select are merged,
>> Group is about to go out for review (it needs some more javadoc and unit
>> tests), and Join is being developed but doesn’t yet have a final interface.
>>
>> Filter
>>
>> Given a PCollection<LatLong>, I want to keep only those in an area of
>> southern manhattan. Well this is easy!
>>
>> PCollection<LatLong> manhattanEvents = allEvents.apply(Filter
>>
>>  .whereFieldName("latitude", lat -> lat < 40.720 && lat > 40.699)
>>
>>  .whereFieldName("longitude", long -> long < -73.969 && long > -74.747));
>>
>> Schemas along with lambdas allows us to write this transform
>> declaratively. The Filter transform also allows you to register filter
>> functions that operate on multiple fields at the same time.
>>
>> Select
>>
>> Let’s say that I don’t need all the fields in a row. For instance, I’m
>> only interested in the userId and traceMessages, and don’t care about the
>> location. In that case I can write the following:
>>
>> PCollection<Row> selected = allEvents.apply(Select.fieldNames(“userId”, “
>> traceMessages”));
>>
>>
>> BTW, Beam also keeps track of which fields are accessed by a transform In
>> the future we can automatically insert Selects in front of subgraphs to
>> drop fields that are not referenced in that subgraph.
>>
>> Group
>>
>> Group is one of the more advanced transforms. In its most basic form, it
>> provides a convenient way to group by key:
>>
>> PCollection<KV<Row, Iterable<UserEvent>> byUserAndCountry =
>>
>>    allEvents.apply(Group.byFieldNames(“userId”, “countryCode”));
>>
>> Notice how much more concise this is than using GroupByKey directly!
>>
>> The Group transform really starts to shine however when you start
>> specifying aggregations. You can aggregate any field (or fields) and build
>> up an output schema based on these aggregations. For example:
>>
>> PCollection<KV<Row, Row>> aggregated = allEvents.apply(
>>
>>    Group.byFieldNames(“userId”, “countryCode”)
>>
>>        .aggregateField("cost", Sum.ofLongs(), "total_cost")
>>
>>        .aggregateField("cost", Top.<Long>largestFn(10), “top_purchases”)
>>
>>        .aggregateField("transationDuration",
>> ApproximateQuantilesCombineFn.create(21),
>>
>>              “durationHistogram”)));
>>
>> This will individually aggregate the specified fields of the input items
>> (by user and country), and generate an output schema for these
>> aggregations. In this case, the output schema will be the following:
>>
>> AggregatedSchema:
>>
>>    total_cost: INT64
>>
>>    top_purchases: ARRAY[INT64]
>>
>>    durationHistogram: ARRAY[DOUBLE]
>>
>> There are some more utility transforms I've written that are worth
>> looking at such as Convert (which can convert between user types that share
>> a schema) and Unnest (flattens nested schemas). There are also some others
>> such as Pivot that we should consider writing
>>
>> There is still a lot to do. All the todo items are reflected in JIRA,
>> however here are some examples of current gaps:
>>
>>
>>    -
>>
>>    Support for read-only POJOs (those with final fields) and JavaBean
>>    (objects without setters).
>>    -
>>
>>    Automatic schema inference from more Java types: protocol buffers,
>>    avro, AutoValue, etc.
>>    -
>>
>>    Integration with sources (BigQueryIO, JdbcIO, AvroIO, etc.)
>>    -
>>
>>    Support for JsonPath expressions so users can better express nested
>>    fields. E.g. support expressions of the form 
>> Select.fields(“field1.field2”,
>>    “field3.*”, “field4[0].field5”);
>>    -
>>
>>    Schemas still need to be defined in our portability layer so they can
>>    be used cross language.
>>
>>
>> If anyone is interested in helping close these gaps, you'll be helping
>> make Beam a better, more-usable system!
>>
>> Reuven
>>
>>

Reply via email to