The metadata needed is already there - it's the encoding-position map in
Schema. However the code needs to be written to examine an old schema and a
new one in order to make the new schema encoding-compatible with the old
one. This shouldn't be difficult to write.

On Fri, Jul 26, 2019 at 10:21 AM Kenneth Knowles <[email protected]> wrote:

> The most challenging part, as I understand it, surrounds automatically
> inferred schemas from POJOs, where Java's nondeterministic iteration order,
> combined with a row's inherent ordering, means that even an identical
> pipeline will need some metadata to plumb the right fields to the right
> column indices.
>
> Most relational migration management I've done incorporates explicit
> migration logic along with changes to the schema. This is quite a lot more
> robust, but more implementation work, than having a default policy
> proto/avro/thrift style. I think there's a lot to explore here.
>
> Kenn
>
> On Thu, Jul 25, 2019 at 9:59 AM Brian Hulette <[email protected]> wrote:
>
>> I know Reuven has put some thought into evolving schemas, but I'm not
>> sure it's documented anywhere as of now. The only documentation I've come
>> across as I bump around the schema code are some comments deep in RowCoder
>> [1].
>> Essentially the current serialization format for a row includes a row
>> count as a prefix so we can detect "simple" schema changes like column
>> additions and deletions. When decoding a Row, if the current schema
>> contains *more* fields than the encoded Row, the remaining fields are
>> populated with nulls in the resulting Row object. If the current schema
>> contains *fewer* fields than the encoded Row, the additional ones are
>> just dropped.
>>
>> [1]
>> https://github.com/apache/beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/RowCoderGenerator.java#L296
>>
>> On Wed, Jul 24, 2019 at 6:00 AM Ryan Skraba <[email protected]> wrote:
>>
>>> I'm also really interested in the question of evolving schemas... It's
>>> something I've also put off figuring out :D
>>>
>>> With all its warts, the LazyAvroCoder technique (a coder backed by
>>> some sort of schema registry) _could_ work with "homogeneish" data
>>> (i.e. if the number of schemas in play for a single coder is much,
>>> much smaller than the number of elements), even if none of the the
>>> schemas are known at Pipeline construction.  The portability job
>>> server (which already stores and serves artifacts for running jobs)
>>> might be the right place to put a schema registry... but I'm not
>>> entirely convinced it's the right way to go either.
>>>
>>> At the same time, "simply" bumping a known schema to a new version is
>>> roughly equivalent to updating a pipeline in place.
>>>
>>> Sending the data as Java-serialized Rows will be equivalent to sending
>>> the entire schema with every record, so it _would_ work without
>>> involving a new, distributed state between one coders encode and
>>> anothers decode (at the cost of message size, of course).
>>>
>>> Ryan
>>>
>>>
>>> On Wed, Jul 24, 2019 at 1:40 AM Pablo Estrada <[email protected]>
>>> wrote:
>>> >
>>> > +dev
>>> > Thanks Ryan! This is quite helpful. Still not what I need : ) - but
>>> useful.
>>> >
>>> > The data is change data capture from databases, and I'm putting it
>>> into a Beam Row. The schema for the Row is generally homogeneous, but
>>> subject to change at some point in the future if the schema in the database
>>> changes. It's unusual and unlikely, but possible. I have no idea how Beam
>>> deals with evolving schemas. +Reuven Lax is there documentation / examples
>>> / anything around this? : )
>>> >
>>> > I think evolving schemas is an interesting question....
>>> >
>>> > For now, I am going to Java-serialize the objects, and delay figuring
>>> this out. But I reckon I'll have to come back to this...
>>> >
>>> > Best
>>> > -P.
>>> >
>>> > On Tue, Jul 23, 2019 at 1:07 AM Ryan Skraba <[email protected]> wrote:
>>> >>
>>> >> Hello Pablo!  Just to clarify -- the Row schemas aren't known at
>>> >> pipeline construction time, but can be discovered from the instance of
>>> >> MyData?
>>> >>
>>> >> Once discovered, is the schema "homogeneous" for all instance of
>>> >> MyData?  (i.e. someRow will always have the same schema for all
>>> >> instances afterwards, and there won't be another someRow with a
>>> >> different schema).
>>> >>
>>> >> We've encountered a parallel "problem" with pure Avro data, where the
>>> >> instance is a GenericRecord containing it's own Avro schema but
>>> >> *without* knowing the schema until the pipeline is run.  The solution
>>> >> that we've been using is a bit hacky, but we're using an ad hoc
>>> >> per-job schema registry and a custom coder where each worker saves the
>>> >> schema in the `encode` before writing the record, and loads it lazily
>>> >> in the `decode` before reading.
>>> >>
>>> >> The original code is available[1] (be gentle, it was written with Beam
>>> >> 0.4.0-incubating... and has continued to work until now).
>>> >>
>>> >> In practice, the ad hoc schema registry is just a server socket in the
>>> >> Spark driver, in-memory for DirectRunner / local mode, and a a
>>> >> read/write to a known location in other runners.  There are definitely
>>> >> other solutions with side-inputs and providers, and the job server in
>>> >> portability looks like an exciting candidate for per-job schema
>>> >> registry story...
>>> >>
>>> >> I'm super eager to see if there are other ideas or a contribution we
>>> >> can make in this area that's "Beam Row" oriented!
>>> >>
>>> >> Ryan
>>> >>
>>> >> [1]
>>> https://github.com/Talend/components/blob/master/core/components-adapter-beam/src/main/java/org/talend/components/adapter/beam/coders/LazyAvroCoder.java
>>> >>
>>> >> On Tue, Jul 23, 2019 at 12:49 AM Pablo Estrada <[email protected]>
>>> wrote:
>>> >> >
>>> >> > Hello all,
>>> >> > I am writing a utility to push data to PubSub. My data class looks
>>> something like so:
>>> >> > ==========
>>> >> > class MyData {
>>> >> >   String someId;
>>> >> >   Row someRow;
>>> >> >   Row someOtherRow;
>>> >> > }
>>> >> > ==============
>>> >> > The schema for the Rows is not known a-priori. It is contained by
>>> the Row. I am then pushing this data to pubsub:
>>> >> > ===========
>>> >> > MyData pushingData = ....
>>> >> > WhatCoder? coder = ....
>>> >> >
>>> >> > ByteArrayOutputStream os = new ByteArrayOutputStream();
>>> >> > coder.encode(this, os);
>>> >> >
>>> >> > pubsubClient.connect();
>>> >> >
>>> pubsubClient.push(PubSubMessage.newBuilder().setData(os.toByteArray()).build());
>>> >> > pubsubClient.close();
>>> >> > =================
>>> >> > What's the right coder to use in this case? I don't know if
>>> SchemaCoder will work, because it seems that it requires the Row's schema a
>>> priori. I have not been able to make AvroCoder work.
>>> >> >
>>> >> > Any tips?
>>> >> > Best
>>> >> > -P.
>>>
>>

Reply via email to