Beam Java's support for schemas is just about done: we infer schemas from a
variety of types, we have a variety of utility transforms (join, aggregate,
etc.) for schemas, and schemas are integrated with the ParDo machinery. The
big remaining task I'm working on is writing documentation and examples for
all of this so that users are aware of it. If you're interested, these slides
<https://docs.google.com/presentation/d/1kjgmbG2OkVldUM_aSHgho_C3rCftz_v66iBHSUb08P0/edit?usp=sharing>
from the London Beam meetup show a bit more of how schemas can be used and
how they simplify the API.

I want to start integrating schemas into portability so that they can be
used from other languages such as Python (in particular this will also
allow BeamSQL to be invoked from other languages). In order to do this, the
Beam portability protos must have a way of representing schemas. Since this
has not been discussed before, I'm starting this discussion now on the list.

As a reminder: a schema represents the type of a PCollection as a
collection of fields. Each field has a name, an id (position), and a field
type. A field type can be either a primitive type (int, long, string, byte
array, etc.), a nested row (itself with a schema), an array, or a map.
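
To make the structure above concrete, here is a minimal Java sketch of that model. All of the names (SchemaSketch, TypeName, FieldType, Field, Schema) are made up for illustration and do not mirror Beam's actual classes or the proto messages. The sketch also assumes that a field's id is simply its position in the schema.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Hypothetical, simplified model of a schema; not Beam's real classes or protos.
final class SchemaSketch {
  // A few primitives plus the three composite kinds described above.
  enum TypeName { INT32, INT64, STRING, BYTES, ROW, ARRAY, MAP }

  static final class FieldType {
    final TypeName typeName;
    final Schema rowSchema;      // set only for ROW
    final FieldType elementType; // set only for ARRAY
    final FieldType keyType;     // set only for MAP
    final FieldType valueType;   // set only for MAP

    private FieldType(TypeName typeName, Schema rowSchema, FieldType elementType,
        FieldType keyType, FieldType valueType) {
      this.typeName = typeName;
      this.rowSchema = rowSchema;
      this.elementType = elementType;
      this.keyType = keyType;
      this.valueType = valueType;
    }

    static FieldType primitive(TypeName name) {
      if (name == TypeName.ROW || name == TypeName.ARRAY || name == TypeName.MAP) {
        throw new IllegalArgumentException(name + " is not a primitive type");
      }
      return new FieldType(name, null, null, null, null);
    }

    static FieldType row(Schema schema) {
      return new FieldType(TypeName.ROW, schema, null, null, null);
    }

    static FieldType array(FieldType element) {
      return new FieldType(TypeName.ARRAY, null, element, null, null);
    }

    static FieldType map(FieldType key, FieldType value) {
      return new FieldType(TypeName.MAP, null, null, key, value);
    }
  }

  static final class Field {
    final String name;
    final int id; // assumption: id == position within the schema
    final FieldType type;

    Field(String name, int id, FieldType type) {
      this.name = name;
      this.id = id;
      this.type = type;
    }
  }

  static final class Schema {
    private final List<Field> fields = new ArrayList<>();

    // Appends a field; its id is the next free position.
    Schema addField(String name, FieldType type) {
      fields.add(new Field(name, fields.size(), type));
      return this;
    }

    List<Field> fields() {
      return Collections.unmodifiableList(fields);
    }

    Field field(String name) {
      for (Field f : fields) {
        if (f.name.equals(name)) {
          return f;
        }
      }
      throw new IllegalArgumentException("No such field: " + name);
    }
  }
}
```

A schema with a nested row and an array field would then be built by chaining addField calls, passing FieldType.row(...) or FieldType.array(...) as the field type.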

We also support logical types. A logical type is a way for the user to
embed their own types in schema fields. A logical type is always backed by
a schema type, and contains a function for mapping the user's logical type
to the field type. You can think of this as a generalization of a coder:
while a coder always maps the user type to a byte array, a logical type can
map to an int, or a string, or any other schema field type (in fact any
coder can always be used as a logical type for mapping to byte-array field
types). Logical types are used extensively by Beam SQL to represent SQL
types that have no direct counterpart among Beam's field types (e.g. SQL has
four different date/time types). Logical types for Beam schemas have a lot of
similarities to Avro logical types.
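
As a rough illustration of the idea, here is a small Java sketch of a logical type as a mapping between a user type and a base field type. The interface and class names are hypothetical and are not meant to reproduce Beam's actual API. The reverse mapping (toUserType) is my assumption about what is needed to read values back.

```java
import java.time.LocalDate;
import java.util.UUID;

// Hypothetical sketch of logical types; names are illustrative, not Beam's API.
final class LogicalTypeSketch {
  // Stand-in for the schema field types a logical type can be backed by.
  enum BaseType { INT32, INT64, STRING, BYTES }

  interface LogicalType<UserT, BaseT> {
    String identifier();           // name identifying the logical type
    BaseType baseType();           // the schema field type backing it
    BaseT toBaseType(UserT value); // user type -> field type
    UserT toUserType(BaseT base);  // field type -> user type (assumed)
  }

  // A user type backed by a STRING field.
  static final class UuidAsString implements LogicalType<UUID, String> {
    @Override public String identifier() { return "uuid"; }
    @Override public BaseType baseType() { return BaseType.STRING; }
    @Override public String toBaseType(UUID value) { return value.toString(); }
    @Override public UUID toUserType(String base) { return UUID.fromString(base); }
  }

  // A date type backed by an INT64 field holding days since 1970-01-01.
  static final class DateAsInt64 implements LogicalType<LocalDate, Long> {
    @Override public String identifier() { return "date"; }
    @Override public BaseType baseType() { return BaseType.INT64; }
    @Override public Long toBaseType(LocalDate value) { return value.toEpochDay(); }
    @Override public LocalDate toUserType(Long base) { return LocalDate.ofEpochDay(base); }
  }
}
```

Unlike a coder, neither of these produces a byte array: the first maps to a string field and the second to a long field.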

An initial proto representation for schemas is here
<https://github.com/apache/beam/blob/master/model/pipeline/src/main/proto/beam_runner_api.proto#L654>.
Before we go further with this, I would like community consensus on what
this representation should be. I can start by suggesting a few possible
changes to this representation (and hopefully others will propose more):

   - Kenn Knowles has suggested removing DATETIME as a primitive type, and
   instead making it a logical type backed by INT64 as this keeps our
   primitive types closer to "classical" PL primitive types. This also allows
   us to create multiple versions of this type - e.g. TIMESTAMP(millis),
   TIMESTAMP(micros), TIMESTAMP(nanos).
   - If we do the above, we can also consider removing DECIMAL and making
   that a logical type as well.
   - The id field is currently used for some performance optimizations
   only. If we formalized the idea of schema types having ids, then we might
   be able to use this to allow self-recursive schemas (self-recursive types
   are not currently allowed).
   - Beam Schemas currently have an ARRAY type. However, Beam supports
   "large iterables" (iterables that don't fit in memory that the runner can
   page in), and these don't map well onto arrays. I think we need to add an
   ITERABLE type as well to support things like GroupByKey results.
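
To illustrate the first suggestion in the list above, here is a hedged Java sketch of a timestamp logical type backed by an INT64 value, parameterized by precision. The names (TimestampSketch, Precision, Timestamp) are hypothetical, and the encoding (units since the Unix epoch) is an assumption made for this example, not something that has been decided.

```java
import java.time.Instant;

// Hypothetical sketch: TIMESTAMP(millis|micros|nanos) as logical types over INT64.
final class TimestampSketch {
  enum Precision {
    MILLIS(1_000L),
    MICROS(1_000_000L),
    NANOS(1_000_000_000L);

    final long unitsPerSecond;

    Precision(long unitsPerSecond) {
      this.unitsPerSecond = unitsPerSecond;
    }
  }

  static final class Timestamp {
    final Precision precision;

    Timestamp(Precision precision) {
      this.precision = precision;
    }

    // Encodes an instant as units since the epoch (assumed encoding).
    // Digits finer than the chosen precision are truncated.
    long toInt64(Instant t) {
      long nanosPerUnit = 1_000_000_000L / precision.unitsPerSecond;
      long whole = Math.multiplyExact(t.getEpochSecond(), precision.unitsPerSecond);
      return Math.addExact(whole, t.getNano() / nanosPerUnit);
    }

    // Decodes the INT64 value back into an instant.
    Instant fromInt64(long value) {
      long nanosPerUnit = 1_000_000_000L / precision.unitsPerSecond;
      long seconds = Math.floorDiv(value, precision.unitsPerSecond);
      long fraction = Math.floorMod(value, precision.unitsPerSecond);
      return Instant.ofEpochSecond(seconds, fraction * nanosPerUnit);
    }
  }
}
```

The same INT64 primitive backs all three variants, so only the logical type needs to carry the precision.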

It would also be interesting to explore allowing well-known metadata tags
on fields that Beam interprets, e.g. key and value to allow Beam to
interpret any two-field schema as a KV, or window and timestamp to allow
those to be filled out automatically. However, this would be an extension to
the current schema concept and deserves a separate discussion thread IMO.

I ask that we please limit this discussion to the proto representation of
schemas. If people want to discuss (or rediscuss) other things around Beam
schemas, I'll be happy to create separate threads for those discussions.

Thank you!

Reuven
