Some answers inline:
On Thu, Aug 30, 2018 at 7:56 AM Ismaël Mejía <[email protected]> wrote:

> Thanks Reuven for the excellent summary and thanks to all the guys who
> worked on the Schema/SQL improvements. This is great for usability. I
> really like the idea of making user experience simpler, e.g. by
> automatically inferring Coders. Some questions:
>
> - Any plans to add similar improvements for the python/go SDKs ?
>

Yes, all languages should support this. I'm focusing on Java first.


>
> - I suppose that deconstructing the element objects based on
> annotations happens in flight, have you thought about a way to
> eventually push this into the previous transform (e.g. the IO) via
> some sort of push-down predicate ? (I suppose this applies for
> jsonpath filters but should be more complex)
>

I assume you mean turning a user element into something accessible by
fields? For Pojos/JavaBeans we never "deconstruct" the elements. ByteBuddy
is used to generate low-level field accessors for these objects. So
internally the system can turn a Pojo into a Row object very cheaply, at
the cost of a single object allocation. This also means that the memory
representation for this object remains the user's Pojo.
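
As a rough illustration of the no-copy idea, here is a minimal sketch in plain Java. It is not Beam's implementation: reflection stands in for the ByteBuddy-generated accessors, and PojoRowView is a hypothetical name. The point is only that the wrapper reads fields straight from the user's object rather than copying them into a new structure.

```java
import java.lang.reflect.Field;
import java.lang.reflect.Modifier;
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch: a row-like view that wraps a POJO instead of copying it.
// Beam generates accessors with ByteBuddy; plain reflection stands in for that here.
final class PojoRowView {
  private final Object pojo;        // the user's object remains the in-memory representation
  private final List<Field> fields; // public instance fields of the wrapped object

  PojoRowView(Object pojo) {
    this.pojo = pojo;
    this.fields = new ArrayList<>();
    for (Field f : pojo.getClass().getFields()) {
      if (!Modifier.isStatic(f.getModifiers())) {
        fields.add(f);
      }
    }
  }

  // Reads a field by name directly from the wrapped POJO; no field values are copied.
  Object getValue(String name) {
    for (Field f : fields) {
      if (f.getName().equals(name)) {
        try {
          return f.get(pojo);
        } catch (IllegalAccessException e) {
          throw new IllegalStateException(e);
        }
      }
    }
    throw new IllegalArgumentException("No such field: " + name);
  }

  // Example POJO mirroring the LatLong class from the original note.
  public static final class LatLong {
    public double latitude;
    public double longitude;
  }
}
```

A real implementation would build the accessor list once per class and reuse it, so that wrapping an element costs only the wrapper allocation.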

There is an opportunity to push some things down, though. Since we can analyze
the graph and determine which fields are being accessed (often), we could
insert projections early in the graph that drop unused fields.
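
A minimal sketch of that idea in plain Java, with rows modeled as maps. ProjectionSketch and its methods are hypothetical names for illustration, not Beam APIs: the first method computes the union of fields that downstream stages read, and the second applies that projection to a row as early as possible.

```java
import java.util.LinkedHashMap;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Hypothetical sketch of projection push-down over rows modeled as maps.
final class ProjectionSketch {
  // Union of the field names that the downstream stages declare they read.
  static Set<String> fieldsAccessed(List<Set<String>> perStageFields) {
    Set<String> needed = new LinkedHashSet<>();
    for (Set<String> stage : perStageFields) {
      needed.addAll(stage);
    }
    return needed;
  }

  // Early projection: keep only the needed fields, in the row's original order.
  static Map<String, Object> project(Map<String, Object> row, Set<String> needed) {
    Map<String, Object> out = new LinkedHashMap<>();
    for (Map.Entry<String, Object> e : row.entrySet()) {
      if (needed.contains(e.getKey())) {
        out.put(e.getKey(), e.getValue());
      }
    }
    return out;
  }
}
```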


> - Do you think it makes sense to have ways for IOs to provide
> transforms to convert from/to Rows. I remember there was some work on
> this for the SQL. I am wondering how we can make the Schema/SQL
> experience even more friendly.
>

Yes, absolutely. Transforms aren't needed - the IOs need only register
SerializableFunctions to do this conversion with the SchemaRegistry (if the
schema is registered, there is a generic Convert transform that can always
convert between any two types with the same Schema, including Rows). This
will make these IOs much friendlier to use.

>
> - What is the current intermediate representation? If I remember well,
> weren’t Coders in part a way of hacking around the issues of
> determinism in Java Serialization? So if we use Java serialization
> (generated via bytebuddy) wouldn’t we have similar issues?
>

In addition to determinism issues, Java Serialization is an incredibly
inefficient encoding mechanism (in some cases the serialized size of a
record can be 100x larger). Java serialization isn't being used. Right now
ByteBuddy is generating a nested coder that uses the existing Beam coders
(e.g. StringUtf8Coder, VarLongCoder, etc.).
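
To make "nested coder" concrete, here is a simplified sketch in plain Java. The classes below are hypothetical stand-ins, not Beam's StringUtf8Coder or VarLongCoder, and the byte layout is an assumption chosen for illustration. It shows only the composition pattern: a record is encoded by running one small coder per field, in schema order.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.nio.charset.StandardCharsets;

// Hypothetical sketch: a record coder assembled from per-field coders.
// These are simplified stand-ins, not Beam's actual coder classes.
final class NestedCoderSketch {
  interface FieldCoder<T> {
    void encode(T value, DataOutputStream out) throws IOException;
    T decode(DataInputStream in) throws IOException;
  }

  // Variable-length long: 7 bits per byte, high bit set means "more bytes follow".
  static final FieldCoder<Long> VAR_LONG = new FieldCoder<Long>() {
    @Override
    public void encode(Long value, DataOutputStream out) throws IOException {
      long v = value;
      while ((v & ~0x7FL) != 0) {
        out.writeByte((int) ((v & 0x7F) | 0x80));
        v >>>= 7;
      }
      out.writeByte((int) v);
    }

    @Override
    public Long decode(DataInputStream in) throws IOException {
      long result = 0;
      int shift = 0;
      while (true) {
        int b = in.readUnsignedByte();
        result |= ((long) (b & 0x7F)) << shift;
        if ((b & 0x80) == 0) {
          return result;
        }
        shift += 7;
      }
    }
  };

  // Length-prefixed UTF-8 string, reusing the varint coder for the length.
  static final FieldCoder<String> UTF8 = new FieldCoder<String>() {
    @Override
    public void encode(String value, DataOutputStream out) throws IOException {
      byte[] bytes = value.getBytes(StandardCharsets.UTF_8);
      VAR_LONG.encode((long) bytes.length, out);
      out.write(bytes);
    }

    @Override
    public String decode(DataInputStream in) throws IOException {
      long length = VAR_LONG.decode(in);
      byte[] bytes = new byte[(int) length];
      in.readFully(bytes);
      return new String(bytes, StandardCharsets.UTF_8);
    }
  };

  // A two-field record (userId, transactionCost) encoded field by field.
  static byte[] encode(String userId, long transactionCost) throws IOException {
    ByteArrayOutputStream bytes = new ByteArrayOutputStream();
    DataOutputStream out = new DataOutputStream(bytes);
    UTF8.encode(userId, out);
    VAR_LONG.encode(transactionCost, out);
    out.flush();
    return bytes.toByteArray();
  }

  public static void main(String[] args) throws IOException {
    byte[] bytes = encode("user-42", 1999L);
    DataInputStream in = new DataInputStream(new ByteArrayInputStream(bytes));
    // Fields are decoded in the same order they were encoded.
    System.out.println(UTF8.decode(in) + " " + VAR_LONG.decode(in));
  }
}
```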

>
> - Have you envisioned other ways to serialize apart from the generation
> via bytebuddy ? e.g. to make Row compatible with formats supported in
> multiple languages e.g. protobuf, or some arrow-like thing that we can
> just submit without reserialization and can be decoded back (this will
> be great for portability).
>

Yes, I looked at Arrow and Flatbuffers, but I haven't had time to actually
play with them yet. One problem with Arrow is that Beam has a
record-at-a-time model, and Arrow is better suited for encoding batches of
records.

It's worth noting that in our portability model, all of these Beam coders
will have a URN and an external spec as to how they encode - so the current
coder should also be compatible across languages. A big advantage of
formats such as proto or flatbuffers, is that they are already designed to
support schema evolution (e.g. updating your schema with new fields) in a
compatible way.

Another thing to think about: an encoding method that allows us to
efficiently implement Select without parsing the whole record would be
nice. There is a body of research in the DB world about good encoding
methods for records.
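
One simple example of such an encoding is a record with an offset table in its header. The sketch below is plain Java with a hypothetical layout and hypothetical names (OffsetTableRecord, selectField); it is one possible approach, not something Beam implements, and it treats every field as a UTF-8 string for brevity. Reading field i needs two header lookups and one slice, independent of how large the other fields are.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

// Hypothetical sketch: a record layout with an offset table, so a single field
// can be read without decoding the fields before it.
final class OffsetTableRecord {
  // Layout: [fieldCount:int][endOffset_0:int]...[endOffset_{n-1}:int][payload bytes]
  static byte[] encode(String... fields) {
    byte[][] encoded = new byte[fields.length][];
    int payloadSize = 0;
    for (int i = 0; i < fields.length; i++) {
      encoded[i] = fields[i].getBytes(StandardCharsets.UTF_8);
      payloadSize += encoded[i].length;
    }
    ByteBuffer buf = ByteBuffer.allocate(4 + 4 * fields.length + payloadSize);
    buf.putInt(fields.length);
    int end = 0;
    for (byte[] f : encoded) {
      end += f.length;
      buf.putInt(end); // end offset of this field, relative to the payload start
    }
    for (byte[] f : encoded) {
      buf.put(f);
    }
    return buf.array();
  }

  // Reads only field `index`: two header lookups plus one slice of the payload.
  static String selectField(byte[] record, int index) {
    ByteBuffer buf = ByteBuffer.wrap(record);
    int count = buf.getInt(0);
    if (index < 0 || index >= count) {
      throw new IndexOutOfBoundsException("field " + index);
    }
    int payloadStart = 4 + 4 * count;
    int start = index == 0 ? 0 : buf.getInt(4 + 4 * (index - 1));
    int end = buf.getInt(4 + 4 * index);
    return new String(record, payloadStart + start, end - start, StandardCharsets.UTF_8);
  }
}
```

The trade-off is a fixed header cost per record, which is why the per-field coders above stay attractive for small rows.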

One idea: in the future the FnApi can natively represent schemas, which
means that we won't need a coder at all. The portability layer can then
choose its own encoding mechanism, and since FnApi can operate on batches,
Arrow might be used to encode these batches efficiently.


> - Any discussions on row ↔ json conversion? Sounds like a trivial /
> common case too
>

Should be an easy transform to add. It would also be nice to support
JsonSchema as a way of reading schemas.
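
For a sense of how small the Row-to-JSON direction could be, here is a sketch in plain Java with a row modeled as a map. RowToJsonSketch is a hypothetical name and this is not a Beam transform; it handles only strings, numbers, booleans, nulls, nested maps, and lists, and it does not guard against NaN or infinite doubles, which JSON cannot represent.

```java
import java.util.List;
import java.util.Map;

// Hypothetical sketch: serializing a row (modeled as a map) to JSON text.
final class RowToJsonSketch {
  static String toJson(Object value) {
    if (value == null) {
      return "null";
    }
    if (value instanceof String) {
      return quote((String) value);
    }
    if (value instanceof Number || value instanceof Boolean) {
      return value.toString();
    }
    if (value instanceof Map) {
      // A nested row becomes a JSON object.
      StringBuilder sb = new StringBuilder("{");
      for (Map.Entry<?, ?> e : ((Map<?, ?>) value).entrySet()) {
        if (sb.length() > 1) {
          sb.append(',');
        }
        sb.append(quote(String.valueOf(e.getKey()))).append(':').append(toJson(e.getValue()));
      }
      return sb.append('}').toString();
    }
    if (value instanceof List) {
      // An array field becomes a JSON array.
      StringBuilder sb = new StringBuilder("[");
      for (Object item : (List<?>) value) {
        if (sb.length() > 1) {
          sb.append(',');
        }
        sb.append(toJson(item));
      }
      return sb.append(']').toString();
    }
    throw new IllegalArgumentException("Unsupported type: " + value.getClass());
  }

  // Escapes quotes, backslashes, and control characters.
  static String quote(String s) {
    StringBuilder sb = new StringBuilder("\"");
    for (int i = 0; i < s.length(); i++) {
      char c = s.charAt(i);
      if (c == '"' || c == '\\') {
        sb.append('\\').append(c);
      } else if (c < 0x20) {
        sb.append(String.format("\\u%04x", (int) c));
      } else {
        sb.append(c);
      }
    }
    return sb.append('"').toString();
  }
}
```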


>
> Regards,
> Ismael
> On Thu, Aug 30, 2018 at 4:51 PM Reuven Lax <[email protected]> wrote:
> >
> > Andrew - the @Experimental tag simply means that we are free to change
> the interfaces without waiting for the next major Beam version. Once we are
> happy to freeze these interfaces, we can drop the tag.
> >
> > On Wed, Aug 29, 2018 at 1:48 PM Andrew Pilloud <[email protected]>
> wrote:
> >>
> >> The work you've done to generalize and expand Schemas has significantly
> simplified what we need to do for SQL, I hope they are valuable to everyone
> else too. What work remains before we can drop the Experimental designation?
> >>
> >> Andrew
> >>
> >> On Wed, Aug 29, 2018 at 5:31 AM Eugene Kirpichov <[email protected]>
> wrote:
> >>>
> >>> Wow, this is really coming together, congratulations and thanks for
> the great work!
> >>>
> >>> On Wed, Aug 29, 2018 at 1:40 AM Reuven Lax <[email protected]> wrote:
> >>>>
> >>>> I wanted to send a quick note to the community about the current
> status of schema-aware PCollections in Beam. As some might remember we had
> a good discussion last year about the design of these schemas, involving
> many folks from different parts of the community. I sent a summary earlier
> this year explaining how schemas have been integrated into the DoFn
> framework. Much has happened since then, and here are some of the
> highlights.
> >>>>
> >>>>
> >>>> First, I want to emphasize that all the schema-aware classes are
> currently marked @Experimental. Nothing is set in stone yet, so if you have
> questions about any decisions made, please start a discussion!
> >>>>
> >>>>
> >>>> SQL
> >>>>
> >>>> The first big milestone for schemas was porting all of BeamSQL to use
> the framework, which was done in pr/5956. This was a lot of work, exposed
> many bugs in the schema implementation, but now provides great evidence
> that schemas work!
> >>>>
> >>>>
> >>>> Schema inference
> >>>>
> >>>> Beam can automatically infer schemas from Java POJOs (objects with
> public fields) or JavaBean objects (objects with getter/setter methods).
> Often you can do this by simply annotating the class. For example:
> >>>>
> >>>>
> >>>> @DefaultSchema(JavaFieldSchema.class)
> >>>>
> >>>> public class UserEvent {
> >>>>
> >>>>  public String userId;
> >>>>
> >>>>  public LatLong location;
> >>>>
> >>>>  public String countryCode;
> >>>>
> >>>>  public long transactionCost;
> >>>>
> >>>>  public double transactionDuration;
> >>>>
> >>>>  public List<String> traceMessages;
> >>>>
> >>>> };
> >>>>
> >>>>
> >>>> @DefaultSchema(JavaFieldSchema.class)
> >>>>
> >>>> public class LatLong {
> >>>>
> >>>>  public double latitude;
> >>>>
> >>>>  public double longitude;
> >>>>
> >>>> }
> >>>>
> >>>>
> >>>> Beam will automatically infer schemas for these classes! So if you
> have a PCollection<UserEvent>, it will automatically get the following
> schema:
> >>>>
> >>>>
> >>>> UserEvent:
> >>>>
> >>>>  userId: STRING
> >>>>
> >>>>  location: ROW(LatLong)
> >>>>
> >>>>  countryCode: STRING
> >>>>
> >>>>  transactionCost: INT64
> >>>>
> >>>>  transactionDuration: DOUBLE
> >>>>
> >>>>  traceMessages: ARRAY[STRING]
> >>>>
> >>>>
> >>>> LatLong:
> >>>>
> >>>>  latitude: DOUBLE
> >>>>
> >>>>  longitude: DOUBLE
> >>>>
> >>>>
> >>>> Now it’s not always possible to annotate the class like this (you may
> not own the class definition), so you can also explicitly register this
> using Pipeline:getSchemaRegistry:registerPOJO, and the same for JavaBeans.
> >>>>
> >>>>
> >>>> Coders
> >>>>
> >>>> Beam has a built-in coder for any schema-aware PCollection, largely
> removing the need for users to care about coders. We generate low-level
> bytecode (using ByteBuddy) to implement the coder for each schema, so these
> coders are quite performant. This provides a better default coder for Java
> POJO objects as well. In the past users were recommended to use AvroCoder
> for pojos, which many have found inefficient. Now there’s a more-efficient
> solution.
> >>>>
> >>>>
> >>>> Utility Transforms
> >>>>
> >>>> Schemas are already useful for implementers of extensions such as
> SQL, but the goal was to use them to make Beam itself easier to use. To
> this end, I’ve been implementing a library of transforms that allow for
> easy manipulation of schema PCollections. So far Filter and Select are
> merged, Group is about to go out for review (it needs some more javadoc and
> unit tests), and Join is being developed but doesn’t yet have a final
> interface.
> >>>>
> >>>>
> >>>> Filter
> >>>>
> >>>> Given a PCollection<LatLong>, I want to keep only those in an area of
> southern Manhattan. Well, this is easy!
> >>>>
> >>>>
> >>>> PCollection<LatLong> manhattanEvents = allEvents.apply(Filter
> >>>>
> >>>>  .whereFieldName("latitude", lat -> lat < 40.720 && lat > 40.699)
> >>>>
> >>>>  .whereFieldName("longitude", lng -> lng < -73.969 && lng >
> -74.747));
> >>>>
> >>>>
> >>>> Schemas along with lambdas allow us to write this transform
> declaratively. The Filter transform also allows you to register filter
> functions that operate on multiple fields at the same time.
> >>>>
> >>>>
> >>>> Select
> >>>>
> >>>> Let’s say that I don’t need all the fields in a row. For instance,
> I’m only interested in the userId and traceMessages, and don’t care about
> the location. In that case I can write the following:
> >>>>
> >>>>
> >>>> PCollection<Row> selected =
> allEvents.apply(Select.fieldNames("userId", "traceMessages"));
> >>>>
> >>>>
> >>>> BTW, Beam also keeps track of which fields are accessed by a
> transform. In the future we can automatically insert Selects in front of
> subgraphs to drop fields that are not referenced in that subgraph.
> >>>>
> >>>>
> >>>> Group
> >>>>
> >>>> Group is one of the more advanced transforms. In its most basic form,
> it provides a convenient way to group by key:
> >>>>
> >>>>
> >>>> PCollection<KV<Row, Iterable<UserEvent>>> byUserAndCountry =
> >>>>
> >>>>    allEvents.apply(Group.byFieldNames("userId", "countryCode"));
> >>>>
> >>>>
> >>>> Notice how much more concise this is than using GroupByKey directly!
> >>>>
> >>>>
> >>>> The Group transform really starts to shine however when you start
> specifying aggregations. You can aggregate any field (or fields) and build
> up an output schema based on these aggregations. For example:
> >>>>
> >>>>
> >>>> PCollection<KV<Row, Row>> aggregated = allEvents.apply(
> >>>>
> >>>>    Group.byFieldNames("userId", "countryCode")
> >>>>
> >>>>        .aggregateField("transactionCost", Sum.ofLongs(), "total_cost")
> >>>>
> >>>>        .aggregateField("transactionCost", Top.<Long>largestFn(10),
> "top_purchases")
> >>>>
> >>>>        .aggregateField("transactionDuration",
> ApproximateQuantilesCombineFn.create(21),
> >>>>
> >>>>              "durationHistogram"));
> >>>>
> >>>>
> >>>> This will individually aggregate the specified fields of the input
> items (by user and country), and generate an output schema for these
> aggregations. In this case, the output schema will be the following:
> >>>>
> >>>>
> >>>> AggregatedSchema:
> >>>>
> >>>>    total_cost: INT64
> >>>>
> >>>>    top_purchases: ARRAY[INT64]
> >>>>
> >>>>    durationHistogram: ARRAY[DOUBLE]
> >>>>
> >>>>
> >>>> There are some more utility transforms I've written that are worth
> looking at such as Convert (which can convert between user types that share
> a schema) and Unnest (flattens nested schemas). There are also some others
> such as Pivot that we should consider writing.
> >>>>
> >>>>
> >>>> There is still a lot to do. All the todo items are reflected in JIRA,
> however here are some examples of current gaps:
> >>>>
> >>>>
> >>>> Support for read-only POJOs (those with final fields) and JavaBeans
> (objects without setters).
> >>>>
> >>>> Automatic schema inference from more Java types: protocol buffers,
> avro, AutoValue, etc.
> >>>>
> >>>> Integration with sources (BigQueryIO, JdbcIO, AvroIO, etc.)
> >>>>
> >>>> Support for JsonPath expressions so users can better express nested
> fields. E.g. support expressions of the form Select.fields("field1.field2",
> "field3.*", "field4[0].field5");
> >>>>
> >>>> Schemas still need to be defined in our portability layer so they can
> be used cross language.
> >>>>
> >>>>
> >>>> If anyone is interested in helping close these gaps, you'll be
> helping make Beam a better, more-usable system!
> >>>>
> >>>> Reuven
> >>>>
>
