It's probably worth publishing this update as a blog post.

On Fri, Aug 31, 2018 at 9:58 PM Reuven Lax <[email protected]> wrote:
In addition, JdbcIO is another source that could integrate with schemas.

Another point of integration could be with shared schema registries (such as the Kafka Schema Registry). Any source can integrate with an external registry and use it to set the schema on the output.

Reuven

On Fri, Aug 31, 2018 at 12:44 PM Robert Bradshaw <[email protected]> wrote:

> On Fri, Aug 31, 2018 at 5:01 PM Alexey Romanenko <[email protected]> wrote:
>
> Thanks Reuven for updating the community with this, great work!
>
> One small question about IO integration: what kind of integration is this supposed to be?

Two IOs that I would love to see benefit from schemas are BigQuery and Avro (and really any source that already has a schema, even CSVs). This would of course require querying the source, and possibly some of the data, at pipeline construction time (which has pros and cons). Both of these examples also require a schema when writing, which under this scheme could be implicit rather than (re)provided by the user.

> Are there any IOs that already benefit from Schemas support?

On 31 Aug 2018, at 16:46, Reuven Lax <[email protected]> wrote:

> On Fri, Aug 31, 2018 at 2:22 AM Maximilian Michels <[email protected]> wrote:
>
> Thanks Reuven. That's an OK restriction. Apache Flink also requires non-final fields to be able to generate TypeInformation (~= Schema) from POJOs.
>
> I agree that it's not very intuitive for users.
>
> I suppose it would work to assume a constructor with the same parameter order as the fields in the class. So if instantiation with the default constructor doesn't work, it would try to look up a constructor based on the fields of the class.

Actually, Java reflection doesn't guarantee any particular order of fields or methods when you query them. We would have to look for a constructor with the exact same parameter names as the fields. Unfortunately, users sometimes shorten the parameter names when creating such constructors, which would defeat this. We could also provide a set of dedicated annotations to allow the user to mark the constructor (or static builder method) used to create the class.

> Perhaps too much magic; having a dedicated interface for construction is a more programmatic approach.
>
> -Max

On 30.08.18 16:55, Reuven Lax wrote:

Max,

Nested POJOs are fully supported, as are nested array/collection and map types (e.g. if your POJO contains List<OtherPojo>).

One limitation right now is that only mutable POJOs are supported. For example, the following POJO would _not_ work, because the field isn't mutable:

    public class Pojo {
      public final String field;
    }

This is an annoying restriction, because in practice POJO types often have final fields. The reason for the restriction is that the most general way to create an instance of this POJO (after decoding) is to instantiate the object and then set the fields one by one (I also assume that there's a default constructor). I can remove this restriction if there is an appropriate constructor or builder interface that lets us construct the object directly.

Reuven
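To make the annotation idea above concrete, here is a strawman sketch; the @SchemaCreate name and its semantics are assumptions for illustration, not an API that exists in this thread:

    // Strawman: mark the constructor that schema inference should use to
    // build instances after decoding, allowing final (immutable) fields.
    // The @SchemaCreate annotation here is hypothetical.
    @DefaultSchema(JavaFieldSchema.class)
    public class Pojo {
      public final String field;

      @SchemaCreate  // hypothetical: "construct instances via this constructor"
      public Pojo(String field) {
        this.field = field;
      }
    }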
On Thu, Aug 30, 2018 at 6:51 AM Maximilian Michels <[email protected]> wrote:

That's a cool feature. Are there any limitations for the schema inference apart from being a POJO/Bean? Does it support nested POJOs, e.g. "wrapper.field"?

-Max

On 29.08.18 07:40, Reuven Lax wrote:

I wanted to send a quick note to the community about the current status of schema-aware PCollections in Beam. As some might remember, we had a good discussion last year about the design of these schemas, involving many folks from different parts of the community. I sent a summary earlier this year explaining how schemas have been integrated into the DoFn framework. Much has happened since then, and here are some of the highlights.

First, I want to emphasize that all the schema-aware classes are currently marked @Experimental. Nothing is set in stone yet, so if you have questions about any decisions made, please start a discussion!

SQL

The first big milestone for schemas was porting all of BeamSQL to use the framework, which was done in pr/5956. This was a lot of work and exposed many bugs in the schema implementation, but it now provides great evidence that schemas work!

Schema inference

Beam can automatically infer schemas from Java POJOs (objects with public fields) or JavaBean objects (objects with getter/setter methods). Often you can do this by simply annotating the class. For example:

    @DefaultSchema(JavaFieldSchema.class)
    public class UserEvent {
      public String userId;
      public LatLong location;
      public String countryCode;
      public long transactionCost;
      public double transactionDuration;
      public List<String> traceMessages;
    }

    @DefaultSchema(JavaFieldSchema.class)
    public class LatLong {
      public double latitude;
      public double longitude;
    }

Beam will automatically infer schemas for these classes! So if you have a PCollection<UserEvent>, it will automatically get the following schema:

    UserEvent:
      userId: STRING
      location: ROW(LatLong)
      countryCode: STRING
      transactionCost: INT64
      transactionDuration: DOUBLE
      traceMessages: ARRAY[STRING]

    LatLong:
      latitude: DOUBLE
      longitude: DOUBLE

Now it's not always possible to annotate the class like this (you may not own the class definition), so you can also explicitly register the schema using Pipeline:getSchemaRegistry:registerPOJO, and the same for JavaBeans.
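As a concrete sketch of that registration path (a minimal example, assuming a PipelineOptions object named options; registerPOJO is the method named above):

    // Sketch: explicit registration when you can't annotate the class.
    // Assumes UserEvent and LatLong are plain POJOs without @DefaultSchema.
    Pipeline pipeline = Pipeline.create(options);
    pipeline.getSchemaRegistry().registerPOJO(UserEvent.class);
    pipeline.getSchemaRegistry().registerPOJO(LatLong.class);
    // Any PCollection<UserEvent> in this pipeline now carries the inferred schema.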
Coders

Beam has a built-in coder for any schema-aware PCollection, largely removing the need for users to care about coders. We generate low-level bytecode (using ByteBuddy) to implement the coder for each schema, so these coders are quite performant. This also provides a better default coder for Java POJO objects. In the past users were recommended to use AvroCoder for POJOs, which many have found inefficient. Now there's a more efficient solution.

Utility Transforms

Schemas are already useful for implementers of extensions such as SQL, but the goal was to use them to make Beam itself easier to use. To this end, I've been implementing a library of transforms that allow for easy manipulation of schema PCollections. So far Filter and Select are merged, Group is about to go out for review (it needs some more javadoc and unit tests), and Join is being developed but doesn't yet have a final interface.

Filter

Given a PCollection<LatLong>, I want to keep only those in an area of southern Manhattan. Well, this is easy!

    PCollection<LatLong> manhattanEvents = allEvents.apply(Filter
        .whereFieldName("latitude", lat -> lat < 40.720 && lat > 40.699)
        .whereFieldName("longitude", lng -> lng < -73.969 && lng > -74.747));

Schemas along with lambdas allow us to write this transform declaratively. The Filter transform also allows you to register filter functions that operate on multiple fields at the same time.

Select

Let's say that I don't need all the fields in a row. For instance, I'm only interested in the userId and traceMessages, and don't care about the location. In that case I can write the following:

    PCollection<Row> selected =
        allEvents.apply(Select.fieldNames("userId", "traceMessages"));

BTW, Beam also keeps track of which fields are accessed by a transform. In the future we can automatically insert Selects in front of subgraphs to drop fields that are not referenced in that subgraph.

Group

Group is one of the more advanced transforms. In its most basic form, it provides a convenient way to group by key:

    PCollection<KV<Row, Iterable<UserEvent>>> byUserAndCountry =
        allEvents.apply(Group.byFieldNames("userId", "countryCode"));

Notice how much more concise this is than using GroupByKey directly!
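For comparison, a rough sketch of the same grouping done without schemas, using WithKeys and GroupByKey; the concatenated string key is a crude stand-in for the Row key that Group produces:

    // Without schemas: hand-roll a composite key, then group on it.
    PCollection<KV<String, Iterable<UserEvent>>> grouped =
        allEvents
            .apply(WithKeys.of((UserEvent e) -> e.userId + "|" + e.countryCode)
                .withKeyType(TypeDescriptors.strings()))  // lambdas need an explicit key type
            .apply(GroupByKey.create());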
The Group transform really starts to shine, however, when you start specifying aggregations. You can aggregate any field (or fields) and build up an output schema based on these aggregations. For example:

    PCollection<KV<Row, Row>> aggregated = allEvents.apply(
        Group.byFieldNames("userId", "countryCode")
            .aggregateField("transactionCost", Sum.ofLongs(), "total_cost")
            .aggregateField("transactionCost", Top.<Long>largestFn(10), "top_purchases")
            .aggregateField("transactionDuration",
                ApproximateQuantilesCombineFn.create(21), "durationHistogram"));

This will individually aggregate the specified fields of the input items (grouped by user and country) and generate an output schema for these aggregations. In this case, the output schema will be the following:

    AggregatedSchema:
      total_cost: INT64
      top_purchases: ARRAY[INT64]
      durationHistogram: ARRAY[DOUBLE]

There are some more utility transforms I've written that are worth looking at, such as Convert (which can convert between user types that share a schema) and Unnest (which flattens nested schemas). There are also some others, such as Pivot, that we should consider writing.
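To give a taste of Convert: the rows produced by the Select example above share a schema with any user type that has the same fields, so they can be converted back to a typed PCollection. The UserTrace class below is hypothetical, made up for illustration:

    // Hypothetical POJO whose inferred schema matches the selected fields.
    @DefaultSchema(JavaFieldSchema.class)
    public class UserTrace {
      public String userId;
      public List<String> traceMessages;
    }

    // Convert generic rows back into typed objects that share the schema.
    PCollection<UserTrace> traces = selected.apply(Convert.to(UserTrace.class));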
There is still a lot to do. All the todo items are reflected in JIRA; however, here are some examples of current gaps:

* Support for read-only POJOs (those with final fields) and JavaBeans (objects without setters).

* Automatic schema inference from more Java types: protocol buffers, Avro, AutoValue, etc.

* Integration with sources (BigQueryIO, JdbcIO, AvroIO, etc.).

* Support for JsonPath expressions so users can better express nested fields, e.g. expressions of the form Select.fields("field1.field2", "field3.*", "field4[0].field5").

* Schemas still need to be defined in our portability layer so they can be used cross-language.

If anyone is interested in helping close these gaps, you'll be helping make Beam a better, more-usable system!

Reuven