Hi, I want to highlight a couple of improvements to Beam SQL we have been working on recently which are targeted to make Beam SQL API easier to use. Specifically these features simplify conversion of Java Beans and JSON strings to Rows.
Feel free to try this and send any bugs/comments/PRs my way. ***Caveat: this is still work in progress, and has known bugs and incomplete features, see below for details.*** Background Beam SQL queries can only be applied to PCollection<Row>. This means that users need to convert whatever PCollection elements they have to Rows before querying them with SQL. This usually requires manually creating a Schema and implementing a custom conversion PTransform<PCollection<Element>, PCollection<Row>> (see Beam SQL Guide <https://beam.apache.org/documentation/dsls/sql/#row>). The improvements described here are an attempt to reduce this overhead for few common cases, as a start. Status - Introduced a InferredRowCoder to automatically generate rows from beans. Removes the need to manually define a Schema and Row conversion logic; - Introduced JsonToRow transform to automatically parse JSON objects to Rows. Removes the need to manually implement a conversion logic; - This is still experimental work in progress, APIs will likely change; - There are known bugs/unsolved problems; Java Beans Introduced a coder which facilitates Rows generation from Java Beans. Reduces the overhead to: /** Some user-defined Java Bean */ > class JavaBeanObject implements Serializable { > String getName() { ... } > } > // Obtain the objects: > PCollection<JavaBeanObject> javaBeans = ...; // Convert to Rows and apply a SQL query: > PCollection<Row> queryResult = > javaBeans > .setCoder(InferredRowCoder.ofSerializable(JavaBeanObject.class)) > .apply(BeamSql.query("SELECT name FROM PCOLLECTION")); Notice, there is no more manual Schema definition or custom conversion logic. *Links* - example <https://github.com/apache/beam/blob/5ec708e69988a967f81a9d67808af82be888f8c5/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/example/BeamSqlPojoExample.java#L37> ; - InferredRowCoder <https://github.com/apache/beam/blob/670c75e94795ad9da6a0690647e996dc97b60718/sdks/java/core/src/main/java/org/apache/beam/sdk/values/reflect/InferredRowCoder.java#L43> ; - test <https://github.com/apache/beam/blob/670c75e94795ad9da6a0690647e996dc97b60718/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/InferredRowCoderSqlTest.java#L115> ; JSON Introduced JsonToRow transform. It is possible to query a PCollection<String> that contains JSON objects like this: // Assuming JSON objects look like this: > // { "type" : "foo", "size" : 333 } > > // Define a Schema: > Schema jsonSchema = > Schema > .builder() > .addStringField("type") > .addInt32Field("size") > .build(); > > // Obtain PCollection of the objects in JSON format: > PCollection<String> jsonObjects = ... > > // Convert to Rows and apply a SQL query: > PCollection<Row> queryResults = > jsonObjects > .apply(JsonToRow.withSchema(jsonSchema)) > .apply(BeamSql.query("SELECT type, AVG(size) FROM PCOLLECTION GROUP > BY type")); Notice, JSON to Row conversion is done by JsonToRow transform. It is currently required to supply a Schema. *Links* - JsonToRow <https://github.com/apache/beam/blob/9c2b43227e1ddac39676f6c09aca1af82a9d4cdb/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/JsonToRow.java#L63> ; - test/example <https://github.com/apache/beam/blob/9c2b43227e1ddac39676f6c09aca1af82a9d4cdb/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/JsonToRowSqlTest.java#L40> ; Going Forward - fix bugs (BEAM-4163 <https://issues.apache.org/jira/browse/BEAM-4163>, BEAM-4161 <https://issues.apache.org/jira/browse/BEAM-4161> ...) - implement more features (BEAM-4167 <https://issues.apache.org/jira/browse/BEAM-4167>, more types of objects); - wire this up with sources/sinks to further simplify SQL API; Thank you, Anton