On Fri, Aug 31, 2018 at 11:22 AM Maximilian Michels <m...@apache.org> wrote:
Thanks Reuven. That's an OK restriction. Apache Flink also requires
non-final fields to be able to generate TypeInformation (~=Schema) from
POJOs.
I agree that it's not very intuitive for users.
I suppose it would work to assume a constructor with the same parameter
order as the fields in the class. So if instantiation with the default
constructor doesn't work, it would try to look up a constructor based on
the fields of the class.
I think this would make a lot of sense, but it would require some
assumptions (e.g. the declared field order is the same as the
constructor argument order (and/or the schema order), especially if
there are fields of the same type). Probably still worth doing, either
under a more limited set of constraints (all fields are of a different
type), or as opt-in.
Perhaps that's too much magic; having a dedicated interface for
construction is a more programmatic approach.
-Max
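The constructor-lookup idea above can be sketched with plain JDK reflection. This sketch adopts the stricter constraint mentioned above (every field has a distinct type), so decoded values are matched to constructor parameters by type rather than by declaration order. That matters because Class.getDeclaredFields() does not guarantee any particular order. ConstructorMatcher, create, and Point are hypothetical names for illustration, not Beam APIs.

```java
import java.lang.reflect.Constructor;
import java.lang.reflect.Field;
import java.lang.reflect.Modifier;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Hypothetical helper illustrating constructor matching; not a Beam API.
class ConstructorMatcher {
  // Hypothetical immutable POJO: final fields and no default constructor.
  public static class Point {
    public final String label;
    public final double x;

    public Point(double x, String label) {
      this.x = x;
      this.label = label;
    }
  }

  // Creates an instance from decoded values keyed by field name. Assumes every
  // instance field has a distinct type, so parameters are matched by type
  // instead of relying on getDeclaredFields() order, which is unspecified.
  static <T> T create(Class<T> clazz, Map<String, Object> values) {
    Map<Class<?>, Object> byType = new HashMap<>();
    for (Field f : clazz.getDeclaredFields()) {
      if (Modifier.isStatic(f.getModifiers())) continue;
      if (byType.containsKey(f.getType())) {
        throw new IllegalArgumentException("Fields share a type: " + f.getType());
      }
      byType.put(f.getType(), values.get(f.getName()));
    }
    for (Constructor<?> c : clazz.getConstructors()) {
      Class<?>[] params = c.getParameterTypes();
      if (params.length != byType.size()) continue;
      Object[] ctorArgs = new Object[params.length];
      Set<Class<?>> used = new HashSet<>();
      boolean matches = true;
      for (int i = 0; i < params.length && matches; i++) {
        // Each parameter type must be one of the field types, used at most once.
        matches = byType.containsKey(params[i]) && used.add(params[i]);
        ctorArgs[i] = byType.get(params[i]);
      }
      if (!matches) continue;
      try {
        return clazz.cast(c.newInstance(ctorArgs));
      } catch (ReflectiveOperationException e) {
        throw new IllegalStateException(e);
      }
    }
    throw new IllegalArgumentException("No constructor matches the fields of " + clazz);
  }

  public static void main(String[] args) {
    Point p = create(Point.class, Map.of("label", "home", "x", 3.0));
    System.out.println(p.label + " " + p.x); // home 3.0
  }
}
```

A dedicated construction interface, as suggested above, would avoid this reflection and the distinct-type restriction, at the cost of some code per class.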
On 30.08.18 16:55, Reuven Lax wrote:
> Max,
>
> Nested Pojos are fully supported, as are nested array/collection and map
> types (e.g. if your Pojo contains List<OtherPojo>).
>
> One limitation right now is that only mutable Pojos are supported. For
> example, the following Pojo would _not_ work, because the fields aren't
> mutable.
>
> public class Pojo {
>   public final String field;
>   public Pojo(String field) { this.field = field; }
> }
>
> This is an annoying restriction, because in practice Pojo types often
> have final fields. The reason for the restriction is that the most
> general way to create an instance of this Pojo (after decoding) is to
> instantiate the object and then set the fields one by one (I also assume
> that there's a default constructor). I can remove this restriction if
> there is an appropriate constructor or builder interface that lets us
> construct the object directly.
>
> Reuven
>
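A minimal sketch of the decoding strategy described above, using only JDK reflection: create the object with its public no-arg constructor, then assign each public field. SetterDecoder, decode, MutablePojo, and ImmutablePojo are hypothetical names; the sketch illustrates the idea only and is not Beam's implementation. The immutable class fails because it has no no-arg constructor, and Field.set would also refuse to write a final field unless setAccessible(true) had been called.

```java
import java.lang.reflect.Field;
import java.lang.reflect.Modifier;
import java.util.Map;

// Hypothetical helper illustrating "instantiate, then set fields"; not a Beam API.
class SetterDecoder {
  // Mutable POJO: public no-arg constructor and non-final fields.
  public static class MutablePojo {
    public String field;
  }

  // Immutable POJO, as in the example above: final field, no no-arg constructor.
  public static class ImmutablePojo {
    public final String field;

    public ImmutablePojo(String field) {
      this.field = field;
    }
  }

  // Fails for ImmutablePojo: getConstructor() finds no no-arg constructor, and
  // Field.set would reject the final field without setAccessible(true).
  static <T> T decode(Class<T> clazz, Map<String, Object> values) {
    try {
      T instance = clazz.getConstructor().newInstance();
      for (Field f : clazz.getFields()) {
        if (Modifier.isStatic(f.getModifiers())) continue;
        f.set(instance, values.get(f.getName()));
      }
      return instance;
    } catch (ReflectiveOperationException e) {
      throw new IllegalStateException("Cannot decode " + clazz.getSimpleName(), e);
    }
  }

  public static void main(String[] args) {
    MutablePojo m = decode(MutablePojo.class, Map.of("field", "hello"));
    System.out.println(m.field); // hello
    try {
      decode(ImmutablePojo.class, Map.of("field", "hello"));
    } catch (IllegalStateException e) {
      System.out.println(e.getMessage()); // Cannot decode ImmutablePojo
    }
  }
}
```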
> On Thu, Aug 30, 2018 at 6:51 AM Maximilian Michels <m...@apache.org> wrote:
>
> That's a cool feature. Are there any limitations for the schema
> inference apart from being a POJO/Bean? Does it support nested POJOs,
> e.g. "wrapper.field"?
>
> -Max
>
> On 29.08.18 07:40, Reuven Lax wrote:
> > I wanted to send a quick note to the community about the current
> > status of schema-aware PCollections in Beam. As some might remember, we
> > had a good discussion last year about the design of these schemas,
> > involving many folks from different parts of the community. I sent a
> > summary earlier this year explaining how schemas have been integrated
> > into the DoFn framework. Much has happened since then, and here are
> > some of the highlights.
> >
> >
> > First, I want to emphasize that all the schema-aware classes are
> > currently marked @Experimental. Nothing is set in stone yet, so if you
> > have questions about any decisions made, please start a discussion!
> >
> >
> > SQL
> >
> > The first big milestone for schemas was porting all of BeamSQL to use
> > the framework, which was done in pr/5956. This was a lot of work and
> > exposed many bugs in the schema implementation, but it now provides
> > great evidence that schemas work!
> >
> >
> > Schema inference
> >
> > Beam can automatically infer schemas from Java POJOs (objects with
> > public fields) or JavaBean objects (objects with getter/setter methods).
> > Often you can do this by simply annotating the class. For example:
> >
> >
> > @DefaultSchema(JavaFieldSchema.class)
> > public class UserEvent {
> >   public String userId;
> >   public LatLong location;
> >   public String countryCode;
> >   public long transactionCost;
> >   public double transactionDuration;
> >   public List<String> traceMessages;
> > }
> >
> > @DefaultSchema(JavaFieldSchema.class)
> > public class LatLong {
> >   public double latitude;
> >   public double longitude;
> > }
> >
> >
> > Beam will automatically infer schemas for these classes! So if you have
> > a PCollection<UserEvent>, it will automatically get the following
> > schema:
> >
> >
> > UserEvent:
> >   userId: STRING
> >   location: ROW(LatLong)
> >   countryCode: STRING
> >   transactionCost: INT64
> >   transactionDuration: DOUBLE
> >   traceMessages: ARRAY[STRING]
> >
> > LatLong:
> >   latitude: DOUBLE
> >   longitude: DOUBLE
> >
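To make the mapping above concrete, here is a JDK-only sketch that walks a class's public fields with reflection and produces type names in the same notation (STRING, INT64, DOUBLE, ROW, ARRAY). It illustrates the idea only and is not Beam's JavaFieldSchema implementation; SchemaSketch, infer, and typeName are hypothetical, and only the handful of types used in this example are covered.

```java
import java.lang.reflect.Field;
import java.lang.reflect.Modifier;
import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical illustration of field-based schema inference; not Beam's JavaFieldSchema.
class SchemaSketch {
  public static class LatLong {
    public double latitude;
    public double longitude;
  }

  public static class UserEvent {
    public String userId;
    public LatLong location;
    public String countryCode;
    public long transactionCost;
    public double transactionDuration;
    public List<String> traceMessages;
  }

  // Maps each public instance field name to a schema type name.
  // Iteration order follows getFields(), which the JDK does not guarantee.
  static Map<String, String> infer(Class<?> clazz) {
    Map<String, String> schema = new LinkedHashMap<>();
    for (Field f : clazz.getFields()) {
      if (Modifier.isStatic(f.getModifiers())) continue;
      schema.put(f.getName(), typeName(f.getGenericType()));
    }
    return schema;
  }

  static String typeName(Type type) {
    if (type == String.class) return "STRING";
    if (type == long.class || type == Long.class) return "INT64";
    if (type == double.class || type == Double.class) return "DOUBLE";
    if (type instanceof ParameterizedType) {
      ParameterizedType p = (ParameterizedType) type;
      if (p.getRawType() == List.class) {
        return "ARRAY[" + typeName(p.getActualTypeArguments()[0]) + "]";
      }
    }
    if (type instanceof Class && !((Class<?>) type).isPrimitive()) {
      // Any other class is treated as a nested POJO, i.e. a nested row.
      return "ROW(" + ((Class<?>) type).getSimpleName() + ")";
    }
    throw new IllegalArgumentException("Unsupported type: " + type);
  }

  public static void main(String[] args) {
    infer(UserEvent.class).forEach((name, t) -> System.out.println(name + ": " + t));
  }
}
```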
> >
> > Now, it's not always possible to annotate the class like this (you may
> > not own the class definition), so you can also register it explicitly
> > using Pipeline.getSchemaRegistry().registerPOJO(), and similarly for
> > JavaBeans.
> >
> >
> > Coders
> >
> > Beam has a built-in coder for any schema-aware PCollection, largely
> > removing the need for users to care about coders. We generate low-level
> > bytecode (using ByteBuddy) to implement the coder for each schema, so
> > these coders are quite performant. This also provides a better default
> > coder for Java POJO objects. In the past, users were advised to use
> > AvroCoder for POJOs, which many have found inefficient. Now there's a
> > more efficient solution.
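The key property of a schema coder is that the schema fixes field order and types, so values can be written back to back with no field names or tags. A hand-written JDK sketch of that idea follows; it uses DataOutputStream rather than generated bytecode, handles only STRING, INT64, and DOUBLE (no nulls, strings under 64 KB), and RowCoderSketch with its methods is hypothetical, not Beam's coder classes.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.List;

// Hypothetical schema-driven coder; not Beam's generated coder.
class RowCoderSketch {
  enum FieldType { STRING, INT64, DOUBLE }

  // Writes values in schema order; no field names or tags are encoded.
  static byte[] encode(List<FieldType> schema, List<Object> values) {
    ByteArrayOutputStream bytes = new ByteArrayOutputStream();
    try (DataOutputStream out = new DataOutputStream(bytes)) {
      for (int i = 0; i < schema.size(); i++) {
        switch (schema.get(i)) {
          case STRING: out.writeUTF((String) values.get(i)); break;
          case INT64: out.writeLong((Long) values.get(i)); break;
          case DOUBLE: out.writeDouble((Double) values.get(i)); break;
        }
      }
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
    return bytes.toByteArray();
  }

  // Reads values back using the same schema to know what comes next.
  static List<Object> decode(List<FieldType> schema, byte[] encoded) {
    List<Object> values = new ArrayList<>();
    try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(encoded))) {
      for (FieldType type : schema) {
        switch (type) {
          case STRING: values.add(in.readUTF()); break;
          case INT64: values.add(in.readLong()); break;
          case DOUBLE: values.add(in.readDouble()); break;
        }
      }
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
    return values;
  }

  public static void main(String[] args) {
    List<FieldType> schema = List.of(FieldType.STRING, FieldType.INT64, FieldType.DOUBLE);
    byte[] encoded = encode(schema, List.of("user-1", 42L, 1.5));
    // 2-byte length + 6 bytes for "user-1", 8 for the long, 8 for the double.
    System.out.println(encoded.length + " bytes -> " + decode(schema, encoded));
    // 24 bytes -> [user-1, 42, 1.5]
  }
}
```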
> >
> >
> > Utility Transforms
> >
> > Schemas are already useful for implementers of extensions such as SQL,
> > but the goal was to use them to make Beam itself easier to use. To this
> > end, I've been implementing a library of transforms that allow for easy
> > manipulation of schema PCollections. So far, Filter and Select have been
> > merged, Group is about to go out for review (it needs some more javadoc
> > and unit tests), and Join is being developed but doesn't yet have a
> > final interface.
> >
> >
> > Filter
> >
> > Given a PCollection<LatLong>, I want to keep only those in an area of
> > southern Manhattan. Well, this is easy!
> >
> >
> > PCollection<LatLong> manhattanEvents = allEvents.apply(Filter
> >     .whereFieldName("latitude", (Double lat) -> lat < 40.720 && lat > 40.699)
> >     .whereFieldName("longitude", (Double lng) -> lng < -73.969 && lng > -74.747));
> >
> >
> > Schemas along with lambdas allow us to write this transform
> > declaratively. The Filter transform also allows you to register filter
> > functions that operate on multiple fields at the same time.
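Outside Beam, the same filter can be modeled as a conjunction of predicates over a row. The JDK-only sketch below represents a row as a Map<String, Object>; FieldFilter, where, whereRow, test, apply, and row are hypothetical names for illustration, not the Beam Filter API. The whereRow method covers the multi-field case mentioned above.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Predicate;

// Hypothetical row filter built from field predicates; not the Beam Filter API.
class FieldFilter {
  private final List<Predicate<Map<String, Object>>> predicates = new ArrayList<>();

  // Predicate on a single named field, cast to the expected type.
  <T> FieldFilter where(String field, Class<T> type, Predicate<T> predicate) {
    predicates.add(row -> predicate.test(type.cast(row.get(field))));
    return this;
  }

  // Predicate that can look at several fields of the row at once.
  FieldFilter whereRow(Predicate<Map<String, Object>> predicate) {
    predicates.add(predicate);
    return this;
  }

  // A row is kept only if every registered predicate accepts it.
  boolean test(Map<String, Object> row) {
    return predicates.stream().allMatch(p -> p.test(row));
  }

  List<Map<String, Object>> apply(List<Map<String, Object>> rows) {
    List<Map<String, Object>> kept = new ArrayList<>();
    for (Map<String, Object> row : rows) {
      if (test(row)) kept.add(row);
    }
    return kept;
  }

  static Map<String, Object> row(double latitude, double longitude) {
    Map<String, Object> row = new HashMap<>();
    row.put("latitude", latitude);
    row.put("longitude", longitude);
    return row;
  }

  public static void main(String[] args) {
    FieldFilter southernManhattan = new FieldFilter()
        .where("latitude", Double.class, lat -> lat < 40.720 && lat > 40.699)
        .where("longitude", Double.class, lng -> lng < -73.969 && lng > -74.747);
    List<Map<String, Object>> rows = List.of(row(40.710, -74.000), row(40.750, -73.990));
    System.out.println(southernManhattan.apply(rows).size()); // 1
  }
}
```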
> >
> >
> > Select
> >
> > Let's say that I don't need all the fields in a row. For instance, I'm
> > only interested in the userId and traceMessages, and don't care about
> > the location. In that case I can write the following:
> >
> >
> > PCollection<Row> selected =
> >     allEvents.apply(Select.fieldNames("userId", "traceMessages"));
> >
> >
> > BTW, Beam also keeps track of which fields are accessed by a transform.
> > In the future, we can automatically insert Selects in front of subgraphs
> > to drop fields that are not referenced in that subgraph.
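Conceptually, Select is a projection: keep the named fields and drop the rest. The JDK-only sketch below shows this over Map-based rows; Projection.select is a hypothetical helper, not the Beam Select API. Because the selected names are known before any data flows, a planner could push such a projection ahead of later steps, which is the optimization described above.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical projection helper; not the Beam Select API.
class Projection {
  // Returns a new row containing only the requested fields, in the requested order.
  static Map<String, Object> select(Map<String, Object> row, List<String> fieldNames) {
    Map<String, Object> out = new LinkedHashMap<>();
    for (String name : fieldNames) {
      if (!row.containsKey(name)) {
        throw new IllegalArgumentException("Unknown field: " + name);
      }
      out.put(name, row.get(name));
    }
    return out;
  }

  public static void main(String[] args) {
    Map<String, Object> event = new LinkedHashMap<>();
    event.put("userId", "u1");
    event.put("countryCode", "US");
    event.put("traceMessages", List.of("login", "checkout"));
    System.out.println(select(event, List.of("userId", "traceMessages")));
    // {userId=u1, traceMessages=[login, checkout]}
  }
}
```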
> >
> >
> > Group
> >
> > Group is one of the more advanced transforms. In its most basic form,
> > it provides a convenient way to group by key:
> >
> >
> > PCollection<KV<Row, Iterable<UserEvent>>> byUserAndCountry =
> >     allEvents.apply(Group.byFieldNames("userId", "countryCode"));
> >
> >
> > Notice how much more concise this is than using GroupByKey
directly!
> >
> >
> > The Group transform really starts to shine, however, when you start
> > specifying aggregations. You can aggregate any field (or fields) and
> > build up an output schema based on these aggregations. For example:
> >
> >
> > PCollection<KV<Row, Row>> aggregated = allEvents.apply(
> >     Group.byFieldNames("userId", "countryCode")
> >         .aggregateField("transactionCost", Sum.ofLongs(), "total_cost")
> >         .aggregateField("transactionCost", Top.<Long>largestFn(10), "top_purchases")
> >         .aggregateField("transactionDuration", ApproximateQuantilesCombineFn.create(21),
> >             "durationHistogram"));
> >
> >
> > This will individually aggregate the specified fields of the input
> > items (by user and country) and generate an output schema for these
> > aggregations. In this case, the output schema will be the following:
> >
> >
> > AggregatedSchema:
> >   total_cost: INT64
> >   top_purchases: ARRAY[INT64]
> >   durationHistogram: ARRAY[DOUBLE]
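For a single group, the aggregations above can be sketched with plain Java collections: each aggregation reads one input field and produces one named output field, and together those outputs form the output schema. In this JDK-only sketch, AggregateSketch, aggregate, and quantiles are hypothetical; the histogram uses exact quantiles computed by sorting, a swapped-in stand-in for ApproximateQuantilesCombineFn, which computes approximate ones.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

// Hypothetical per-group aggregation; not the Beam Group/Combine API.
class AggregateSketch {
  // Builds the output row for one group: field name -> aggregated value.
  static Map<String, Object> aggregate(List<Long> costs, List<Double> durations) {
    Map<String, Object> out = new LinkedHashMap<>();
    // total_cost: INT64, the sum of transactionCost.
    out.put("total_cost", costs.stream().mapToLong(Long::longValue).sum());
    // top_purchases: ARRAY[INT64], up to the 10 largest costs, descending.
    out.put("top_purchases", costs.stream()
        .sorted(Comparator.reverseOrder()).limit(10).collect(Collectors.toList()));
    // durationHistogram: ARRAY[DOUBLE], 21 evenly spaced order statistics (min..max).
    out.put("durationHistogram", quantiles(durations, 21));
    return out;
  }

  // Exact quantiles by sorting; requires a non-empty list and numQuantiles >= 2.
  static List<Double> quantiles(List<Double> values, int numQuantiles) {
    List<Double> sorted = new ArrayList<>(values);
    sorted.sort(null);
    List<Double> result = new ArrayList<>();
    for (int i = 0; i < numQuantiles; i++) {
      int index = (int) Math.round((double) i * (sorted.size() - 1) / (numQuantiles - 1));
      result.add(sorted.get(index));
    }
    return result;
  }

  public static void main(String[] args) {
    Map<String, Object> row = aggregate(List.of(10L, 5L, 7L), List.of(1.0, 3.0, 2.0));
    System.out.println(row.get("total_cost"));    // 22
    System.out.println(row.get("top_purchases")); // [10, 7, 5]
  }
}
```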
> >
> >
> > There are some more utility transforms I've written that are worth
> > looking at, such as Convert (which can convert between user types that
> > share a schema) and Unnest (which flattens nested schemas). There are
> > also some others, such as Pivot, that we should consider writing.
> >
> > There is still a lot to do. All the todo items are reflected in JIRA;
> > however, here are some examples of current gaps:
> >
> >
> > * Support for read-only POJOs (those with final fields) and JavaBeans
> >   (objects without setters).
> > * Automatic schema inference from more Java types: protocol buffers,
> >   Avro, AutoValue, etc.
> > * Integration with sources (BigQueryIO, JdbcIO, AvroIO, etc.).
> > * Support for JsonPath expressions so users can better express nested
> >   fields, e.g. expressions of the form
> >   Select.fields("field1.field2", "field3.*", "field4[0].field5");
> > * Schemas still need to be defined in our portability layer so they
> >   can be used cross-language.
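As an illustration of the path syntax proposed in the list above, this JDK-only sketch resolves expressions such as field1.field2 and field4[0].field5 against nested Map/List rows. FieldPath.resolve is hypothetical, is not JsonPath-compliant, leaves out wildcards such as field3.*, and does no error handling for missing fields.

```java
import java.util.List;
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical nested-field resolver; not a Beam or JsonPath API.
class FieldPath {
  // One path segment: a field name with an optional [index] suffix.
  private static final Pattern SEGMENT = Pattern.compile("(\\w+)(?:\\[(\\d+)\\])?");

  static Object resolve(Object row, String path) {
    Object current = row;
    for (String segment : path.split("\\.")) {
      Matcher m = SEGMENT.matcher(segment);
      if (!m.matches()) throw new IllegalArgumentException("Bad segment: " + segment);
      // Step into the named field of the current row.
      current = ((Map<?, ?>) current).get(m.group(1));
      if (m.group(2) != null) {
        // Then index into the array/list value, if an index was given.
        current = ((List<?>) current).get(Integer.parseInt(m.group(2)));
      }
    }
    return current;
  }

  public static void main(String[] args) {
    Map<String, Object> row = Map.of(
        "field1", Map.of("field2", "a"),
        "field4", List.of(Map.of("field5", 7)));
    System.out.println(resolve(row, "field1.field2"));    // a
    System.out.println(resolve(row, "field4[0].field5")); // 7
  }
}
```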
> >
> >
> > If anyone is interested in helping close these gaps, you'll be helping
> > make Beam a better, more usable system!
> >
> > Reuven
> >
>