Hi,

IMHO, it would be better to have an explicit transform/IO as a converter.
It would be easier for users.

Another option would be to use a "TypeConverter/SchemaConverter" map, as we
do in Camel: Beam could check the source/destination "type" and look in the
map to see whether a converter is available. This map can be stored as part
of the pipeline (as we do for filesystem registration).

My $0.01

Regards
JB

On 23/05/2018 07:51, Romain Manni-Bucau wrote:
> How does it work on the pipeline side?
> Do you generate these "virtual" IOs at build time so that the fluent
> API works without erasing generics?
>
> ex: SQL(row)->BigQuery(native) will not compile so we need a
> SQL(row)->BigQuery(row)
>
> Side note unrelated to Row: if you add another registry, maybe a pre-task
> is to ensure Beam has a kind of singleton/context, to avoid duplicating
> it or not tracking it properly. These kinds of converters will generally
> need a global close, and not only a per-record one:
> converter.init();converter.convert(row);....converter.destroy();,
> otherwise it easily leaks. This is why it can require some way to not
> recreate it. A quick fix, if you are in bytebuddy already, can probably
> be to add it to setup/teardown; being more global would be nicer but is
> more challenging.
>
> Romain Manni-Bucau
> @rmannibucau <https://twitter.com/rmannibucau> | Blog
> <https://rmannibucau.metawerx.net/> | Old Blog
> <http://rmannibucau.wordpress.com> | Github
> <https://github.com/rmannibucau> | LinkedIn
> <https://www.linkedin.com/in/rmannibucau> | Book
> <https://www.packtpub.com/application-development/java-ee-8-high-performance>
>
> On Wed, May 23, 2018 at 07:22, Reuven Lax <re...@google.com> wrote:
>
> No - the only modules we need to add to core are the ones we choose
> to add. For example, I will probably add a registration for
> TableRow/TableSchema (GCP BigQuery) so these can work seamlessly
> with schemas. However I will add that to the GCP module, so only
> someone depending on that module needs to pull in that dependency.
> The Java ServiceLoader framework can be used by these modules to
> register schemas for their types (we already do something similar
> for FileSystem and for coders as well).
>
> BTW, right now I'm doing the conversion back and forth between Row
> objects in the ByteBuddy-generated bytecode that we generate in order
> to invoke DoFns.
>
> Reuven
>
> On Tue, May 22, 2018 at 10:04 PM Romain Manni-Bucau
> <rmannibu...@gmail.com> wrote:
>
> Hmm, the pluggability part is close to what I wanted to do with
> JsonObject as a main API (to avoid redoing a "row" API and
> schema API).
> Row.as(Class<T>) sounds good, but then does it mean we'll get
> beam-sdk-java-row-jsonobject-like modules (I'm not against it, just
> trying to understand here)?
> If so, how can an IO use as() with the type it expects? Doesn't
> it lead to having tons of these modules in the end?
>
> Romain Manni-Bucau
>
> On Wed, May 23, 2018 at 04:57, Reuven Lax <re...@google.com> wrote:
>
> By the way Romain, if you have specific scenarios in mind I
> would love to hear them. I can try and guess what exactly
> you would like to get out of schemas, but it would work
> better if you gave me concrete scenarios that you would like
> to work.
>
> Reuven
>
> On Tue, May 22, 2018 at 7:45 PM Reuven Lax <re...@google.com> wrote:
>
> Yeah, what I'm working on will help with IO. Basically
> if you register a function with SchemaRegistry that
> converts back and forth between a type (say JsonObject)
> and a Beam Row, then it is applied by the framework
> behind the scenes as part of DoFn invocation.
> Concrete example: let's say I have an IO that reads JSON objects
>
>   class MyJsonIORead extends PTransform<PBegin, PCollection<JsonObject>> {...}
>
> If you register a schema for this type (or you can also
> just set the schema directly on the output PCollection),
> then Beam knows how to convert back and forth between
> JsonObject and Row. So the next ParDo can look like
>
>   p.apply(new MyJsonIORead())
>    .apply(ParDo.of(new DoFn<JsonObject, T>() {
>      @ProcessElement
>      public void process(@Element Row row) {
>        ....
>      }
>    }));
>
> And Beam will automatically convert JsonObject to a Row
> for processing (you aren't forced to do this of course -
> you can always ask for it as a JsonObject).
>
> The same is true for output. If you have a sink that
> takes in JsonObject but the transform before it produces
> Row objects (for instance, because the transform before
> it is Beam SQL), Beam can automatically convert Row back
> to JsonObject for you.
>
> All of this was detailed in the Schema doc I shared a
> few months ago. There was a lot of discussion on that
> document from various parties, and some of this API is a
> result of that discussion. This is also working in the
> branch JB and I were working on, though not yet
> integrated back to master.
>
> I would like to actually go further and make Row an
> interface and provide a way to automatically put a Row
> interface on top of any other object (e.g. JsonObject,
> Pojo, etc.). This won't change the way the user writes
> code, but instead of Beam having to copy and convert at
> each stage (e.g. from JsonObject to Row) it will simply
> create a Row object that uses the JsonObject as its
> underlying storage.
>
> Reuven
>
> On Tue, May 22, 2018 at 11:37 AM Romain Manni-Bucau
> <rmannibu...@gmail.com> wrote:
>
> Well, Beam can implement a new mapper but it doesn't
> help for IO. Most modern backends will take JSON
> directly, even the javax one, and it must stay generic.
>
> Then, since JSON to POJO mapping has already been done a
> dozen times, I'm not sure it is worth it for now.
>
> On Tue, May 22, 2018 at 20:27, Reuven Lax <re...@google.com> wrote:
>
> We can do even better btw. Building a
> SchemaRegistry where automatic conversions can
> be registered between schema and Java data
> types. With this the user won't even need a DoFn
> to do the conversion.
>
> On Tue, May 22, 2018, 10:13 AM Romain Manni-Bucau
> <rmannibu...@gmail.com> wrote:
>
> Hi guys,
>
> Checked out what has been done on the schema
> model and think it is acceptable - regarding
> the JSON debate -
> if https://issues.apache.org/jira/browse/BEAM-4381
> can be fixed.
>
> High level, it is about providing a
> mainstream and not too impacting model OOTB,
> and JSON seems the most valid option for
> now, at least for IO and some user transforms.
>
> Wdyt?
>
> On Fri, Apr 27, 2018 at 18:36, Romain Manni-Bucau
> <rmannibu...@gmail.com> wrote:
>
> Can give it a try end of May, sure.
> (Holidays and work constraints will make
> it hard before.)
>
> On Apr 27, 2018 at 18:26, "Anton Kedin" <ke...@google.com> wrote:
>
> Romain,
>
> I don't believe that the JSON approach
> was investigated very thoroughly. I
> mentioned a few reasons which will
> make it not the best choice in my
> opinion, but I may be wrong. Can you
> put together a design doc or a
> prototype?
>
> Thank you,
> Anton
>
> On Thu, Apr 26, 2018 at 10:17 PM Romain Manni-Bucau
> <rmannibu...@gmail.com> wrote:
>
> On Apr 26, 2018 at 23:13, "Anton Kedin" <ke...@google.com> wrote:
>
>     BeamRecord (Row) has very little in common with
>     JsonObject (I assume you're talking about javax.json),
>     except maybe some similarities of the API.
>     Few reasons why JsonObject doesn't work:
>
>       * it is a Java EE API:
>           o Beam SDK is not limited to Java. There are probably
>             similar APIs for other languages but they might not
>             necessarily carry the same semantics / APIs;
>
> Not a big deal I think. At least not a technical blocker.
>
>           o It can change between Java versions;
>
> No, this is javaee ;).
>
>           o Current Beam Java implementation is an experimental
>             feature to identify what's needed from such an API; in
>             the end we might end up with something similar to the
>             JsonObject API, but likely not;
>
> I don't get that point as a blocker.
>
>       * represents JSON, which is not an API but an object notation:
>           o it is defined as a unicode string in a certain format.
>             If you choose to adhere to ECMA-404, then it doesn't
>             sound like JsonObject can represent an Avro object, if
>             I'm reading it right;
>
> It is in the generator impl, you can implement an avrogenerator.
>
>       * doesn't define a type system (JSON does, but it's lacking):
>           o for example, JSON doesn't define semantics for numbers;
>           o doesn't define date/time types;
>           o doesn't allow extending JSON type system at all;
>
> That is why you need a metadata object, or simpler, a schema
> with that data. JSON or Beam record doesn't help here and you
> end up with the same outcome if you think about it.
>
>       * lacks schemas;
>
> JSON schemas are standard, widespread and well tooled compared to the
> alternatives.
>
>     You can definitely try to loosen the requirements and
>     define everything in JSON in userland, but the point of
>     Row/Schema is to avoid it and define everything in the
>     Beam model, which can be extended, mapped to JSON,
>     Avro, BigQuery Schemas, custom binary format etc.,
>     with the same semantics across Beam SDKs.
>
> This is what jsonp would allow, with the benefit of natural
> POJO support through jsonb.
>
> On Thu, Apr 26, 2018 at 12:28 PM Romain Manni-Bucau
> <rmannibu...@gmail.com> wrote:
>
> Just to be clear and to make sure I understand:
> how is BeamRecord different from a JsonObject, which is an
> API without implementation (not even a JSON one OOTB)?
> The advantages of the JSON *api* are indeed natural
> mapping (jsonb is based on jsonp so no new
> binding to reinvent) and simple serialization
> (json+gzip for example, or avro if you want to be geeky).
>
> I fail to see the point of rebuilding an ecosystem ATM.
>
> On Apr 26, 2018 at 19:12, "Reuven Lax" <re...@google.com> wrote:
>
> Exactly what JB said. We will write a generic conversion
> from Avro (or JSON) to Beam schemas, which will make them
> work transparently with SQL. The plan is also to migrate
> Anton's work so that POJOs work generically for any schema.
>
> Reuven
>
> On Thu, Apr 26, 2018 at 1:17 AM Jean-Baptiste Onofré
> <j...@nanthrax.net> wrote:
>
> For now we have a generic schema interface.
> Json-b can be an impl, avro could be another one.
>
> Regards
> JB
>
> On Apr 26, 2018, at 12:08, Romain Manni-Bucau
> <rmannibu...@gmail.com> wrote:
>
> Hmm,
>
> avro still has the pitfall of an uncontrolled stack, which
> brings way too many dependencies to be part of any API.
> This is why I proposed a JSON-P based API (JsonObject)
> with a custom Beam entry for some metadata
> (headers "à la Camel").
>
> Romain Manni-Bucau
>
> 2018-04-26 9:59 GMT+02:00 Jean-Baptiste Onofré <j...@nanthrax.net>:
>
> Hi Ismael
>
> You mean directly in Beam SQL ?
>
> That will be part of schema support: generic record could be
> one of the payload with across schema.
>
> Regards
> JB
>
> On Apr 26, 2018, at 11:39, "Ismaël Mejía" <ieme...@gmail.com> wrote:
>
> Hello Anton,
>
> Thanks for the descriptive email and the really useful work. Any plans
> to tackle PCollections of GenericRecord/IndexedRecords? It seems Avro
> is a natural fit for this approach too.
>
> Regards,
> Ismaël
>
> On Wed, Apr 25, 2018 at 9:04 PM, Anton Kedin <ke...@google.com> wrote:
>
> Hi,
>
> I want to highlight a couple of improvements to Beam SQL we have been
> working on recently which are targeted to make Beam SQL API easier to
> use. Specifically these features simplify conversion of Java Beans and
> JSON strings to Rows.
>
> Feel free to try this and send any bugs/comments/PRs my way.
>
> **Caveat: this is still work in progress, and has known bugs and
> incomplete features, see below for details.**
>
> Background
>
> Beam SQL queries can only be applied to PCollection<Row>.
> This means that users need to convert whatever PCollection elements
> they have to Rows before querying them with SQL. This usually requires
> manually creating a Schema and implementing a custom conversion
> PTransform<PCollection<Element>, PCollection<Row>> (see Beam SQL Guide).
>
> The improvements described here are an attempt to reduce this overhead
> for a few common cases, as a start.
>
> Status
>
>   * Introduced an InferredRowCoder to automatically generate rows from
>     beans. Removes the need to manually define a Schema and Row
>     conversion logic;
>   * Introduced JsonToRow transform to automatically parse JSON objects
>     to Rows. Removes the need to manually implement conversion logic;
>   * This is still experimental work in progress, APIs will likely change;
>   * There are known bugs/unsolved problems;
>
> Java Beans
>
> Introduced a coder which facilitates Rows generation from Java Beans.
> Reduces the overhead to:
>
>   /** Some user-defined Java Bean */
>   class JavaBeanObject implements Serializable {
>     String getName() { ... }
>   }
>
>   // Obtain the objects:
>   PCollection<JavaBeanObject> javaBeans = ...;
>
>   // Convert to Rows and apply a SQL query:
>   PCollection<Row> queryResult =
>     javaBeans
>       .setCoder(InferredRowCoder.ofSerializable(JavaBeanObject.class))
>       .apply(BeamSql.query("SELECT name FROM PCOLLECTION"));
>
> Notice, there is no more manual Schema definition or custom conversion
> logic.
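
As a rough illustration of what inferring row fields from a bean involves, the plain-Java sketch below derives field names and types from public getters by reflection. It is not the InferredRowCoder implementation and uses no Beam classes; `SampleBean`, `BeanFieldInference` and `inferFields` are hypothetical names introduced only for this example, and the getter-naming rule (`getXyz` becomes field `xyz`) is an assumption.

```java
import java.lang.reflect.Method;
import java.lang.reflect.Modifier;
import java.util.Map;
import java.util.TreeMap;

/** Hypothetical bean used only for this illustration. */
class SampleBean {
    public String getName() { return "foo"; }
    public int getSize() { return 333; }
}

/** Hypothetical helper; not part of Beam. */
class BeanFieldInference {

    /** Maps each public, non-static, zero-argument getter to (field name, return type). */
    public static Map<String, Class<?>> inferFields(Class<?> beanClass) {
        // TreeMap gives a deterministic (alphabetical) field order,
        // since getMethods() returns methods in no particular order.
        Map<String, Class<?>> fields = new TreeMap<>();
        for (Method m : beanClass.getMethods()) {
            String name = m.getName();
            boolean isGetter = name.startsWith("get")
                    && name.length() > 3
                    && m.getParameterCount() == 0
                    && m.getReturnType() != void.class
                    && !Modifier.isStatic(m.getModifiers())
                    && m.getDeclaringClass() != Object.class; // skips getClass()
            if (isGetter) {
                // getName -> name, getSize -> size
                String field = Character.toLowerCase(name.charAt(3)) + name.substring(4);
                fields.put(field, m.getReturnType());
            }
        }
        return fields;
    }

    public static void main(String[] args) {
        System.out.println(inferFields(SampleBean.class));
    }
}
```

A real coder would also have to map Java types to schema field types and read the values out of each bean instance; the sketch only covers the name/type discovery step.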
>
> Links
>
>   * example;
>   * InferredRowCoder;
>   * test;
>
> JSON
>
> Introduced JsonToRow transform. It is possible to query a
> PCollection<String> that contains JSON objects like this:
>
>   // Assuming JSON objects look like this:
>   // { "type" : "foo", "size" : 333 }
>
>   // Define a Schema:
>   Schema jsonSchema =
>     Schema
>       .builder()
>       .addStringField("type")
>       .addInt32Field("size")
>       .build();
>
>   // Obtain PCollection of the objects in JSON format:
>   PCollection<String> jsonObjects = ...;
>
>   // Convert to Rows and apply a SQL query:
>   PCollection<Row> queryResults =
>     jsonObjects
>       .apply(JsonToRow.withSchema(jsonSchema))
>       .apply(BeamSql.query("SELECT type, AVG(size) FROM PCOLLECTION GROUP BY type"));
>
> Notice, JSON to Row conversion is done by JsonToRow transform. It is
> currently required to supply a Schema.
>
> Links
>
>   * JsonToRow;
>   * test/example;
>
> Going Forward
>
>   * fix bugs (BEAM-4163, BEAM-4161 ...)
>   * implement more features (BEAM-4167, more types of objects);
>   * wire this up with sources/sinks to further simplify SQL API;
>
> Thank you,
> Anton
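
The converter map suggested at the top of this thread (look up a converter by source and destination type, and fall back when none is registered) could be sketched in plain Java roughly as follows. This is only an illustration under stated assumptions: `ConverterRegistry`, `register` and `convert` are hypothetical names, not a Beam or Camel API, and lookup is by exact runtime class with no supertype matching.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
import java.util.function.Function;

/** Hypothetical registry keyed by (source type, target type); not a Beam or Camel API. */
class ConverterRegistry {

    // source type -> (target type -> converter)
    private final Map<Class<?>, Map<Class<?>, Function<Object, Object>>> converters =
            new HashMap<>();

    /** Registers a converter from {@code source} to {@code target}. */
    public <S, T> void register(Class<S> source, Class<T> target,
                                Function<? super S, ? extends T> fn) {
        converters
                .computeIfAbsent(source, k -> new HashMap<>())
                .put(target, value -> fn.apply(source.cast(value)));
    }

    /** Converts {@code value} if a converter for its exact class is registered. */
    public <T> Optional<T> convert(Object value, Class<T> target) {
        Map<Class<?>, Function<Object, Object>> bySource = converters.get(value.getClass());
        if (bySource == null) {
            return Optional.empty();
        }
        Function<Object, Object> fn = bySource.get(target);
        if (fn == null) {
            return Optional.empty();
        }
        return Optional.ofNullable(target.cast(fn.apply(value)));
    }

    public static void main(String[] args) {
        ConverterRegistry registry = new ConverterRegistry();
        registry.register(String.class, Integer.class, s -> Integer.parseInt(s));
        System.out.println(registry.convert("42", Integer.class)); // converter found
        System.out.println(registry.convert(3.0, Integer.class));  // none registered
    }
}
```

Converters that hold resources would additionally need the init/destroy lifecycle raised earlier in the thread; the sketch leaves that out and treats every converter as a stateless function.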