Hello Anton,

Thanks for the descriptive email and the really useful work. Any plans
to tackle PCollections of GenericRecord/IndexedRecords? It seems Avro
is a natural fit for this approach too.

Regards,
Ismaël

On Wed, Apr 25, 2018 at 9:04 PM, Anton Kedin <ke...@google.com> wrote:
> Hi,
>
> I want to highlight a couple of improvements to Beam SQL we have been
> working on recently, which are aimed at making the Beam SQL API easier to use.
> Specifically, these features simplify the conversion of Java Beans and JSON
> strings to Rows.
>
> Feel free to try this and send any bugs/comments/PRs my way.
>
> **Caveat: this is still a work in progress and has known bugs and incomplete
> features; see below for details.**
>
> Background
>
> Beam SQL queries can only be applied to PCollection<Row>. This means that
> users need to convert whatever PCollection elements they have to Rows before
> querying them with SQL. This usually requires manually creating a Schema and
> implementing a custom conversion PTransform<PCollection<Element>,
> PCollection<Row>> (see Beam SQL Guide).
>
> The improvements described here are an attempt to reduce this overhead for
> a few common cases, as a start.
>
> Status
>
> Introduced an InferredRowCoder to automatically generate Rows from beans.
> Removes the need to manually define a Schema and Row conversion logic;
> Introduced a JsonToRow transform to automatically parse JSON objects to Rows.
> Removes the need to manually implement conversion logic;
> This is still experimental work in progress; APIs will likely change;
> There are known bugs/unsolved problems;
>
>
> Java Beans
>
> Introduced a coder which facilitates Row generation from Java Beans.
> Reduces the overhead to:
>
>> /** Some user-defined Java Bean */
>> class JavaBeanObject implements Serializable {
>>     String getName() { ... }
>> }
>>
>> // Obtain the objects:
>> PCollection<JavaBeanObject> javaBeans = ...;
>>
>> // Convert to Rows and apply a SQL query:
>> PCollection<Row> queryResult =
>>   javaBeans
>>      .setCoder(InferredRowCoder.ofSerializable(JavaBeanObject.class))
>>      .apply(BeamSql.query("SELECT name FROM PCOLLECTION"));
>
>
> Note that there is no longer any manual Schema definition or custom
> conversion logic.
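
For anyone wondering how field names and values can be derived from a bean at all, here is a minimal plain-Java sketch of the general idea, using only java.beans.Introspector from the JDK. This is not Beam's InferredRowCoder implementation and makes no claim about how it works internally. The names BeanRowSketch, Person, fieldNames and toRow are hypothetical and exist only for illustration.

```java
import java.beans.IntrospectionException;
import java.beans.Introspector;
import java.beans.PropertyDescriptor;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

/** Plain-Java sketch (not Beam code): derive field names and values from bean getters. */
final class BeanRowSketch {

  /** Hypothetical example bean, used only for this illustration. */
  public static class Person {
    private final String name;
    private final int age;

    public Person(String name, int age) {
      this.name = name;
      this.age = age;
    }

    public String getName() { return name; }

    public int getAge() { return age; }
  }

  /** Readable bean properties, sorted by name so the field order is deterministic. */
  static List<PropertyDescriptor> readableProperties(Class<?> beanClass) {
    try {
      List<PropertyDescriptor> props = new ArrayList<>();
      // Object.class as the stop class keeps getClass() out of the result.
      for (PropertyDescriptor pd :
          Introspector.getBeanInfo(beanClass, Object.class).getPropertyDescriptors()) {
        if (pd.getReadMethod() != null) {
          props.add(pd);
        }
      }
      props.sort(Comparator.comparing(PropertyDescriptor::getName));
      return props;
    } catch (IntrospectionException e) {
      throw new IllegalStateException("Cannot introspect " + beanClass, e);
    }
  }

  /** Field names inferred from the getters; plays the role of a schema here. */
  static List<String> fieldNames(Class<?> beanClass) {
    List<String> names = new ArrayList<>();
    for (PropertyDescriptor pd : readableProperties(beanClass)) {
      names.add(pd.getName());
    }
    return names;
  }

  /** Getter values in the same order as fieldNames; plays the role of a row here. */
  static List<Object> toRow(Object bean) {
    List<Object> values = new ArrayList<>();
    for (PropertyDescriptor pd : readableProperties(bean.getClass())) {
      try {
        values.add(pd.getReadMethod().invoke(bean));
      } catch (ReflectiveOperationException e) {
        throw new IllegalStateException("Cannot read property " + pd.getName(), e);
      }
    }
    return values;
  }

  public static void main(String[] args) {
    Person person = new Person("Ada", 36);
    System.out.println(fieldNames(Person.class) + " -> " + toRow(person));
  }
}
```

Sorting the properties by name is a choice made for this sketch, so that names and values line up in a stable order regardless of what order the introspector returns them in.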
>
> Links
>
>  example;
>  InferredRowCoder;
>  test;
>
>
> JSON
>
> Introduced a JsonToRow transform. It makes it possible to query a
> PCollection<String> that contains JSON objects like this:
>
>> // Assuming JSON objects look like this:
>> // { "type" : "foo", "size" : 333 }
>>
>> // Define a Schema:
>> Schema jsonSchema =
>>   Schema
>>     .builder()
>>     .addStringField("type")
>>     .addInt32Field("size")
>>     .build();
>>
>> // Obtain PCollection of the objects in JSON format:
>> PCollection<String> jsonObjects = ...;
>>
>> // Convert to Rows and apply a SQL query:
>> PCollection<Row> queryResults =
>>   jsonObjects
>>     .apply(JsonToRow.withSchema(jsonSchema))
>>     .apply(BeamSql.query(
>>         "SELECT type, AVG(size) FROM PCOLLECTION GROUP BY type"));
>
>
> Note that the JSON to Row conversion is done by the JsonToRow transform.
> Supplying a Schema is currently required.
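
To illustrate why a schema is useful for this kind of conversion, here is a minimal plain-Java sketch in which the schema fixes both the order and the expected type of each field. It is not the JsonToRow implementation. It assumes the JSON string has already been parsed into a Map by some JSON library, and the names SchemaRowSketch, exampleSchema and toRow are hypothetical.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Plain-Java sketch (not Beam code): order and type-check parsed JSON fields by a schema. */
final class SchemaRowSketch {

  /** Schema for objects like { "type" : "foo", "size" : 333 }; insertion order is field order. */
  static Map<String, Class<?>> exampleSchema() {
    Map<String, Class<?>> schema = new LinkedHashMap<>();
    schema.put("type", String.class);
    schema.put("size", Integer.class);
    return schema;
  }

  /**
   * Builds a row (an ordered list of values) from an already-parsed JSON object.
   * Fails if a field named in the schema is missing or has an unexpected type.
   */
  static List<Object> toRow(Map<String, Class<?>> schema, Map<String, Object> jsonObject) {
    List<Object> row = new ArrayList<>();
    for (Map.Entry<String, Class<?>> field : schema.entrySet()) {
      Object value = jsonObject.get(field.getKey());
      if (value == null) {
        throw new IllegalArgumentException("Missing field: " + field.getKey());
      }
      if (!field.getValue().isInstance(value)) {
        throw new IllegalArgumentException(
            "Field " + field.getKey() + " is not a " + field.getValue().getSimpleName());
      }
      row.add(value);
    }
    return row;
  }

  public static void main(String[] args) {
    Map<String, Object> parsed = new LinkedHashMap<>();
    parsed.put("size", 333);
    parsed.put("type", "foo");
    System.out.println(toRow(exampleSchema(), parsed));
  }
}
```

In this sketch the row always follows the schema's field order, whatever order the keys had in the JSON object, and a mistyped value is rejected at conversion time rather than at query time.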
>
> Links
>
>  JsonToRow;
>  test/example;
>
>
> Going Forward
>
> fix bugs (BEAM-4163, BEAM-4161 ...);
> implement more features (BEAM-4167, more types of objects);
> wire this up with sources/sinks to further simplify the SQL API;
>
>
> Thank you,
> Anton
