How does it work on the pipeline side? Do you generate these "virtual" IOs at build time so that the fluent API keeps working without erasing generics?
For example, SQL(row) -> BigQuery(native) will not compile, so we need a SQL(row) -> BigQuery(row).

Side note unrelated to Row: if you add another registry, a prerequisite may be to ensure Beam has some kind of singleton/context, so that the registry is not duplicated or left improperly tracked. These kinds of converters generally need a global close, not only a per-record call: converter.init(); converter.convert(row); ... converter.destroy(); otherwise they easily leak. This is why some way to avoid recreating them may be required. A quick fix, if you are already in ByteBuddy, is probably to hook this into setup/teardown; something more global would be nicer but is more challenging.

Romain Manni-Bucau
@rmannibucau <https://twitter.com/rmannibucau> | Blog <https://rmannibucau.metawerx.net/> | Old Blog <http://rmannibucau.wordpress.com> | Github <https://github.com/rmannibucau> | LinkedIn <https://www.linkedin.com/in/rmannibucau> | Book <https://www.packtpub.com/application-development/java-ee-8-high-performance>

On Wed, May 23, 2018 at 07:22, Reuven Lax <[email protected]> wrote:

> No, the only modules we need to add to core are the ones we choose to add. For example, I will probably add a registration for TableRow/TableSchema (GCP BigQuery) so these can work seamlessly with schemas. However, I will add that to the GCP module, so only someone depending on that module needs to pull in that dependency. The Java ServiceLoader framework can be used by these modules to register schemas for their types (we already do something similar for FileSystem and for coders as well).
>
> BTW, right now I'm doing the conversion to and from Row objects in the ByteBuddy-generated bytecode that we generate in order to invoke DoFns.
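Romain's side note asks for converters to be created once and closed globally rather than per record, and Reuven mentions that conversion currently happens in the generated DoFn invoker. A minimal sketch of that lifecycle, assuming a worker-wide, reference-counted holder that setup/teardown-style hooks acquire from and release to. `RowConverter`, `SharedConverters`, `CountingConverter` and the map-based row are hypothetical names for illustration, not Beam APIs.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Supplier;

public class ConverterLifecycleSketch {

    /** Hypothetical converter with an explicit lifecycle (not a Beam API). */
    interface RowConverter<T> {
        void init();
        T convert(Map<String, Object> row);
        void destroy();
    }

    /** Hypothetical worker-wide holder: one converter per key, reference counted. */
    static final class SharedConverters {
        private static final Map<String, RowConverter<?>> INSTANCES = new HashMap<>();
        private static final Map<String, Integer> REF_COUNTS = new HashMap<>();

        /** Setup-style hook: creates and initializes the converter only on first use. */
        static synchronized <T> RowConverter<T> acquire(String key, Supplier<RowConverter<T>> factory) {
            @SuppressWarnings("unchecked")
            RowConverter<T> converter = (RowConverter<T>) INSTANCES.get(key);
            if (converter == null) {
                converter = factory.get();
                converter.init();
                INSTANCES.put(key, converter);
            }
            REF_COUNTS.merge(key, 1, Integer::sum);
            return converter;
        }

        /** Teardown-style hook: destroys the converter when the last user releases it. */
        static synchronized void release(String key) {
            int remaining = REF_COUNTS.merge(key, -1, Integer::sum);
            if (remaining == 0) {
                REF_COUNTS.remove(key);
                INSTANCES.remove(key).destroy();
            }
        }
    }

    /** Toy converter that counts lifecycle calls so the behaviour can be checked. */
    static final class CountingConverter implements RowConverter<String> {
        static int inits = 0;
        static int destroys = 0;
        public void init() { inits++; }
        public String convert(Map<String, Object> row) { return String.valueOf(row.get("name")); }
        public void destroy() { destroys++; }
    }

    public static void main(String[] args) {
        // Two "DoFn instances" on the same worker share a single converter instance.
        RowConverter<String> a = SharedConverters.acquire("json", CountingConverter::new);
        RowConverter<String> b = SharedConverters.acquire("json", CountingConverter::new);
        System.out.println(a == b);
        System.out.println(a.convert(Map.of("name", "beam"))); // per-record work
        SharedConverters.release("json");
        SharedConverters.release("json"); // last release triggers destroy()
        System.out.println(CountingConverter.inits + " init, " + CountingConverter.destroys + " destroy");
    }
}
```

The reference count is one way to make `destroy()` run exactly once when several DoFn instances share a converter; a real integration would still have to decide what scope "global" means (class loader, JVM, or pipeline).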
>
> Reuven
>
> On Tue, May 22, 2018 at 10:04 PM Romain Manni-Bucau <[email protected]> wrote:
>
>> Hmm, the pluggability part is close to what I wanted to do with JsonObject as the main API (to avoid redoing a "row" API and a schema API).
>> Row.as(Class<T>) sounds good, but then does it mean we'll get beam-sdk-java-row-jsonobject-like modules (I'm not against it, just trying to understand here)?
>> If so, how can an IO use as() with the type it expects? Doesn't that lead to tons of these modules in the end?
>>
>> Romain Manni-Bucau
>>
>> On Wed, May 23, 2018 at 04:57, Reuven Lax <[email protected]> wrote:
>>
>>> By the way Romain, if you have specific scenarios in mind I would love to hear them. I can try to guess what exactly you would like to get out of schemas, but it would work better if you gave me concrete scenarios that you would like to work.
>>>
>>> Reuven
>>>
>>> On Tue, May 22, 2018 at 7:45 PM Reuven Lax <[email protected]> wrote:
>>>
>>>> Yeah, what I'm working on will help with IO. Basically, if you register a function with SchemaRegistry that converts back and forth between a type (say JsonObject) and a Beam Row, then it is applied by the framework behind the scenes as part of DoFn invocation. Concrete example: let's say I have an IO that reads JSON objects:
>>>>
>>>>   class MyJsonIORead extends PTransform<PBegin, PCollection<JsonObject>> {...}
>>>>
>>>> If you register a schema for this type (or you can also just set the schema directly on the output PCollection), then Beam knows how to convert back and forth between JsonObject and Row.
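The mechanism described here (register a to-Row/from-Row function pair for a type once, then let the framework apply it when a DoFn or sink wants the other representation) can be modelled in plain Java. This is a hypothetical sketch, not Beam's SchemaRegistry API: `ConversionRegistry`, `SimpleRow` (a map standing in for Row) and `JsonLike` (standing in for JsonObject) are illustrative names.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;

public class ConversionRegistrySketch {

    /** Stand-in for a Beam Row: field name -> value (hypothetical, not Beam's Row). */
    static final class SimpleRow {
        final Map<String, Object> values;
        SimpleRow(Map<String, Object> values) { this.values = Map.copyOf(values); }
    }

    /** Toy user type standing in for something like a JsonObject. */
    record JsonLike(String type, int size) {}

    /** Hypothetical registry: one to-row and one from-row function per registered class. */
    static final class ConversionRegistry {
        private final Map<Class<?>, Function<Object, SimpleRow>> toRow = new HashMap<>();
        private final Map<Class<?>, Function<SimpleRow, ?>> fromRow = new HashMap<>();

        <T> void register(Class<T> type, Function<T, SimpleRow> to, Function<SimpleRow, T> from) {
            toRow.put(type, obj -> to.apply(type.cast(obj)));
            fromRow.put(type, from);
        }

        /** What a framework could do when a DoFn asks for a row but receives a user type. */
        SimpleRow asRow(Object element) {
            Function<Object, SimpleRow> f = toRow.get(element.getClass());
            if (f == null) {
                throw new IllegalArgumentException("No conversion registered for " + element.getClass());
            }
            return f.apply(element);
        }

        /** What a framework could do before handing rows to a sink that expects the user type. */
        <T> T fromRow(SimpleRow row, Class<T> type) {
            Function<SimpleRow, ?> f = fromRow.get(type);
            if (f == null) {
                throw new IllegalArgumentException("No conversion registered for " + type);
            }
            return type.cast(f.apply(row));
        }
    }

    public static void main(String[] args) {
        ConversionRegistry registry = new ConversionRegistry();
        registry.register(
                JsonLike.class,
                j -> new SimpleRow(Map.of("type", j.type(), "size", j.size())),
                r -> new JsonLike((String) r.values.get("type"), (Integer) r.values.get("size")));

        // Read side: the user type is turned into a row for the next step.
        SimpleRow row = registry.asRow(new JsonLike("foo", 333));
        // Write side: the row is turned back into the type a sink expects.
        JsonLike back = registry.fromRow(row, JsonLike.class);
        System.out.println(row.values + " -> " + back);
    }
}
```

Keying on the element's runtime class keeps the sketch short; the thread suggests Beam ties the conversion to the schema registered for the type or set on the PCollection instead.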
>>>> So the next ParDo can look like:
>>>>
>>>>   p.apply(new MyJsonIORead())
>>>>    .apply(ParDo.of(new DoFn<JsonObject, T>() {
>>>>      @ProcessElement
>>>>      public void process(@Element Row row) {
>>>>      }
>>>>    }));
>>>>
>>>> And Beam will automatically convert JsonObject to a Row for processing (you aren't forced to do this, of course; you can always ask for it as a JsonObject).
>>>>
>>>> The same is true for output. If you have a sink that takes in JsonObject but the transform before it produces Row objects (for instance, because the transform before it is Beam SQL), Beam can automatically convert Row back to JsonObject for you.
>>>>
>>>> All of this was detailed in the Schema doc I shared a few months ago. There was a lot of discussion on that document from various parties, and some of this API is a result of that discussion. This is also working in the branch JB and I were working on, though not yet integrated back into master.
>>>>
>>>> I would like to actually go further and make Row an interface, and provide a way to automatically put a Row interface on top of any other object (e.g. JsonObject, POJO, etc.). This won't change the way the user writes code, but instead of Beam having to copy and convert at each stage (e.g. from JsonObject to Row), it will simply create a Row object that uses the JsonObject as its underlying storage.
>>>>
>>>> Reuven
>>>>
>>>> On Tue, May 22, 2018 at 11:37 AM Romain Manni-Bucau <[email protected]> wrote:
>>>>
>>>>> Well, Beam can implement a new mapper, but it doesn't help for IO. Most modern backends will take JSON directly, even the javax one, and it must stay generic.
>>>>>
>>>>> Then, since JSON-to-POJO mapping has already been done a dozen times, I'm not sure it is worth it for now.
>>>>>
>>>>> On Tue, May 22, 2018 20:27, Reuven Lax <[email protected]> wrote:
>>>>>
>>>>>> We can do even better btw.
>>>>>> We could build a SchemaRegistry where automatic conversions can be registered between schemas and Java data types. With this, the user won't even need a DoFn to do the conversion.
>>>>>>
>>>>>> On Tue, May 22, 2018, 10:13 AM Romain Manni-Bucau <[email protected]> wrote:
>>>>>>
>>>>>>> Hi guys,
>>>>>>>
>>>>>>> I checked out what has been done on the schema model and think it is acceptable, regarding the JSON debate, if https://issues.apache.org/jira/browse/BEAM-4381 can be fixed.
>>>>>>>
>>>>>>> At a high level, it is about providing a mainstream, low-impact model OOTB, and JSON seems the most valid option for now, at least for IO and some user transforms.
>>>>>>>
>>>>>>> Wdyt?
>>>>>>>
>>>>>>> On Fri, Apr 27, 2018 18:36, Romain Manni-Bucau <[email protected]> wrote:
>>>>>>>
>>>>>>>> I can give it a try at the end of May, sure (holidays and work constraints will make it hard before then).
>>>>>>>>
>>>>>>>> On Apr 27, 2018 18:26, "Anton Kedin" <[email protected]> wrote:
>>>>>>>>
>>>>>>>>> Romain,
>>>>>>>>>
>>>>>>>>> I don't believe the JSON approach was investigated very thoroughly. I mentioned a few reasons which, in my opinion, make it not the best choice, but I may be wrong. Can you put together a design doc or a prototype?
>>>>>>>>>
>>>>>>>>> Thank you,
>>>>>>>>> Anton
>>>>>>>>>
>>>>>>>>> On Thu, Apr 26, 2018 at 10:17 PM Romain Manni-Bucau <[email protected]> wrote:
>>>>>>>>>
>>>>>>>>>> On Apr 26, 2018 23:13, "Anton Kedin" <[email protected]> wrote:
>>>>>>>>>>
>>>>>>>>>>> BeamRecord (Row) has very little in common with JsonObject (I assume you're talking about javax.json), except maybe some similarities in the API. A few reasons why JsonObject doesn't work:
>>>>>>>>>>>
>>>>>>>>>>>  - it is a Java EE API:
>>>>>>>>>>>     - the Beam SDK is not limited to Java.
>>>>>>>>>>>       There are probably similar APIs for other languages, but they might not necessarily carry the same semantics / APIs;
>>>>>>>>>>
>>>>>>>>>> Not a big deal, I think. At least not a technical blocker.
>>>>>>>>>>
>>>>>>>>>>>     - it can change between Java versions;
>>>>>>>>>>
>>>>>>>>>> No, this is Java EE ;).
>>>>>>>>>>
>>>>>>>>>>>     - the current Beam Java implementation is an experimental feature to identify what's needed from such an API; in the end we might end up with something similar to the JsonObject API, but likely not;
>>>>>>>>>>
>>>>>>>>>> I don't see that point as a blocker.
>>>>>>>>>>
>>>>>>>>>>>  - it represents JSON, which is not an API but an object notation:
>>>>>>>>>>>     - it is defined as a Unicode string in a certain format. If you choose to adhere to ECMA-404, then it doesn't sound like JsonObject can represent an Avro object, if I'm reading it right;
>>>>>>>>>>
>>>>>>>>>> It is in the generator impl; you can implement an Avro generator.
>>>>>>>>>>
>>>>>>>>>>>     - it doesn't define a type system (JSON does, but it's lacking):
>>>>>>>>>>>        - for example, JSON doesn't define semantics for numbers;
>>>>>>>>>>>        - it doesn't define date/time types;
>>>>>>>>>>>        - it doesn't allow extending the JSON type system at all;
>>>>>>>>>>
>>>>>>>>>> That is why you need a metadata object or, simpler, a schema with that data. JSON or Beam record doesn't help here, and you end up with the same outcome if you think about it.
>>>>>>>>>>
>>>>>>>>>>>     - it lacks schemas;
>>>>>>>>>>
>>>>>>>>>> JSON Schema is standard, widespread and well tooled compared to the alternatives.
>>>>>>>>>>>
>>>>>>>>>>> You can definitely try to loosen the requirements and define everything in JSON in userland, but the point of Row/Schema is to avoid that and define everything in the Beam model, which can be extended and mapped to JSON, Avro, BigQuery schemas, custom binary formats etc., with the same semantics across Beam SDKs.
>>>>>>>>>>
>>>>>>>>>> This is what JSON-P would allow, with the benefit of natural POJO support through JSON-B.
>>>>>>>>>>
>>>>>>>>>>> On Thu, Apr 26, 2018 at 12:28 PM Romain Manni-Bucau <[email protected]> wrote:
>>>>>>>>>>>
>>>>>>>>>>>> Just to make it clear and help me understand: how is BeamRecord different from a JsonObject, which is an API without an implementation (not even a JSON one OOTB)? The advantages of a JSON *API* are indeed natural mapping (JSON-B is based on JSON-P, so there is no new binding to reinvent) and simple serialization (JSON+gzip for example, or Avro if you want to be geeky).
>>>>>>>>>>>>
>>>>>>>>>>>> I fail to see the point of rebuilding an ecosystem ATM.
>>>>>>>>>>>>
>>>>>>>>>>>> On Apr 26, 2018 19:12, "Reuven Lax" <[email protected]> wrote:
>>>>>>>>>>>>
>>>>>>>>>>>>> Exactly what JB said. We will write a generic conversion from Avro (or JSON) to Beam schemas, which will make them work transparently with SQL. The plan is also to migrate Anton's work so that POJOs work generically for any schema.
>>>>>>>>>>>>>
>>>>>>>>>>>>> Reuven
>>>>>>>>>>>>>
>>>>>>>>>>>>> On Thu, Apr 26, 2018 at 1:17 AM Jean-Baptiste Onofré <[email protected]> wrote:
>>>>>>>>>>>>>
>>>>>>>>>>>>>> For now we have a generic schema interface. JSON-B can be one implementation; Avro could be another.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Regards
>>>>>>>>>>>>>> JB
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> On Apr 26, 2018, at 12:08, Romain Manni-Bucau <[email protected]> wrote:
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Hmm,
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Avro still has the pitfall of an uncontrolled stack that brings in way too many dependencies to be part of any API. This is why I proposed a JSON-P based API (JsonObject) with a custom Beam entry for some metadata (headers "à la Camel").
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Romain Manni-Bucau
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> On 2018-04-26 9:59 GMT+02:00, Jean-Baptiste Onofré <[email protected]> wrote:
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> Hi Ismael,
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> You mean directly in Beam SQL?
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> That will be part of schema support: a generic record could be one of the payloads supported through schemas.
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> Regards
>>>>>>>>>>>>>>>> JB
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> On Apr 26, 2018, at 11:39, "Ismaël Mejía" <[email protected]> wrote:
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> Hello Anton,
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> Thanks for the descriptive email and the really useful work. Any plans to tackle PCollections of GenericRecord/IndexedRecord? It seems Avro is a natural fit for this approach too.
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> Regards,
>>>>>>>>>>>>>>>>> Ismaël
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> On Wed, Apr 25, 2018 at 9:04 PM, Anton Kedin <[email protected]> wrote:
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> Hi,
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> I want to highlight a couple of improvements to Beam SQL we have been working on recently, which are targeted at making the Beam SQL API easier to use. Specifically, these features simplify the conversion of Java Beans and JSON strings to Rows.
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> Feel free to try this and send any bugs/comments/PRs my way.
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> **Caveat: this is still work in progress, and has known bugs and incomplete features; see below for details.**
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> Background
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> Beam SQL queries can only be applied to PCollection<Row>. This means that users need to convert whatever PCollection elements they have to Rows before querying them with SQL. This usually requires manually creating a Schema and implementing a custom conversion PTransform<PCollection<Element>, PCollection<Row>> (see Beam SQL Guide).
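The Background paragraph describes the manual Schema-plus-conversion step that an inferred coder is meant to remove. The underlying idea (derive field names and types from a bean's getters) can be sketched with the JDK's `java.beans.Introspector`. This illustrates the idea only and is not InferredRowCoder's actual implementation: `inferFields` and `toRow` are hypothetical helpers, and plain maps stand in for Beam's Schema and Row.

```java
import java.beans.BeanInfo;
import java.beans.IntrospectionException;
import java.beans.Introspector;
import java.beans.PropertyDescriptor;
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class BeanSchemaSketch {

    /** Example bean; java.beans introspection only sees public getters. */
    public static class JavaBeanObject {
        private final String name;
        private final int size;
        public JavaBeanObject(String name, int size) { this.name = name; this.size = size; }
        public String getName() { return name; }
        public int getSize() { return size; }
    }

    /** Hypothetical helper: property name -> Java type, a stand-in for an inferred schema. */
    static Map<String, Class<?>> inferFields(Class<?> beanClass) {
        Map<String, Class<?>> fields = new LinkedHashMap<>();
        for (PropertyDescriptor pd : readableProperties(beanClass)) {
            fields.put(pd.getName(), pd.getPropertyType());
        }
        return fields;
    }

    /** Hypothetical helper: reads each inferred property into a name -> value "row". */
    static Map<String, Object> toRow(Object bean) {
        Map<String, Object> row = new LinkedHashMap<>();
        for (PropertyDescriptor pd : readableProperties(bean.getClass())) {
            try {
                row.put(pd.getName(), pd.getReadMethod().invoke(bean));
            } catch (ReflectiveOperationException e) {
                throw new IllegalStateException("Cannot read property " + pd.getName(), e);
            }
        }
        return row;
    }

    private static List<PropertyDescriptor> readableProperties(Class<?> beanClass) {
        try {
            // Stop at Object.class so the inherited getClass() is not reported as a "class" property.
            BeanInfo info = Introspector.getBeanInfo(beanClass, Object.class);
            List<PropertyDescriptor> result = new ArrayList<>();
            for (PropertyDescriptor pd : info.getPropertyDescriptors()) {
                if (pd.getReadMethod() != null) {
                    result.add(pd);
                }
            }
            return result;
        } catch (IntrospectionException e) {
            throw new IllegalArgumentException("Not introspectable: " + beanClass, e);
        }
    }

    public static void main(String[] args) {
        System.out.println(inferFields(JavaBeanObject.class));
        System.out.println(toRow(new JavaBeanObject("foo", 333)));
    }
}
```

Because only public getters are visible to the Introspector, the sketch declares them public; a production coder would also have to map Java types onto Beam field types and handle nested beans.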
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> The improvements described here are an attempt to reduce this overhead for a few common cases, as a start.
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> Status
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>  - Introduced an InferredRowCoder to automatically generate Rows from beans. Removes the need to manually define a Schema and Row conversion logic;
>>>>>>>>>>>>>>>>>>  - Introduced a JsonToRow transform to automatically parse JSON objects to Rows. Removes the need to manually implement conversion logic;
>>>>>>>>>>>>>>>>>>  - This is still experimental work in progress; APIs will likely change;
>>>>>>>>>>>>>>>>>>  - There are known bugs/unsolved problems;
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> Java Beans
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> Introduced a coder which facilitates Row generation from Java Beans. It reduces the overhead to:
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> /** Some user-defined Java Bean */
>>>>>>>>>>>>>>>>>>> class JavaBeanObject implements Serializable {
>>>>>>>>>>>>>>>>>>>   String getName() { ... }
>>>>>>>>>>>>>>>>>>> }
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> // Obtain the objects:
>>>>>>>>>>>>>>>>>>> PCollection<JavaBeanObject> javaBeans = ...;
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> // Convert to Rows and apply a SQL query:
>>>>>>>>>>>>>>>>>>> PCollection<Row> queryResult =
>>>>>>>>>>>>>>>>>>>   javaBeans
>>>>>>>>>>>>>>>>>>>     .setCoder(InferredRowCoder.ofSerializable(JavaBeanObject.class))
>>>>>>>>>>>>>>>>>>>     .apply(BeamSql.query("SELECT name FROM PCOLLECTION"));
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> Notice that there is no longer any manual Schema definition or custom conversion logic.
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> Links
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>  - example;
>>>>>>>>>>>>>>>>>>  - InferredRowCoder;
>>>>>>>>>>>>>>>>>>  - test;
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> JSON
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> Introduced the JsonToRow transform.
>>>>>>>>>>>>>>>>>> It is possible to query a PCollection<String> that contains JSON objects like this:
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> // Assuming JSON objects look like this:
>>>>>>>>>>>>>>>>>>> // { "type" : "foo", "size" : 333 }
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> // Define a Schema:
>>>>>>>>>>>>>>>>>>> Schema jsonSchema =
>>>>>>>>>>>>>>>>>>>   Schema
>>>>>>>>>>>>>>>>>>>     .builder()
>>>>>>>>>>>>>>>>>>>     .addStringField("type")
>>>>>>>>>>>>>>>>>>>     .addInt32Field("size")
>>>>>>>>>>>>>>>>>>>     .build();
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> // Obtain PCollection of the objects in JSON format:
>>>>>>>>>>>>>>>>>>> PCollection<String> jsonObjects = ...;
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> // Convert to Rows and apply a SQL query:
>>>>>>>>>>>>>>>>>>> PCollection<Row> queryResults =
>>>>>>>>>>>>>>>>>>>   jsonObjects
>>>>>>>>>>>>>>>>>>>     .apply(JsonToRow.withSchema(jsonSchema))
>>>>>>>>>>>>>>>>>>>     .apply(BeamSql.query(
>>>>>>>>>>>>>>>>>>>         "SELECT type, AVG(size) FROM PCOLLECTION GROUP BY type"));
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> Notice that the JSON to Row conversion is done by the JsonToRow transform. It is currently required to supply a Schema.
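The requirement to supply a Schema connects to the point made earlier in the thread that JSON does not define number semantics: a generic parser may hand back `333` as a double, a long or a BigDecimal, and only the schema says `size` is a 32-bit integer. A minimal sketch of that coercion step, assuming the JSON has already been parsed into a map. `FieldType`, `coerce` and `toRow` are hypothetical names, and this is not how JsonToRow is implemented.

```java
import java.math.BigDecimal;
import java.util.LinkedHashMap;
import java.util.Map;

public class SchemaCoercionSketch {

    /** Hypothetical subset of field types, mirroring addStringField / addInt32Field. */
    enum FieldType { STRING, INT32 }

    /** Converts one parsed JSON value to the Java type the schema asks for. */
    static Object coerce(String field, Object value, FieldType type) {
        switch (type) {
            case STRING:
                if (value instanceof String) {
                    return value;
                }
                break;
            case INT32:
                if (value instanceof Number) {
                    // intValueExact throws ArithmeticException for 333.5 or out-of-range values
                    // instead of silently truncating them.
                    return new BigDecimal(value.toString()).intValueExact();
                }
                break;
        }
        throw new IllegalArgumentException("Field '" + field + "' is not a valid " + type + ": " + value);
    }

    /** Builds a row (field -> typed value) from parsed JSON, following the schema's field order. */
    static Map<String, Object> toRow(Map<String, Object> parsedJson, Map<String, FieldType> schema) {
        Map<String, Object> row = new LinkedHashMap<>();
        for (Map.Entry<String, FieldType> field : schema.entrySet()) {
            Object raw = parsedJson.get(field.getKey());
            if (raw == null) {
                throw new IllegalArgumentException("Missing field '" + field.getKey() + "'");
            }
            row.put(field.getKey(), coerce(field.getKey(), raw, field.getValue()));
        }
        return row;
    }

    public static void main(String[] args) {
        Map<String, FieldType> schema = new LinkedHashMap<>();
        schema.put("type", FieldType.STRING);
        schema.put("size", FieldType.INT32);

        // A generic JSON parser might return 333 as a Double; the schema turns it back into an int.
        Map<String, Object> parsed = Map.of("type", "foo", "size", 333.0);
        System.out.println(toRow(parsed, schema));
    }
}
```

Rejecting a fractional value rather than truncating it is a design choice in this sketch; a real transform could equally route such records to an error output.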
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> Links
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>  - JsonToRow;
>>>>>>>>>>>>>>>>>>  - test/example;
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> Going Forward
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>  - fix bugs (BEAM-4163, BEAM-4161 ...);
>>>>>>>>>>>>>>>>>>  - implement more features (BEAM-4167, more types of objects);
>>>>>>>>>>>>>>>>>>  - wire this up with sources/sinks to further simplify the SQL API;
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> Thank you,
>>>>>>>>>>>>>>>>>> Anton
