Hmm, the pluggability part is close to what I wanted to do with JsonObject as the main API (to avoid redoing a "row" API and a schema API). Row.as(Class<T>) sounds good, but does it mean we'll get beam-sdk-java-row-jsonobject-like modules? (I'm not against it, just trying to understand.) If so, how can an IO use as() with the type it expects? Doesn't that lead to tons of these modules in the end?
Romain Manni-Bucau
@rmannibucau <https://twitter.com/rmannibucau> | Blog <https://rmannibucau.metawerx.net/> | Old Blog <http://rmannibucau.wordpress.com> | Github <https://github.com/rmannibucau> | LinkedIn <https://www.linkedin.com/in/rmannibucau> | Book <https://www.packtpub.com/application-development/java-ee-8-high-performance>

On Wed, May 23, 2018 at 04:57, Reuven Lax <re...@google.com> wrote:
> By the way Romain, if you have specific scenarios in mind I would love to hear them. I can try to guess what exactly you would like to get out of schemas, but it would work better if you gave me concrete scenarios that you would like to work.
>
> Reuven
>
> On Tue, May 22, 2018 at 7:45 PM Reuven Lax <re...@google.com> wrote:
>> Yeah, what I'm working on will help with IO. Basically, if you register a function with SchemaRegistry that converts back and forth between a type (say JsonObject) and a Beam Row, then it is applied by the framework behind the scenes as part of DoFn invocation. Concrete example: let's say I have an IO that reads JSON objects:
>>
>>   class MyJsonIORead extends PTransform<PBegin, PCollection<JsonObject>> {...}
>>
>> If you register a schema for this type (or you can also just set the schema directly on the output PCollection), then Beam knows how to convert back and forth between JsonObject and Row. So the next ParDo can look like:
>>
>>   p.apply(new MyJsonIORead())
>>    .apply(ParDo.of(new DoFn<JsonObject, T>() {
>>      @ProcessElement
>>      public void process(@Element Row row) {
>>        // work with the element as a Row
>>      }
>>    }));
>>
>> And Beam will automatically convert JsonObject to a Row for processing (you aren't forced to do this, of course; you can always ask for it as a JsonObject).
>>
>> The same is true for output. If you have a sink that takes in JsonObject but the transform before it produces Row objects (for instance, because the transform before it is Beam SQL), Beam can automatically convert Row back to JsonObject for you.
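The registry idea described above (register one function that turns a type into a Row and one that turns it back, and let the framework apply them around user code) can be sketched in plain Java. This is a minimal illustration under stated assumptions, not Beam's SchemaRegistry API: `SimpleRow`, `Event`, `ConversionRegistry` and `RegistryDemo.processAsRow` are hypothetical names, the row is modeled as an ordered field-name-to-value map, and `Event` stands in for a type such as JsonObject.

```java
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.function.Function;

/** Hypothetical row: an ordered field-name -> value map (not Beam's Row class). */
final class SimpleRow {
  private final Map<String, Object> values;

  SimpleRow(Map<String, Object> values) {
    this.values = new LinkedHashMap<>(values);
  }

  Object get(String field) {
    return values.get(field);
  }
}

/** Hypothetical user type standing in for something like JsonObject. */
final class Event {
  final String type;
  final int size;

  Event(String type, int size) {
    this.type = type;
    this.size = size;
  }
}

/** Hypothetical registry holding one to-row and one from-row function per Java type. */
final class ConversionRegistry {
  private final Map<Class<?>, Function<Object, SimpleRow>> toRowFns = new HashMap<>();
  private final Map<Class<?>, Function<SimpleRow, Object>> fromRowFns = new HashMap<>();

  <T> void register(Class<T> type, Function<T, SimpleRow> toRow, Function<SimpleRow, T> fromRow) {
    toRowFns.put(type, element -> toRow.apply(type.cast(element)));
    fromRowFns.put(type, row -> fromRow.apply(row));
  }

  SimpleRow toRow(Object element) {
    Function<Object, SimpleRow> fn = toRowFns.get(element.getClass());
    if (fn == null) {
      throw new IllegalArgumentException("no conversion registered for " + element.getClass());
    }
    return fn.apply(element);
  }

  <T> T fromRow(SimpleRow row, Class<T> type) {
    Function<SimpleRow, Object> fn = fromRowFns.get(type);
    if (fn == null) {
      throw new IllegalArgumentException("no conversion registered for " + type);
    }
    return type.cast(fn.apply(row));
  }
}

final class RegistryDemo {
  /** Plays the framework's role: converts the element before user code that wants a row sees it. */
  static <R> R processAsRow(ConversionRegistry registry, Object element, Function<SimpleRow, R> userFn) {
    return userFn.apply(registry.toRow(element));
  }

  static ConversionRegistry eventRegistry() {
    ConversionRegistry registry = new ConversionRegistry();
    registry.register(
        Event.class,
        event -> {
          Map<String, Object> values = new LinkedHashMap<>();
          values.put("type", event.type);
          values.put("size", event.size);
          return new SimpleRow(values);
        },
        row -> new Event((String) row.get("type"), (Integer) row.get("size")));
    return registry;
  }

  public static void main(String[] args) {
    ConversionRegistry registry = eventRegistry();
    Event event = new Event("foo", 333);
    String type = processAsRow(registry, event, row -> (String) row.get("type"));
    Event roundTripped = registry.fromRow(registry.toRow(event), Event.class);
    System.out.println(type + " " + roundTripped.size); // prints "foo 333"
  }
}
```

User code that asks for a row never calls the conversion itself; in this sketch `processAsRow` stands in for what the framework would do during DoFn invocation, and `fromRow` covers the output direction.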
>>
>> All of this was detailed in the Schema doc I shared a few months ago. There was a lot of discussion on that document from various parties, and some of this API is a result of that discussion. This is also working in the branch JB and I were working on, though not yet integrated back to master.
>>
>> I would like to actually go further and make Row an interface and provide a way to automatically put a Row interface on top of any other object (e.g. JsonObject, POJO, etc.). This won't change the way the user writes code, but instead of Beam having to copy and convert at each stage (e.g. from JsonObject to Row), it will simply create a Row object that uses the JsonObject as its underlying storage.
>>
>> Reuven
>>
>> On Tue, May 22, 2018 at 11:37 AM Romain Manni-Bucau <rmannibu...@gmail.com> wrote:
>>> Well, Beam can implement a new mapper, but it doesn't help for IO. Most modern backends take JSON directly, even the javax one, and it must stay generic.
>>>
>>> Then, since JSON-to-POJO mapping has already been done a dozen times, I'm not sure it is worth it for now.
>>>
>>> On Tue, May 22, 2018, 20:27, Reuven Lax <re...@google.com> wrote:
>>>> We can do even better, by the way: build a SchemaRegistry where automatic conversions can be registered between schemas and Java data types. With this, the user won't even need a DoFn to do the conversion.
>>>>
>>>> On Tue, May 22, 2018, 10:13 AM Romain Manni-Bucau <rmannibu...@gmail.com> wrote:
>>>>> Hi guys,
>>>>>
>>>>> I checked out what has been done on the schema model and think it is acceptable, as far as the JSON debate is concerned, if https://issues.apache.org/jira/browse/BEAM-4381 can be fixed.
>>>>>
>>>>> At a high level, it is about providing a mainstream, low-impact model out of the box, and JSON seems the most valid option for now, at least for IO and some user transforms.
>>>>>
>>>>> Wdyt?
>>>>>
>>>>> On Fri, Apr 27, 2018, 18:36, Romain Manni-Bucau <rmannibu...@gmail.com> wrote:
>>>>>> Sure, I can give it a try at the end of May (holidays and work constraints will make it hard before then).
>>>>>>
>>>>>> On Apr 27, 2018, 18:26, "Anton Kedin" <ke...@google.com> wrote:
>>>>>>> Romain,
>>>>>>>
>>>>>>> I don't believe that the JSON approach was investigated very thoroughly. I mentioned a few reasons why, in my opinion, it is not the best choice, but I may be wrong. Can you put together a design doc or a prototype?
>>>>>>>
>>>>>>> Thank you,
>>>>>>> Anton
>>>>>>>
>>>>>>> On Thu, Apr 26, 2018 at 10:17 PM Romain Manni-Bucau <rmannibu...@gmail.com> wrote:
>>>>>>>> On Apr 26, 2018, 23:13, "Anton Kedin" <ke...@google.com> wrote:
>>>>>>>>
>>>>>>>> BeamRecord (Row) has very little in common with JsonObject (I assume you're talking about javax.json), except maybe some similarities of the API. A few reasons why JsonObject doesn't work:
>>>>>>>>
>>>>>>>> - it is a Java EE API:
>>>>>>>>   - Beam SDK is not limited to Java. There are probably similar APIs for other languages, but they might not necessarily carry the same semantics / APIs;
>>>>>>>>
>>>>>>>> Not a big deal, I think. At least not a technical blocker.
>>>>>>>>
>>>>>>>>   - it can change between Java versions;
>>>>>>>>
>>>>>>>> No, this is Java EE ;).
>>>>>>>>
>>>>>>>>   - the current Beam Java implementation is an experimental feature to identify what's needed from such an API; in the end we might end up with something similar to the JsonObject API, but likely not;
>>>>>>>>
>>>>>>>> I don't see that point as a blocker.
>>>>>>>>
>>>>>>>> - it represents JSON, which is not an API but an object notation:
>>>>>>>>   - it is defined as a Unicode string in a certain format.
If >>>>>>>> you choose to adhere to ECMA-404, then it doesn't sound like >>>>>>>> JsonObject can >>>>>>>> represent an Avro object, if I'm reading it right; >>>>>>>> >>>>>>>> >>>>>>>> It is in the generator impl, you can impl an avrogenerator. >>>>>>>> >>>>>>>> >>>>>>>> - doesn't define a type system (JSON does, but it's lacking): >>>>>>>> - for example, JSON doesn't define semantics for numbers; >>>>>>>> - doesn't define date/time types; >>>>>>>> - doesn't allow extending JSON type system at all; >>>>>>>> >>>>>>>> >>>>>>>> That is why you need a metada object, or simpler, a schema with >>>>>>>> that data. Json or beam record doesnt help here and you end up on the >>>>>>>> same >>>>>>>> outcome if you think about it. >>>>>>>> >>>>>>>> >>>>>>>> - lacks schemas; >>>>>>>> >>>>>>>> Jsonschema are standard, widely spread and tooled compared to >>>>>>>> alternative. >>>>>>>> >>>>>>>> You can definitely try loosen the requirements and define >>>>>>>> everything in JSON in userland, but the point of Row/Schema is to >>>>>>>> avoid it >>>>>>>> and define everything in Beam model, which can be extended, mapped to >>>>>>>> JSON, >>>>>>>> Avro, BigQuery Schemas, custom binary format etc., with same semantics >>>>>>>> across beam SDKs. >>>>>>>> >>>>>>>> >>>>>>>> This is what jsonp would allow with the benefit of a natural pojo >>>>>>>> support through jsonb. >>>>>>>> >>>>>>>> >>>>>>>> >>>>>>>> On Thu, Apr 26, 2018 at 12:28 PM Romain Manni-Bucau < >>>>>>>> rmannibu...@gmail.com> wrote: >>>>>>>> >>>>>>>>> Just to let it be clear and let me understand: how is BeamRecord >>>>>>>>> different from a JsonObject which is an API without implementation >>>>>>>>> (not >>>>>>>>> event a json one OOTB)? Advantage of json *api* are indeed natural >>>>>>>>> mapping >>>>>>>>> (jsonb is based on jsonp so no new binding to reinvent) and simple >>>>>>>>> serialization (json+gzip for ex, or avro if you want to be geeky). >>>>>>>>> >>>>>>>>> I fail to see the point to rebuild an ecosystem ATM. 
>>>>>>>>>
>>>>>>>>> On Apr 26, 2018, 19:12, "Reuven Lax" <re...@google.com> wrote:
>>>>>>>>>> Exactly what JB said. We will write a generic conversion from Avro (or JSON) to Beam schemas, which will make them work transparently with SQL. The plan is also to migrate Anton's work so that POJOs work generically for any schema.
>>>>>>>>>>
>>>>>>>>>> Reuven
>>>>>>>>>>
>>>>>>>>>> On Thu, Apr 26, 2018 at 1:17 AM Jean-Baptiste Onofré <j...@nanthrax.net> wrote:
>>>>>>>>>>> For now we have a generic schema interface. JSON-B can be one implementation; Avro could be another.
>>>>>>>>>>>
>>>>>>>>>>> Regards
>>>>>>>>>>> JB
>>>>>>>>>>>
>>>>>>>>>>> On Apr 26, 2018, at 12:08, Romain Manni-Bucau <rmannibu...@gmail.com> wrote:
>>>>>>>>>>>> Hmm,
>>>>>>>>>>>>
>>>>>>>>>>>> Avro still has the pitfall of an uncontrolled stack that brings in way too many dependencies to be part of any API. This is why I proposed a JSON-P based API (JsonObject) with a custom Beam entry for some metadata (headers "à la Camel").
>>>>>>>>>>>>
>>>>>>>>>>>> 2018-04-26 9:59 GMT+02:00 Jean-Baptiste Onofré <j...@nanthrax.net>:
>>>>>>>>>>>>> Hi Ismael,
>>>>>>>>>>>>>
>>>>>>>>>>>>> You mean directly in Beam SQL?
>>>>>>>>>>>>>
>>>>>>>>>>>>> That will be part of schema support: a generic record could be one of the payloads across schemas.
>>>>>>>>>>>>> >>>>>>>>>>>>> Regards >>>>>>>>>>>>> JB >>>>>>>>>>>>> Le 26 avr. 2018, à 11:39, "Ismaël Mejía" < ieme...@gmail.com> >>>>>>>>>>>>> a écrit: >>>>>>>>>>>>>> >>>>>>>>>>>>>> Hello Anton, >>>>>>>>>>>>>> >>>>>>>>>>>>>> Thanks for the descriptive email and the really useful work. Any >>>>>>>>>>>>>> plans >>>>>>>>>>>>>> to tackle PCollections of GenericRecord/IndexedRecords? it seems >>>>>>>>>>>>>> Avro >>>>>>>>>>>>>> is a natural fit for this approach too. >>>>>>>>>>>>>> >>>>>>>>>>>>>> Regards, >>>>>>>>>>>>>> Ismaël >>>>>>>>>>>>>> >>>>>>>>>>>>>> On Wed, Apr 25, 2018 at 9:04 PM, Anton Kedin <ke...@google.com> >>>>>>>>>>>>>> wrote: >>>>>>>>>>>>>> >>>>>>>>>>>>>> >>>>>>>>>>>>>>> Hi, >>>>>>>>>>>>>>> >>>>>>>>>>>>>>> >>>>>>>>>>>>>>> >>>>>>>>>>>>>>> I want to highlight a couple of improvements to Beam SQL we >>>>>>>>>>>>>>> have been >>>>>>>>>>>>>>> >>>>>>>>>>>>>>> working on recently which are targeted to make Beam SQL API >>>>>>>>>>>>>>> easier to use. >>>>>>>>>>>>>>> >>>>>>>>>>>>>>> Specifically these features simplify conversion of Java Beans >>>>>>>>>>>>>>> and JSON >>>>>>>>>>>>>>> >>>>>>>>>>>>>>> strings to Rows. >>>>>>>>>>>>>>> >>>>>>>>>>>>>>> >>>>>>>>>>>>>>> >>>>>>>>>>>>>>> Feel free to try this and send any bugs/comments/PRs my way. >>>>>>>>>>>>>>> >>>>>>>>>>>>>>> >>>>>>>>>>>>>>> >>>>>>>>>>>>>>> **Caveat: this is still work in progress, and has known bugs >>>>>>>>>>>>>>> and incomplete >>>>>>>>>>>>>>> >>>>>>>>>>>>>>> features, see below for details.** >>>>>>>>>>>>>>> >>>>>>>>>>>>>>> >>>>>>>>>>>>>>> >>>>>>>>>>>>>>> Background >>>>>>>>>>>>>>> >>>>>>>>>>>>>>> >>>>>>>>>>>>>>> >>>>>>>>>>>>>>> Beam SQL queries can only be applied to PCollection<Row>. This >>>>>>>>>>>>>>> means that >>>>>>>>>>>>>>> >>>>>>>>>>>>>>> users need to convert whatever PCollection elements they have >>>>>>>>>>>>>>> to Rows before >>>>>>>>>>>>>>> >>>>>>>>>>>>>>> querying them with SQL. 
>>>>>>>>>>>>>>> This usually requires manually creating a Schema and implementing a custom conversion PTransform<PCollection<Element>, PCollection<Row>> (see the Beam SQL Guide).
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> The improvements described here are an attempt to reduce this overhead for a few common cases, as a start.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Status
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> - Introduced an InferredRowCoder to automatically generate Rows from beans. It removes the need to manually define a Schema and Row conversion logic;
>>>>>>>>>>>>>>> - Introduced a JsonToRow transform to automatically parse JSON objects to Rows. It removes the need to manually implement conversion logic;
>>>>>>>>>>>>>>> - This is still experimental work in progress; APIs will likely change;
>>>>>>>>>>>>>>> - There are known bugs/unsolved problems;
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Java Beans
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Introduced a coder which facilitates Row generation from Java Beans. It reduces the overhead to:
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> /** Some user-defined Java Bean */
>>>>>>>>>>>>>>>> class JavaBeanObject implements Serializable {
>>>>>>>>>>>>>>>>   String getName() { ... }
>>>>>>>>>>>>>>>> }
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> // Obtain the objects:
>>>>>>>>>>>>>>>> PCollection<JavaBeanObject> javaBeans = ...;
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> // Convert to Rows and apply a SQL query:
>>>>>>>>>>>>>>>> PCollection<Row> queryResult =
>>>>>>>>>>>>>>>>     javaBeans
>>>>>>>>>>>>>>>>         .setCoder(InferredRowCoder.ofSerializable(JavaBeanObject.class))
>>>>>>>>>>>>>>>>         .apply(BeamSql.query("SELECT name FROM PCOLLECTION"));
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Notice that there is no longer any manual Schema definition or custom conversion logic.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Links
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> - example;
>>>>>>>>>>>>>>> - InferredRowCoder;
>>>>>>>>>>>>>>> - test;
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> JSON
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Introduced a JsonToRow transform.
>>>>>>>>>>>>>>> It is possible to query a PCollection<String> that contains JSON objects like this:
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> // Assuming JSON objects look like this:
>>>>>>>>>>>>>>>> // { "type" : "foo", "size" : 333 }
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> // Define a Schema:
>>>>>>>>>>>>>>>> Schema jsonSchema =
>>>>>>>>>>>>>>>>     Schema
>>>>>>>>>>>>>>>>         .builder()
>>>>>>>>>>>>>>>>         .addStringField("type")
>>>>>>>>>>>>>>>>         .addInt32Field("size")
>>>>>>>>>>>>>>>>         .build();
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> // Obtain PCollection of the objects in JSON format:
>>>>>>>>>>>>>>>> PCollection<String> jsonObjects = ...;
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> // Convert to Rows and apply a SQL query:
>>>>>>>>>>>>>>>> PCollection<Row> queryResults =
>>>>>>>>>>>>>>>>     jsonObjects
>>>>>>>>>>>>>>>>         .apply(JsonToRow.withSchema(jsonSchema))
>>>>>>>>>>>>>>>>         .apply(BeamSql.query("SELECT type, AVG(size) FROM PCOLLECTION GROUP BY type"));
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Notice that the JSON-to-Row conversion is done by the JsonToRow transform. It currently requires a Schema to be supplied.
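The role the Schema plays in the JsonToRow example (JSON itself only says "number", while the schema says `size` is INT32) can be sketched without Beam. This is not JsonToRow's implementation: it assumes the JSON text has already been parsed into a `Map<String, Object>` by some JSON library, and `FieldType`, `SchemaConversion.toRowValues` and `SchemaConversion.exampleSchema` are hypothetical names covering only the two field types used above.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical field types: only the two used in the JsonToRow example. */
enum FieldType {
  STRING,
  INT32
}

final class SchemaConversion {
  /** Hypothetical schema matching the example: type STRING, size INT32, in that order. */
  static Map<String, FieldType> exampleSchema() {
    Map<String, FieldType> schema = new LinkedHashMap<>();
    schema.put("type", FieldType.STRING);
    schema.put("size", FieldType.INT32);
    return schema;
  }

  /**
   * Converts one already-parsed JSON object into row values ordered by the schema,
   * checking each value against the type the schema declares for it.
   */
  static List<Object> toRowValues(Map<String, FieldType> schema, Map<String, Object> json) {
    List<Object> values = new ArrayList<>();
    for (Map.Entry<String, FieldType> field : schema.entrySet()) {
      String name = field.getKey();
      Object raw = json.get(name);
      if (raw == null) {
        throw new IllegalArgumentException("missing field: " + name);
      }
      switch (field.getValue()) {
        case STRING:
          if (!(raw instanceof String)) {
            throw new IllegalArgumentException(name + " is not a string: " + raw);
          }
          values.add(raw);
          break;
        case INT32:
          // JSON only says "number"; the schema is what pins this field to a 32-bit int.
          if (!(raw instanceof Integer || raw instanceof Long)) {
            throw new IllegalArgumentException(name + " is not an integer: " + raw);
          }
          long n = ((Number) raw).longValue();
          if (n < Integer.MIN_VALUE || n > Integer.MAX_VALUE) {
            throw new IllegalArgumentException(name + " does not fit in INT32: " + n);
          }
          values.add((int) n);
          break;
        default:
          throw new IllegalStateException("unhandled field type: " + field.getValue());
      }
    }
    return values;
  }

  public static void main(String[] args) {
    Map<String, Object> json = new LinkedHashMap<>();
    json.put("type", "foo");
    json.put("size", 333L); // a JSON parser might hand back 333 as a Long
    System.out.println(toRowValues(exampleSchema(), json)); // prints "[foo, 333]"
  }
}
```

Values come out in schema order regardless of the JSON key order, and a number such as 3000000000 is rejected because it does not fit the declared INT32 type.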
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Links
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> - JsonToRow;
>>>>>>>>>>>>>>> - test/example;
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Going Forward
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> - fix bugs (BEAM-4163, BEAM-4161 ...);
>>>>>>>>>>>>>>> - implement more features (BEAM-4167, more types of objects);
>>>>>>>>>>>>>>> - wire this up with sources/sinks to further simplify the SQL API;
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Thank you,
>>>>>>>>>>>>>>> Anton