On Fri, Aug 31, 2018 at 2:22 AM Maximilian Michels <[email protected]> wrote:

> Thanks Reuven. That's an OK restriction. Apache Flink also requires
> non-final fields to be able to generate TypeInformation (~=Schema) from
> POJOs.
>
> I agree that it's not very intuitive for users.
>
> I suppose it would work to assume a constructor with the same parameter
> order as the fields in the class. So if instantiation with the default
> constructor doesn't work, it would try to look up a constructor based on
> the fields of the class.
>

Actually, Java reflection doesn't guarantee any particular order of fields
or methods when you query them. We would have to look for a constructor with
the exact same parameter names as the fields. Unfortunately users sometimes
shorten the parameter names when creating such constructors, which would
defeat this. We could also provide a set of dedicated annotations to allow
the user to mark the constructor (or static builder method) used to create
the class.
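
To make the annotation idea concrete, here is a rough sketch in plain Java.
Nothing here is an existing Beam API: the @Creator and @FieldName annotations
and the create() helper are hypothetical names made up for illustration. The
parameters are bound to fields explicitly because reflection only exposes real
parameter names when the class was compiled with -parameters, and users may
shorten them anyway.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.lang.reflect.Constructor;
import java.lang.reflect.Parameter;
import java.util.HashMap;
import java.util.Map;

public class AnnotatedCreatorSketch {

  /** Hypothetical marker for the constructor used to build the object. */
  @Retention(RetentionPolicy.RUNTIME)
  @Target(ElementType.CONSTRUCTOR)
  public @interface Creator {}

  /** Hypothetical annotation binding a constructor parameter to a field name. */
  @Retention(RetentionPolicy.RUNTIME)
  @Target(ElementType.PARAMETER)
  public @interface FieldName {
    String value();
  }

  /** An immutable Pojo; the parameter names are shortened on purpose. */
  public static class Pojo {
    public final String field;
    public final long count;

    @Creator
    public Pojo(@FieldName("field") String f, @FieldName("count") long c) {
      this.field = f;
      this.count = c;
    }
  }

  /** Builds an instance from decoded values keyed by field name. */
  public static <T> T create(Class<T> clazz, Map<String, Object> values)
      throws ReflectiveOperationException {
    for (Constructor<?> ctor : clazz.getDeclaredConstructors()) {
      if (!ctor.isAnnotationPresent(Creator.class)) {
        continue;
      }
      Parameter[] params = ctor.getParameters();
      Object[] args = new Object[params.length];
      for (int i = 0; i < params.length; i++) {
        FieldName name = params[i].getAnnotation(FieldName.class);
        if (name == null || !values.containsKey(name.value())) {
          throw new IllegalArgumentException(
              "Cannot bind parameter " + i + " of " + clazz.getName());
        }
        // Argument order comes from the constructor, not from field order.
        args[i] = values.get(name.value());
      }
      ctor.setAccessible(true);
      return clazz.cast(ctor.newInstance(args));
    }
    throw new IllegalArgumentException(
        "No @Creator constructor on " + clazz.getName());
  }

  public static void main(String[] args) throws Exception {
    Map<String, Object> decoded = new HashMap<>();
    decoded.put("count", 3L);
    decoded.put("field", "hello");
    Pojo p = create(Pojo.class, decoded);
    System.out.println(p.field + " " + p.count); // prints "hello 3"
  }
}
```

The same lookup would work for a static builder method by scanning
getDeclaredMethods() for the marker instead of the constructors.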


> Perhaps too much magic, having a dedicated interface for construction is
> a more programmatic approach.
>
> -Max
>
> On 30.08.18 16:55, Reuven Lax wrote:
> > Max,
> >
> > Nested Pojos are fully supported, as are nested array/collection and map
> > types (e.g. if your Pojo contains List<OtherPojo>).
> >
> > One limitation right now is that only mutable Pojos are supported. For
> > example, the following Pojo would _not_ work, because the fields aren't
> > mutable.
> >
> > public class Pojo {
> >    public final String field;
> > }
> >
> > This is an annoying restriction, because in practice Pojo types often
> > have final fields. The reason for the restriction is that the most
> > general way to create an instance of this Pojo (after decoding) is to
> > instantiate the object and then set the fields one by one (I also assume
> > that there's a default constructor).  I can remove this restriction if
> > there is an appropriate constructor or builder interface that lets us
> > construct the object directly.
> >
> > Reuven
> >
> > On Thu, Aug 30, 2018 at 6:51 AM Maximilian Michels <[email protected]> wrote:
> >
> >     That's a cool feature. Are there any limitations for the schema
> >     inference apart from being a Pojo/Bean? Does it support nested
> >     Pojos, e.g. "wrapper.field"?
> >
> >     -Max
> >
> >     On 29.08.18 07:40, Reuven Lax wrote:
> >      > I wanted to send a quick note to the community about the current
> >      > status of schema-aware PCollections in Beam. As some might remember,
> >      > we had a good discussion last year about the design of these schemas,
> >      > involving many folks from different parts of the community. I sent a
> >      > summary earlier this year explaining how schemas have been integrated
> >      > into the DoFn framework. Much has happened since then, and here are
> >      > some of the highlights.
> >      >
> >      >
> >      > First, I want to emphasize that all the schema-aware classes are
> >      > currently marked @Experimental. Nothing is set in stone yet, so if
> >      > you have questions about any decisions made, please start a
> >      > discussion!
> >      >
> >      >
> >      >       SQL
> >      >
> >      > The first big milestone for schemas was porting all of BeamSQL to use
> >      > the framework, which was done in pr/5956. This was a lot of work and
> >      > exposed many bugs in the schema implementation, but it now provides
> >      > great evidence that schemas work!
> >      >
> >      >
> >      >       Schema inference
> >      >
> >      > Beam can automatically infer schemas from Java POJOs (objects with
> >      > public fields) or JavaBean objects (objects with getter/setter
> >      > methods). Often you can do this by simply annotating the class. For
> >      > example:
> >      >
> >      >
> >      > @DefaultSchema(JavaFieldSchema.class)
> >      > public class UserEvent {
> >      >   public String userId;
> >      >   public LatLong location;
> >      >   public String countryCode;
> >      >   public long transactionCost;
> >      >   public double transactionDuration;
> >      >   public List<String> traceMessages;
> >      > }
> >      >
> >      > @DefaultSchema(JavaFieldSchema.class)
> >      > public class LatLong {
> >      >   public double latitude;
> >      >   public double longitude;
> >      > }
> >      >
> >      >
> >      > Beam will automatically infer schemas for these classes! So if you
> >      > have a PCollection<UserEvent>, it will automatically get the
> >      > following schema:
> >      >
> >      >
> >      > UserEvent:
> >      >   userId: STRING
> >      >   location: ROW(LatLong)
> >      >   countryCode: STRING
> >      >   transactionCost: INT64
> >      >   transactionDuration: DOUBLE
> >      >   traceMessages: ARRAY[STRING]
> >      >
> >      > LatLong:
> >      >   latitude: DOUBLE
> >      >   longitude: DOUBLE
> >      >
> >      >
> >      > Now it’s not always possible to annotate the class like this (you may
> >      > not own the class definition), so you can also explicitly register
> >      > this using Pipeline:getSchemaRegistry:registerPOJO, and the same for
> >      > JavaBeans.
> >      >
> >      >
> >      >       Coders
> >      >
> >      > Beam has a built-in coder for any schema-aware PCollection, largely
> >      > removing the need for users to care about coders. We generate
> >      > low-level bytecode (using ByteBuddy) to implement the coder for each
> >      > schema, so these coders are quite performant. This provides a better
> >      > default coder for Java POJO objects as well. In the past users were
> >      > advised to use AvroCoder for POJOs, which many have found
> >      > inefficient. Now there’s a more efficient solution.
> >      >
> >      >
> >      >       Utility Transforms
> >      >
> >      > Schemas are already useful for implementers of extensions such as
> >      > SQL, but the goal was to use them to make Beam itself easier to use.
> >      > To this end, I’ve been implementing a library of transforms that
> >      > allow for easy manipulation of schema PCollections. So far Filter and
> >      > Select are merged, Group is about to go out for review (it needs some
> >      > more javadoc and unit tests), and Join is being developed but doesn’t
> >      > yet have a final interface.
> >      >
> >      >
> >      > Filter
> >      >
> >      > Given a PCollection<LatLong>, I want to keep only those in an area of
> >      > southern Manhattan. Well, this is easy!
> >      >
> >      >
> >      > PCollection<LatLong> manhattanEvents = allEvents.apply(Filter
> >      >     .whereFieldName("latitude", lat -> lat < 40.720 && lat > 40.699)
> >      >     .whereFieldName("longitude", lon -> lon < -73.969 && lon > -74.747));
> >      >
> >      >
> >      > Schemas along with lambdas allow us to write this transform
> >      > declaratively. The Filter transform also allows you to register
> >      > filter functions that operate on multiple fields at the same time.
> >      >
> >      >
> >      > Select
> >      >
> >      > Let’s say that I don’t need all the fields in a row. For instance,
> >      > I’m only interested in the userId and traceMessages, and don’t care
> >      > about the location. In that case I can write the following:
> >      >
> >      >
> >      > PCollection<Row> selected =
> >      >     allEvents.apply(Select.fieldNames("userId", "traceMessages"));
> >      >
> >      >
> >      > BTW, Beam also keeps track of which fields are accessed by a
> >      > transform. In the future we can automatically insert Selects in front
> >      > of subgraphs to drop fields that are not referenced in that subgraph.
> >      >
> >      >
> >      > Group
> >      >
> >      > Group is one of the more advanced transforms. In its most basic form,
> >      > it provides a convenient way to group by key:
> >      >
> >      >
> >      > PCollection<KV<Row, Iterable<UserEvent>>> byUserAndCountry =
> >      >     allEvents.apply(Group.byFieldNames("userId", "countryCode"));
> >      >
> >      >
> >      > Notice how much more concise this is than using GroupByKey directly!
> >      >
> >      >
> >      > The Group transform really starts to shine, however, when you start
> >      > specifying aggregations. You can aggregate any field (or fields) and
> >      > build up an output schema based on these aggregations. For example:
> >      >
> >      >
> >      > PCollection<KV<Row, Row>> aggregated = allEvents.apply(
> >      >     Group.byFieldNames("userId", "countryCode")
> >      >         .aggregateField("transactionCost", Sum.ofLongs(), "total_cost")
> >      >         .aggregateField("transactionCost", Top.<Long>largestFn(10),
> >      >             "top_purchases")
> >      >         .aggregateField("transactionDuration",
> >      >             ApproximateQuantilesCombineFn.create(21),
> >      >             "durationHistogram"));
> >      >
> >      >
> >      > This will individually aggregate the specified fields of the input
> >      > items (by user and country), and generate an output schema for these
> >      > aggregations. In this case, the output schema will be the following:
> >      >
> >      >
> >      > AggregatedSchema:
> >      >     total_cost: INT64
> >      >     top_purchases: ARRAY[INT64]
> >      >     durationHistogram: ARRAY[DOUBLE]
> >      >
> >      >
> >      > There are some more utility transforms I've written that are worth
> >      > looking at, such as Convert (which can convert between user types
> >      > that share a schema) and Unnest (which flattens nested schemas).
> >      > There are also some others, such as Pivot, that we should consider
> >      > writing.
> >      >
> >      >
> >      > There is still a lot to do. All the todo items are reflected in JIRA,
> >      > but here are some examples of current gaps:
> >      >
> >      >
> >      >   * Support for read-only POJOs (those with final fields) and
> >      >     JavaBeans (objects without setters).
> >      >
> >      >   * Automatic schema inference from more Java types: protocol
> >      >     buffers, avro, AutoValue, etc.
> >      >
> >      >   * Integration with sources (BigQueryIO, JdbcIO, AvroIO, etc.)
> >      >
> >      >   * Support for JsonPath expressions so users can better express
> >      >     nested fields, e.g. support expressions of the form
> >      >     Select.fields("field1.field2", "field3.*", "field4[0].field5");
> >      >
> >      >   * Schemas still need to be defined in our portability layer so
> >      >     they can be used cross-language.
> >      >
> >      >
> >      > If anyone is interested in helping close these gaps, you'll be
> >      > helping make Beam a better, more usable system!
> >      >
> >      > Reuven
> >      >
> >
>
