Nice feature, thanks Reuven!

I have started revamping the Spark runner with Datasets, so I will leverage this!

Regards
JB

On 29/08/2018 07:40, Reuven Lax wrote:
> I wanted to send a quick note to the community about the current status
> of schema-aware PCollections in Beam. As some might remember we had a
> good discussion last year about the design of these schemas, involving
> many folks from different parts of the community. I sent a summary
> earlier this year explaining how schemas have been integrated into the
> DoFn framework. Much has happened since then, and here are some of the
> highlights.
> 
> 
> First, I want to emphasize that all the schema-aware classes are
> currently marked @Experimental. Nothing is set in stone yet, so if you
> have questions about any decisions made, please start a discussion!
> 
> 
>       SQL
> 
> The first big milestone for schemas was porting all of BeamSQL to use
> the framework, which was done in pr/5956. This was a lot of work and
> exposed many bugs in the schema implementation, but it now provides
> great evidence that schemas work!
> 
> 
>       Schema inference
> 
> Beam can automatically infer schemas from Java POJOs (objects with
> public fields) or JavaBean objects (objects with getter/setter methods).
> Often you can do this by simply annotating the class. For example:
> 
> 
> @DefaultSchema(JavaFieldSchema.class)
> public class UserEvent {
>   public String userId;
>   public LatLong location;
>   public String countryCode;
>   public long transactionCost;
>   public double transactionDuration;
>   public List<String> traceMessages;
> }
> 
> @DefaultSchema(JavaFieldSchema.class)
> public class LatLong {
>   public double latitude;
>   public double longitude;
> }
> 
> 
> Beam will automatically infer schemas for these classes! So if you have
> a PCollection<UserEvent>, it will automatically get the following schema:
> 
> 
> UserEvent:
>   userId: STRING
>   location: ROW(LatLong)
>   countryCode: STRING
>   transactionCost: INT64
>   transactionDuration: DOUBLE
>   traceMessages: ARRAY[STRING]
> 
> LatLong:
>   latitude: DOUBLE
>   longitude: DOUBLE
> 
> 
> It’s not always possible to annotate the class like this (you may
> not own the class definition), so you can also register the class
> explicitly using the registerPOJO method of
> Pipeline.getSchemaRegistry(), and likewise for JavaBeans.
> 
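To make the inference step concrete, here is a rough plain-Java sketch of the idea: reflect over the public fields of a class and map each Java type to a schema type name. This is only an illustration and not how Beam does it (per the Coders section below, Beam generates bytecode with ByteBuddy). The names SchemaInferenceSketch, inferSchema and typeName are made up for this example, and only the types used by UserEvent are handled.

```java
import java.lang.reflect.Field;
import java.lang.reflect.Modifier;
import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Illustration only: hypothetical names, not part of the Beam API.
final class SchemaInferenceSketch {
  static class LatLong {
    public double latitude;
    public double longitude;
  }

  static class UserEvent {
    public String userId;
    public LatLong location;
    public String countryCode;
    public long transactionCost;
    public double transactionDuration;
    public List<String> traceMessages;
  }

  // Maps each public instance field name to a schema type name.
  static Map<String, String> inferSchema(Class<?> clazz) {
    Map<String, String> schema = new LinkedHashMap<>();
    for (Field field : clazz.getDeclaredFields()) {
      int mods = field.getModifiers();
      if (Modifier.isPublic(mods) && !Modifier.isStatic(mods) && !field.isSynthetic()) {
        schema.put(field.getName(), typeName(field.getGenericType()));
      }
    }
    return schema;
  }

  // Handles only the handful of types that appear in UserEvent.
  static String typeName(Type type) {
    if (type == String.class) return "STRING";
    if (type == long.class || type == Long.class) return "INT64";
    if (type == double.class || type == Double.class) return "DOUBLE";
    if (type instanceof ParameterizedType) {
      ParameterizedType parameterized = (ParameterizedType) type;
      if (parameterized.getRawType() == List.class) {
        return "ARRAY[" + typeName(parameterized.getActualTypeArguments()[0]) + "]";
      }
    }
    if (type instanceof Class) {
      // Any other class is treated as a nested row.
      return "ROW(" + ((Class<?>) type).getSimpleName() + ")";
    }
    throw new IllegalArgumentException("Unsupported type: " + type);
  }

  public static void main(String[] args) {
    inferSchema(UserEvent.class).forEach((name, type) -> System.out.println(name + ": " + type));
  }
}
```

Reflection does not guarantee the order in which fields are returned, so a real implementation would need its own rule for field ordering.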
> 
>       Coders
> 
> Beam has a built-in coder for any schema-aware PCollection, largely
> removing the need for users to care about coders. We generate low-level
> bytecode (using ByteBuddy) to implement the coder for each schema, so
> these coders are quite performant. This provides a better default coder
> for Java POJOs as well. In the past, users were advised to use
> AvroCoder for POJOs, which many have found inefficient. Now there’s a
> more efficient solution.
> 
> 
>       Utility Transforms
> 
> Schemas are already useful for implementers of extensions such as SQL,
> but the goal was to use them to make Beam itself easier to use. To this
> end, I’ve been implementing a library of transforms that allow for easy
> manipulation of schema PCollections. So far Filter and Select are
> merged, Group is about to go out for review (it needs some more javadoc
> and unit tests), and Join is being developed but doesn’t yet have a
> final interface.
> 
> 
> Filter
> 
> Given a PCollection<LatLong>, I want to keep only those in an area of
> southern Manhattan. Well, this is easy!
> 
> 
> PCollection<LatLong> manhattanEvents = allEvents.apply(Filter
>     .whereFieldName("latitude", lat -> lat < 40.720 && lat > 40.699)
>     // "long" is a reserved word in Java, so the parameter is named lng.
>     .whereFieldName("longitude", lng -> lng < -73.969 && lng > -74.747));
> 
> 
> Schemas along with lambdas allow us to write this transform
> declaratively. The Filter transform also allows you to register filter
> functions that operate on multiple fields at the same time.
> 
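For readers who want to see the semantics spelled out, here is a small plain-Java sketch of per-field filtering, with rows modelled as maps from field name to value. It assumes, as the example above implies, that a row is kept only when every registered field predicate passes. FieldFilterSketch and its methods are hypothetical names for illustration and are not the Beam Filter API.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Predicate;

// Illustration only: hypothetical names, not part of the Beam API.
final class FieldFilterSketch {
  private final Map<String, Predicate<Double>> predicates = new LinkedHashMap<>();

  // Registers a predicate for one field; repeated calls on the same field are ANDed.
  FieldFilterSketch whereFieldName(String field, Predicate<Double> predicate) {
    predicates.merge(field, predicate, (existing, added) -> existing.and(added));
    return this;
  }

  // A row is kept only if every registered field predicate accepts its value.
  boolean keep(Map<String, Double> row) {
    for (Map.Entry<String, Predicate<Double>> entry : predicates.entrySet()) {
      if (!entry.getValue().test(row.get(entry.getKey()))) {
        return false;
      }
    }
    return true;
  }

  List<Map<String, Double>> apply(List<Map<String, Double>> rows) {
    List<Map<String, Double>> kept = new ArrayList<>();
    for (Map<String, Double> row : rows) {
      if (keep(row)) {
        kept.add(row);
      }
    }
    return kept;
  }

  // Helper that builds a LatLong-shaped row.
  static Map<String, Double> row(double latitude, double longitude) {
    Map<String, Double> row = new HashMap<>();
    row.put("latitude", latitude);
    row.put("longitude", longitude);
    return row;
  }

  public static void main(String[] args) {
    FieldFilterSketch filter = new FieldFilterSketch()
        .whereFieldName("latitude", lat -> lat < 40.720 && lat > 40.699)
        .whereFieldName("longitude", lng -> lng < -73.969 && lng > -74.747);
    List<Map<String, Double>> rows = new ArrayList<>();
    rows.add(row(40.710, -74.000));
    rows.add(row(40.800, -74.000));
    System.out.println(filter.apply(rows));
  }
}
```

The sketch expects every filtered field to be present in each row; a missing field would fail when the predicate unboxes the value.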
> 
> Select
> 
> Let’s say that I don’t need all the fields in a row. For instance, I’m
> only interested in the userId and traceMessages, and don’t care about
> the location. In that case I can write the following:
> 
> 
> PCollection<Row> selected =
>     allEvents.apply(Select.fieldNames("userId", "traceMessages"));
> 
> 
> BTW, Beam also keeps track of which fields are accessed by a transform.
> In the future we can automatically insert Selects in front of subgraphs
> to drop fields that are not referenced in that subgraph.
> 
> 
> Group
> 
> Group is one of the more advanced transforms. In its most basic form, it
> provides a convenient way to group by key:
> 
> 
> PCollection<KV<Row, Iterable<UserEvent>>> byUserAndCountry =
>     allEvents.apply(Group.byFieldNames("userId", "countryCode"));
> 
> 
> Notice how much more concise this is than using GroupByKey directly!
> 
> 
> The Group transform really starts to shine, however, when you start
> specifying aggregations. You can aggregate any field (or fields) and
> build up an output schema based on these aggregations. For example:
> 
> 
> PCollection<KV<Row, Row>> aggregated = allEvents.apply(
>     Group.byFieldNames("userId", "countryCode")
>         .aggregateField("transactionCost", Sum.ofLongs(), "total_cost")
>         .aggregateField("transactionCost", Top.<Long>largestFn(10), "top_purchases")
>         .aggregateField("transactionDuration",
>             ApproximateQuantilesCombineFn.create(21), "durationHistogram"));
> 
> 
> This will individually aggregate the specified fields of the input items
> (by user and country), and generate an output schema for these
> aggregations. In this case, the output schema will be the following:
> 
> 
> AggregatedSchema:
>   total_cost: INT64
>   top_purchases: ARRAY[INT64]
>   durationHistogram: ARRAY[DOUBLE]
> 
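As a point of comparison, the first of these aggregations (the sum that lands in total_cost) can be approximated in plain Java with streams, without Beam. This is only a sketch of what the grouping computes for one aggregation on an in-memory list. GroupSumSketch, Event and totalCostByUserAndCountry are hypothetical names for illustration, and the composite key is modelled as a List of the two key fields rather than a Row.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

// Illustration only: hypothetical names, not part of the Beam API.
final class GroupSumSketch {
  // Trimmed-down stand-in for UserEvent with just the fields used here.
  static final class Event {
    final String userId;
    final String countryCode;
    final long transactionCost;

    Event(String userId, String countryCode, long transactionCost) {
      this.userId = userId;
      this.countryCode = countryCode;
      this.transactionCost = transactionCost;
    }
  }

  // Groups by (userId, countryCode) and sums transactionCost within each group.
  static Map<List<String>, Long> totalCostByUserAndCountry(List<Event> events) {
    return events.stream()
        .collect(Collectors.groupingBy(
            e -> Arrays.asList(e.userId, e.countryCode),
            Collectors.summingLong(e -> e.transactionCost)));
  }

  public static void main(String[] args) {
    List<Event> events = Arrays.asList(
        new Event("alice", "US", 10L),
        new Event("alice", "US", 5L),
        new Event("bob", "FR", 7L));
    System.out.println(totalCostByUserAndCountry(events));
  }
}
```

Each additional aggregateField call would correspond to one more downstream collector over the same groups, which is what makes the combined output schema possible.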
> 
> There are some more utility transforms I've written that are worth
> looking at, such as Convert (which can convert between user types that
> share a schema) and Unnest (which flattens nested schemas). There are
> also some others, such as Pivot, that we should consider writing.
> 
> 
> There is still a lot to do. All the to-do items are reflected in JIRA;
> however, here are some examples of current gaps:
> 
> 
>   * Support for read-only POJOs (those with final fields) and JavaBeans
>     (objects without setters).
> 
>   * Automatic schema inference from more Java types: protocol buffers,
>     Avro, AutoValue, etc.
> 
>   * Integration with sources (BigQueryIO, JdbcIO, AvroIO, etc.)
> 
>   * Support for JsonPath expressions so users can better express nested
>     fields. E.g. support expressions of the form
>     Select.fields("field1.field2", "field3.*", "field4[0].field5");
> 
>   * Schemas still need to be defined in our portability layer so they
>     can be used cross-language.
> 
> 
> If anyone is interested in helping close these gaps, you'll be helping
> make Beam a better, more-usable system!
> 
> Reuven
> 

-- 
Jean-Baptiste Onofré
[email protected]
http://blog.nanthrax.net
Talend - http://www.talend.com
