I wanted to send a quick note to the community about the current status of
schema-aware PCollections in Beam. As some might remember we had a good
discussion last year about the design of these schemas, involving many
folks from different parts of the community. I sent a summary earlier this
year explaining how schemas has been integrated into the DoFn framework.
Much has happened since then, and here are some of the highlights.

First, I want to emphasize that all the schema-aware classes are currently
marked @Experimental. Nothing is set in stone yet, so if you have questions
about any decisions made, please start a discussion!

SQL

The first big milestone for schemas was porting all of BeamSQL to use the
framework, which was done in pr/5956. This was a lot of work, exposed many
bugs in the schema implementation, but now provides great evidence that
schemas work!

Schema inference

Beam can automatically infer schemas from Java POJOs (objects with public
fields) or JavaBean objects (objects with getter/setter methods). Often you
can do this by simply annotating the class. For example:

@DefaultSchema(JavaFieldSchema.class)

public class UserEvent {

 public String userId;

 public LatLong location;

 Public String countryCode;

 public long transactionCost;

 public double transactionDuration;

 public List<String> traceMessages;

};

@DefaultSchema(JavaFieldSchema.class)

public class LatLong {

 public double latitude;

 public double longitude;

}

Beam will automatically infer schemas for these classes! So if you have a
PCollection<UserEvent>, it will automatically get the following schema:

UserEvent:

 userId: STRING

 location: ROW(LatLong)

 countryCode: STRING

 transactionCost: INT64

 transactionDuration: DOUBLE

 traceMessages: ARRAY[STRING]]


LatLong:

 latitude: DOUBLE

 longitude: DOUBLE

Now it’s not always possible to annotate the class like this (you may not
own the class definition), so you can also explicitly register this using
Pipeline:getSchemaRegistry:registerPOJO, and the same for JavaBeans.

Coders

Beam has a built-in coder for any schema-aware PCollection, largely
removing the need for users to care about coders. We generate low-level
bytecode (using ByteBuddy) to implement the coder for each schema, so these
coders are quite performant. This provides a better default coder for Java
POJO objects as well. In the past users were recommended to use AvroCoder
for pojos, which many have found inefficient. Now there’s a more-efficient
solution.

Utility Transforms

Schemas are already useful for implementers of extensions such as SQL, but
the goal was to use them to make Beam itself easier to use. To this end,
I’ve been implementing a library of transforms that allow for easy
manipulation of schema PCollections. So far Filter and Select are merged,
Group is about to go out for review (it needs some more javadoc and unit
tests), and Join is being developed but doesn’t yet have a final interface.

Filter

Given a PCollection<LatLong>, I want to keep only those in an area of
southern manhattan. Well this is easy!

PCollection<LatLong> manhattanEvents = allEvents.apply(Filter

 .whereFieldName("latitude", lat -> lat < 40.720 && lat > 40.699)

 .whereFieldName("longitude", long -> long < -73.969 && long > -74.747));

Schemas along with lambdas allows us to write this transform declaratively.
The Filter transform also allows you to register filter functions that
operate on multiple fields at the same time.

Select

Let’s say that I don’t need all the fields in a row. For instance, I’m only
interested in the userId and traceMessages, and don’t care about the
location. In that case I can write the following:

PCollection<Row> selected = allEvents.apply(Select.fieldNames(“userId”, “
traceMessages”));


BTW, Beam also keeps track of which fields are accessed by a transform In
the future we can automatically insert Selects in front of subgraphs to
drop fields that are not referenced in that subgraph.

Group

Group is one of the more advanced transforms. In its most basic form, it
provides a convenient way to group by key:

PCollection<KV<Row, Iterable<UserEvent>> byUserAndCountry =

   allEvents.apply(Group.byFieldNames(“userId”, “countryCode”));

Notice how much more concise this is than using GroupByKey directly!

The Group transform really starts to shine however when you start
specifying aggregations. You can aggregate any field (or fields) and build
up an output schema based on these aggregations. For example:

PCollection<KV<Row, Row>> aggregated = allEvents.apply(

   Group.byFieldNames(“userId”, “countryCode”)

       .aggregateField("cost", Sum.ofLongs(), "total_cost")

       .aggregateField("cost", Top.<Long>largestFn(10), “top_purchases”)

       .aggregateField("transationDuration", ApproximateQuantilesCombineFn.
create(21),

             “durationHistogram”)));

This will individually aggregate the specified fields of the input items
(by user and country), and generate an output schema for these
aggregations. In this case, the output schema will be the following:

AggregatedSchema:

   total_cost: INT64

   top_purchases: ARRAY[INT64]

   durationHistogram: ARRAY[DOUBLE]

There are some more utility transforms I've written that are worth looking
at such as Convert (which can convert between user types that share a
schema) and Unnest (flattens nested schemas). There are also some others
such as Pivot that we should consider writing

There is still a lot to do. All the todo items are reflected in JIRA,
however here are some examples of current gaps:


   -

   Support for read-only POJOs (those with final fields) and JavaBean
   (objects without setters).
   -

   Automatic schema inference from more Java types: protocol buffers, avro,
   AutoValue, etc.
   -

   Integration with sources (BigQueryIO, JdbcIO, AvroIO, etc.)
   -

   Support for JsonPath expressions so users can better express nested
   fields. E.g. support expressions of the form Select.fields(“field1.field2”,
   “field3.*”, “field4[0].field5”);
   -

   Schemas still need to be defined in our portability layer so they can be
   used cross language.


If anyone is interested in helping close these gaps, you'll be helping make
Beam a better, more-usable system!

Reuven

Reply via email to