Hello Anton,

Thanks for the descriptive email and the really useful work. Any plans to
tackle PCollections of GenericRecord/IndexedRecord? It seems Avro is a
natural fit for this approach too.
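For context, this is roughly the boilerplate we have to write today to query
Avro records. Just a sketch: the two-field Beam Schema and field names are
invented for illustration, and the coder setup may differ between SDK
versions:

// A hypothetical Beam Schema, mirroring the Avro schema by hand:
Schema beamSchema =
    Schema
        .builder()
        .addStringField("name")
        .addInt32Field("size")
        .build();

// Obtain the Avro records:
PCollection<GenericRecord> records = ...;

// Convert each GenericRecord to a Row manually:
PCollection<Row> rows =
    records
        .apply(MapElements.via(
            new SimpleFunction<GenericRecord, Row>() {
              @Override
              public Row apply(GenericRecord record) {
                return Row
                    .withSchema(beamSchema)
                    // Avro returns strings as org.apache.avro.util.Utf8,
                    // so convert them explicitly:
                    .addValues(
                        record.get("name").toString(),
                        (Integer) record.get("size"))
                    .build();
              }
            }))
        // Coder setup may vary by SDK version:
        .setCoder(RowCoder.of(beamSchema));

An inferred coder in the style of InferredRowCoder would remove all of this.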
Regards,
Ismaël

On Wed, Apr 25, 2018 at 9:04 PM, Anton Kedin <ke...@google.com> wrote:
> Hi,
>
> I want to highlight a couple of improvements to Beam SQL we have been
> working on recently that are aimed at making the Beam SQL API easier to
> use. Specifically, these features simplify the conversion of Java Beans
> and JSON strings to Rows.
>
> Feel free to try this and send any bugs/comments/PRs my way.
>
> **Caveat: this is still work in progress, and has known bugs and
> incomplete features; see below for details.**
>
> Background
>
> Beam SQL queries can only be applied to a PCollection<Row>. This means
> that users need to convert whatever PCollection elements they have to
> Rows before querying them with SQL. This usually requires manually
> creating a Schema and implementing a custom conversion
> PTransform<PCollection<Element>, PCollection<Row>> (see the Beam SQL
> Guide).
>
> The improvements described here are an attempt to reduce this overhead
> for a few common cases, as a start.
>
> Status
>
> Introduced an InferredRowCoder to automatically generate Rows from beans.
> This removes the need to manually define a Schema and Row conversion
> logic;
> Introduced a JsonToRow transform to automatically parse JSON objects to
> Rows. This removes the need to manually implement conversion logic;
> This is still experimental work in progress, and the APIs will likely
> change;
> There are known bugs/unsolved problems;
>
>
> Java Beans
>
> Introduced a coder that facilitates generating Rows from Java Beans. It
> reduces the overhead to:
>
>> /** Some user-defined Java Bean. */
>> class JavaBeanObject implements Serializable {
>>   String getName() { ... }
>> }
>>
>> // Obtain the objects:
>> PCollection<JavaBeanObject> javaBeans = ...;
>>
>> // Convert to Rows and apply a SQL query:
>> PCollection<Row> queryResult =
>>     javaBeans
>>         .setCoder(InferredRowCoder.ofSerializable(JavaBeanObject.class))
>>         .apply(BeamSql.query("SELECT name FROM PCOLLECTION"));
>
> Notice that there is no manual Schema definition or custom conversion
> logic anymore.
>
> Links
>
> example;
> InferredRowCoder;
> test;
>
>
> JSON
>
> Introduced a JsonToRow transform. It makes it possible to query a
> PCollection<String> that contains JSON objects like this:
>
>> // Assuming JSON objects look like this:
>> // { "type" : "foo", "size" : 333 }
>>
>> // Define a Schema:
>> Schema jsonSchema =
>>     Schema
>>         .builder()
>>         .addStringField("type")
>>         .addInt32Field("size")
>>         .build();
>>
>> // Obtain a PCollection of the objects in JSON format:
>> PCollection<String> jsonObjects = ...;
>>
>> // Convert to Rows and apply a SQL query:
>> PCollection<Row> queryResults =
>>     jsonObjects
>>         .apply(JsonToRow.withSchema(jsonSchema))
>>         .apply(BeamSql.query(
>>             "SELECT type, AVG(size) FROM PCOLLECTION GROUP BY type"));
>
> Notice that the JSON-to-Row conversion is done by the JsonToRow
> transform. Supplying a Schema is currently required.
>
> Links
>
> JsonToRow;
> test/example;
>
>
> Going Forward
>
> fix bugs (BEAM-4163, BEAM-4161, ...);
> implement more features (BEAM-4167, more types of objects);
> wire this up with sources/sinks to further simplify the SQL API;
>
>
> Thank you,
> Anton
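One more thought after reading the JSON example: it might also be worth
documenting how to get values back out of the result Rows. A sketch of what
I'd expect, based on the Row API; the aggregate is read by index here, since
its generated column name and exact Java type depend on the SQL planner:

// Sketch: extract fields from the JSON query results above.
PCollection<String> formatted =
    queryResults.apply(
        MapElements.via(
            new SimpleFunction<Row, String>() {
              @Override
              public String apply(Row row) {
                // "type" is named in the schema; the AVG(size) column
                // is accessed by position:
                return row.getString("type") + ": " + row.getValue(1);
              }
            }));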