> I wanted to send a quick note to the community about the current status of 
> schema-aware PCollections in Beam. As some
> might remember we had a good discussion last year about the design of these 
> schemas, involving many folks from
> different parts of the community. I sent a summary earlier this year 
> explaining how schemas has been integrated into
> the DoFn framework. Much has happened since then, and here are some of the 
> highlights.
> First, I want to emphasize that all the schema-aware classes are currently 
> marked @Experimental. Nothing is set in
> stone yet, so if you have questions about any decisions made, please start a 
> discussion!
> SQLThe first big milestone for schemas was porting all of BeamSQL to use the 
> framework, which was done in pr/5956.
> This was a lot of work, exposed many bugs in the schema implementation, but 
> now provides great evidence that schemas
> work!
> Schema inferenceBeam can automatically infer schemas from Java POJOs (objects 
> with public fields) or JavaBean objects
> (objects with getter/setter methods). Often you can do this by simply 
> annotating the class. For example:
> @DefaultSchema(JavaFieldSchema.class)public class UserEvent {  public String 
> userId;  public LatLong location;  Public
> String countryCode;  public long transactionCost;  public double 
> transactionDuration;  public List<String>
> traceMessages;};
> @DefaultSchema(JavaFieldSchema.class)public class LatLong {  public double 
> latitude;  public double longitude;}
> Beam will automatically infer schemas for these classes! So if you have a 
> PCollection<UserEvent>, it will
> automatically get the following schema:
> UserEvent:  userId: STRING  location: ROW(LatLong)  countryCode: STRING  
> transactionCost: INT64  transactionDuration:
> DOUBLE  traceMessages: ARRAY[STRING]]
> LatLong:  latitude: DOUBLE  longitude: DOUBLE
> Now it’s not always possible to annotate the class like this (you may not own 
> the class definition), so you can also
> explicitly register this using Pipeline:getSchemaRegistry:registerPOJO, and 
> the same for JavaBeans.
> CodersBeam has a built-in coder for any schema-aware PCollection, largely 
> removing the need for users to care about
> coders. We generate low-level bytecode (using ByteBuddy) to implement the 
> coder for each schema, so these coders are
> quite performant. This provides a better default coder for Java POJO objects 
> as well. In the past users were
> recommended to use AvroCoder for pojos, which many have found inefficient. 
> Now there’s a more-efficient solution.
> Utility TransformsSchemas are already useful for implementers of extensions 
> such as SQL, but the goal was to use them
> to make Beam itself easier to use. To this end, I’ve been implementing a 
> library of transforms that allow for easy
> manipulation of schema PCollections. So far Filter and Select are merged, 
> Group is about to go out for review (it
> needs some more javadoc and unit tests), and Join is being developed but 
> doesn’t yet have a final interface.
> FilterGiven a PCollection<LatLong>, I want to keep only those in an area of 
> southern manhattan. Well this is easy!
> PCollection<LatLong> manhattanEvents = allEvents.apply(Filter  
> .whereFieldName("latitude", lat -> lat < 40.720 && lat
> > 40.699)  .whereFieldName("longitude", long -> long < -73.969 && long > 
> > -74.747));
> Schemas along with lambdas allows us to write this transform declaratively. 
> The Filter transform also allows you to
> register filter functions that operate on multiple fields at the same time.
> SelectLet’s say that I don’t need all the fields in a row. For instance, I’m 
> only interested in the userId and
> traceMessages, and don’t care about the location. In that case I can write 
> the following:
> PCollection<Row> selected = allEvents.apply(Select.fieldNames(“userId”, 
> “traceMessages”));
> BTW, Beam also keeps track of which fields are accessed by a transform In the 
> future we can automatically insert
> Selects in front of subgraphs to drop fields that are not referenced in that 
> subgraph.
> GroupGroup is one of the more advanced transforms. In its most basic form, it 
> provides a convenient way to group by
> key:
> PCollection<KV<Row, Iterable<UserEvent>> byUserAndCountry =       
> allEvents.apply(Group.byFieldNames(“userId”,
> “countryCode”));
> Notice how much more concise this is than using GroupByKey directly!
> The Group transform really starts to shine however when you start specifying 
> aggregations. You can aggregate any field
> (or fields) and build up an output schema based on these aggregations. For 
> example:
> PCollection<KV<Row, Row>> aggregated = allEvents.apply(    
> Group.byFieldNames(“userId”, “countryCode”)
>        .aggregateField("cost", Sum.ofLongs(), "total_cost")        
> .aggregateField("cost", Top.<Long>largestFn(10),
> “top_purchases”)        .aggregateField("transationDuration",
> ApproximateQuantilesCombineFn.create(21),               
> “durationHistogram”)));
> This will individually aggregate the specified fields of the input items (by 
> user and country), and generate an output
> schema for these aggregations. In this case, the output schema will be the 
> following:
> AggregatedSchema:    total_cost: INT64    top_purchases: ARRAY[INT64]    
> durationHistogram: ARRAY[DOUBLE]
> There are some more utility transforms I've written that are worth looking at 
> such as Convert (which can convert
> between user types that share a schema) and Unnest (flattens nested schemas). 
> There are also some others such as Pivot
> that we should consider writing
> There is still a lot to do. All the todo items are reflected in JIRA, however 
> here are some examples of current gaps:
> Support for read-only POJOs (those with final fields) and JavaBean (objects 
> without setters).Automatic schema
> inference from more Java types: protocol buffers, avro, AutoValue, 
> etc.Integration with sources (BigQueryIO, JdbcIO,
> AvroIO, etc.)Support for JsonPath expressions so users can better express 
> nested fields. E.g. support expressions of
> the form Select.fields(“field1.field2”, “field3.*”, 
> “field4[0].field5”);Schemas still need to be defined in our
> portability layer so they can be used cross language.
> If anyone is interested in helping close these gaps, you'll be helping make 
> Beam a better, more-usable system!
> Reuven

