Hi,

I want to highlight a couple of improvements to Beam SQL we have been
working on recently which are targeted to make Beam SQL API easier to use.
Specifically these features simplify conversion of Java Beans and JSON
strings to Rows.

Feel free to try this and send any bugs/comments/PRs my way.

***Caveat: this is still work in progress, and has known bugs and
incomplete features, see below for details.***

Background

Beam SQL queries can only be applied to PCollection<Row>. This means that
users need to convert whatever PCollection elements they have to Rows
before querying them with SQL. This usually requires manually creating a
Schema and implementing a custom conversion PTransform<PCollection<Element>,
PCollection<Row>> (see Beam SQL Guide
<https://beam.apache.org/documentation/dsls/sql/#row>).

The improvements described here are an attempt to reduce this overhead for
few common cases, as a start.

Status

   - Introduced a InferredRowCoder to automatically generate rows from
   beans. Removes the need to manually define a Schema and Row conversion
   logic;
   - Introduced JsonToRow transform to automatically parse JSON objects to
   Rows. Removes the need to manually implement a conversion logic;
   - This is still experimental work in progress, APIs will likely change;
   - There are known bugs/unsolved problems;


Java Beans

Introduced a coder which facilitates Rows generation from Java Beans.
Reduces the overhead to:

/** Some user-defined Java Bean */
> class JavaBeanObject implements Serializable {
>     String getName() { ... }
> }
>


// Obtain the objects:
> PCollection<JavaBeanObject> javaBeans = ...;



// Convert to Rows and apply a SQL query:
> PCollection<Row> queryResult =
>   javaBeans
>      .setCoder(InferredRowCoder.ofSerializable(JavaBeanObject.class))
>      .apply(BeamSql.query("SELECT name FROM PCOLLECTION"));


Notice, there is no more manual Schema definition or custom conversion
logic.

*Links*

   -  example
   
<https://github.com/apache/beam/blob/5ec708e69988a967f81a9d67808af82be888f8c5/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/example/BeamSqlPojoExample.java#L37>
   ;
   -  InferredRowCoder
   
<https://github.com/apache/beam/blob/670c75e94795ad9da6a0690647e996dc97b60718/sdks/java/core/src/main/java/org/apache/beam/sdk/values/reflect/InferredRowCoder.java#L43>
   ;
   -  test
   
<https://github.com/apache/beam/blob/670c75e94795ad9da6a0690647e996dc97b60718/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/InferredRowCoderSqlTest.java#L115>
   ;


JSON

Introduced JsonToRow transform. It is possible to query a
PCollection<String> that contains JSON objects like this:

// Assuming JSON objects look like this:
> // { "type" : "foo", "size" : 333 }
>
> // Define a Schema:
> Schema jsonSchema =
>   Schema
>     .builder()
>     .addStringField("type")
>     .addInt32Field("size")
>     .build();
>
> // Obtain PCollection of the objects in JSON format:
> PCollection<String> jsonObjects = ...
>
> // Convert to Rows and apply a SQL query:
> PCollection<Row> queryResults =
>   jsonObjects
>     .apply(JsonToRow.withSchema(jsonSchema))
>     .apply(BeamSql.query("SELECT type, AVG(size) FROM PCOLLECTION GROUP
> BY type"));


Notice, JSON to Row conversion is done by JsonToRow transform. It is
currently required to supply a Schema.

*Links*

   -  JsonToRow
   
<https://github.com/apache/beam/blob/9c2b43227e1ddac39676f6c09aca1af82a9d4cdb/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/JsonToRow.java#L63>
   ;
   -  test/example
   
<https://github.com/apache/beam/blob/9c2b43227e1ddac39676f6c09aca1af82a9d4cdb/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/JsonToRowSqlTest.java#L40>
   ;


Going Forward

   - fix bugs (BEAM-4163 <https://issues.apache.org/jira/browse/BEAM-4163>,
   BEAM-4161 <https://issues.apache.org/jira/browse/BEAM-4161> ...)
   - implement more features (BEAM-4167
   <https://issues.apache.org/jira/browse/BEAM-4167>, more types of
   objects);
   - wire this up with sources/sinks to further simplify SQL API;


Thank you,
Anton

Reply via email to