The metadata needed is already there: it's the encoding-position map in Schema. However, the code still needs to be written to examine an old schema and a new one in order to make the new schema encoding-compatible with the old one. This shouldn't be difficult to write.
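For concreteness, a rough sketch of what that examination could look like. This is illustrative only: the "shared fields keep their old positions, new fields go after" policy is one possible choice, and how the resulting map gets attached to the new Schema (e.g. via a setter) is an assumption, not a confirmed API.

==========
import java.util.HashMap;
import java.util.Map;
import org.apache.beam.sdk.schemas.Schema;

class SchemaCompat {
  // Sketch: fields shared with the old schema keep the positions they had
  // there; fields that only exist in the new schema are appended after them,
  // so rows written under the old schema still decode into the new one.
  static Map<String, Integer> alignEncodingPositions(Schema oldSchema, Schema newSchema) {
    Map<String, Integer> positions = new HashMap<>();
    for (Schema.Field field : newSchema.getFields()) {
      if (oldSchema.hasField(field.getName())) {
        positions.put(field.getName(), oldSchema.indexOf(field.getName()));
      }
    }
    int next = oldSchema.getFieldCount();
    for (Schema.Field field : newSchema.getFields()) {
      positions.putIfAbsent(field.getName(), next++);
    }
    // Assumption: this map would then be attached to the new schema as its
    // encoding-position map, if/where the SDK exposes a setter for it.
    return positions;
  }
}
==========
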
On Fri, Jul 26, 2019 at 10:21 AM Kenneth Knowles <[email protected]> wrote:

> The most challenging part, as I understand it, surrounds automatically
> inferred schemas from POJOs, where Java's nondeterministic iteration order,
> combined with a row's inherent ordering, means that even an identical
> pipeline will need some metadata to plumb the right fields to the right
> column indices.
>
> Most relational migration management I've done incorporates explicit
> migration logic along with changes to the schema. This is quite a lot more
> robust, but more implementation work, than having a default policy,
> proto/avro/thrift style. I think there's a lot to explore here.
>
> Kenn
>
> On Thu, Jul 25, 2019 at 9:59 AM Brian Hulette <[email protected]> wrote:
>
>> I know Reuven has put some thought into evolving schemas, but I'm not
>> sure it's documented anywhere as of now. The only documentation I've come
>> across as I bump around the schema code is some comments deep in RowCoder [1].
>>
>> Essentially, the current serialization format for a row includes a field
>> count as a prefix so we can detect "simple" schema changes like column
>> additions and deletions. When decoding a Row, if the current schema
>> contains *more* fields than the encoded Row, the remaining fields are
>> populated with nulls in the resulting Row object. If the current schema
>> contains *fewer* fields than the encoded Row, the additional ones are
>> just dropped.
>>
>> [1] https://github.com/apache/beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/RowCoderGenerator.java#L296
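To make that decode behavior concrete, a minimal sketch of what it implies from the outside, assuming the prefix-based behavior described above and with made-up schemas and field names (the added field is declared nullable so it can be populated with null):

==========
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import org.apache.beam.sdk.coders.RowCoder;
import org.apache.beam.sdk.schemas.Schema;
import org.apache.beam.sdk.values.Row;

class RowEvolutionDemo {
  static void demo() throws IOException {
    Schema v1 = Schema.builder().addStringField("id").addInt64Field("count").build();
    Schema v2 = Schema.builder().addStringField("id").addInt64Field("count")
        .addNullableField("note", Schema.FieldType.STRING).build();

    // Encode a row under the old schema...
    Row oldRow = Row.withSchema(v1).addValues("abc", 3L).build();
    ByteArrayOutputStream os = new ByteArrayOutputStream();
    RowCoder.of(v1).encode(oldRow, os);

    // ...and decode those bytes with a coder built from the new schema: per
    // the description above, the extra "note" field should come back as null.
    Row newRow = RowCoder.of(v2).decode(new ByteArrayInputStream(os.toByteArray()));
  }
}
==========
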
>> On Wed, Jul 24, 2019 at 6:00 AM Ryan Skraba <[email protected]> wrote:
>>
>>> I'm also really interested in the question of evolving schemas... It's
>>> something I've also put off figuring out :D
>>>
>>> With all its warts, the LazyAvroCoder technique (a coder backed by
>>> some sort of schema registry) _could_ work with "homogeneish" data
>>> (i.e. if the number of schemas in play for a single coder is much,
>>> much smaller than the number of elements), even if none of the
>>> schemas are known at pipeline construction. The portability job
>>> server (which already stores and serves artifacts for running jobs)
>>> might be the right place to put a schema registry... but I'm not
>>> entirely convinced it's the right way to go either.
>>>
>>> At the same time, "simply" bumping a known schema to a new version is
>>> roughly equivalent to updating a pipeline in place.
>>>
>>> Sending the data as Java-serialized Rows would be equivalent to sending
>>> the entire schema with every record, so it _would_ work without
>>> involving a new, distributed state between one coder's encode and
>>> another's decode (at the cost of message size, of course).
>>>
>>> Ryan
>>>
>>> On Wed, Jul 24, 2019 at 1:40 AM Pablo Estrada <[email protected]> wrote:
>>> >
>>> > +dev
>>> > Thanks Ryan! This is quite helpful. Still not what I need :) - but useful.
>>> >
>>> > The data is change data capture from databases, and I'm putting it into
>>> > a Beam Row. The schema for the Row is generally homogeneous, but subject
>>> > to change at some point in the future if the schema in the database
>>> > changes. It's unusual and unlikely, but possible. I have no idea how Beam
>>> > deals with evolving schemas. +Reuven Lax is there documentation / examples
>>> > / anything around this? :)
>>> >
>>> > I think evolving schemas is an interesting question...
>>> >
>>> > For now, I am going to Java-serialize the objects and delay figuring
>>> > this out. But I reckon I'll have to come back to this...
>>> >
>>> > Best
>>> > -P.
>>> >
>>> > On Tue, Jul 23, 2019 at 1:07 AM Ryan Skraba <[email protected]> wrote:
>>> >>
>>> >> Hello Pablo! Just to clarify -- the Row schemas aren't known at
>>> >> pipeline construction time, but can be discovered from the instance of
>>> >> MyData?
>>> >>
>>> >> Once discovered, is the schema "homogeneous" for all instances of
>>> >> MyData? (i.e. someRow will always have the same schema for all
>>> >> instances afterwards, and there won't be another someRow with a
>>> >> different schema.)
>>> >>
>>> >> We've encountered a parallel "problem" with pure Avro data, where the
>>> >> instance is a GenericRecord containing its own Avro schema but
>>> >> *without* knowing the schema until the pipeline is run. The solution
>>> >> we've been using is a bit hacky: an ad hoc per-job schema registry and
>>> >> a custom coder where each worker saves the schema in the `encode`
>>> >> before writing the record, and loads it lazily in the `decode` before
>>> >> reading.
>>> >>
>>> >> The original code is available [1] (be gentle, it was written with Beam
>>> >> 0.4.0-incubating... and has continued to work until now).
>>> >>
>>> >> In practice, the ad hoc schema registry is just a server socket in the
>>> >> Spark driver, in-memory for DirectRunner / local mode, and a
>>> >> read/write to a known location in other runners. There are definitely
>>> >> other solutions with side inputs and providers, and the job server in
>>> >> portability looks like an exciting candidate for a per-job schema
>>> >> registry story...
>>> >>
>>> >> I'm super eager to see if there are other ideas or a contribution we
>>> >> can make in this area that's "Beam Row" oriented!
>>> >>
>>> >> Ryan
>>> >>
>>> >> [1] https://github.com/Talend/components/blob/master/core/components-adapter-beam/src/main/java/org/talend/components/adapter/beam/coders/LazyAvroCoder.java
>>> >>
>>> >> On Tue, Jul 23, 2019 at 12:49 AM Pablo Estrada <[email protected]> wrote:
>>> >> >
>>> >> > Hello all,
>>> >> > I am writing a utility to push data to PubSub. My data class looks something like so:
>>> >> > ==========
>>> >> > class MyData {
>>> >> >   String someId;
>>> >> >   Row someRow;
>>> >> >   Row someOtherRow;
>>> >> > }
>>> >> > ==============
>>> >> > The schema for the Rows is not known a priori. It is contained in the Row. I am then pushing this data to pubsub:
>>> >> > ===========
>>> >> > MyData pushingData = ....
>>> >> > WhatCoder? coder = ....
>>> >> >
>>> >> > ByteArrayOutputStream os = new ByteArrayOutputStream();
>>> >> > coder.encode(this, os);
>>> >> >
>>> >> > pubsubClient.connect();
>>> >> > pubsubClient.push(PubSubMessage.newBuilder().setData(os.toByteArray()).build());
>>> >> > pubsubClient.close();
>>> >> > =================
>>> >> > What's the right coder to use in this case? I don't know if
>>> >> > SchemaCoder will work, because it seems that it requires the Row's
>>> >> > schema a priori. I have not been able to make AvroCoder work.
>>> >> >
>>> >> > Any tips?
>>> >> > Best
>>> >> > -P.
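For what it's worth, a Row-flavored sketch of the lazy/registry coder Ryan describes above might look like the following. SchemaRegistryClient is a made-up stand-in for the ad hoc per-job registry (socket, file, or otherwise), not a Beam or Talend API, and the wire format (schema id followed by the row bytes) is just one possible choice.

==========
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.io.Serializable;
import org.apache.beam.sdk.coders.CustomCoder;
import org.apache.beam.sdk.coders.RowCoder;
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.schemas.Schema;
import org.apache.beam.sdk.values.Row;

class LazyRowCoder extends CustomCoder<Row> {
  // Made-up registry interface: register() returns a stable id for a schema,
  // lookup() resolves that id back to the schema on the decoding side.
  interface SchemaRegistryClient extends Serializable {
    String register(Schema schema);
    Schema lookup(String schemaId);
  }

  private final SchemaRegistryClient registry;

  LazyRowCoder(SchemaRegistryClient registry) {
    this.registry = registry;
  }

  @Override
  public void encode(Row row, OutputStream out) throws IOException {
    // Save the schema before writing the record, as described upthread.
    String schemaId = registry.register(row.getSchema());
    StringUtf8Coder.of().encode(schemaId, out);
    RowCoder.of(row.getSchema()).encode(row, out);
  }

  @Override
  public Row decode(InputStream in) throws IOException {
    // Load the schema lazily before reading the row itself.
    String schemaId = StringUtf8Coder.of().decode(in);
    Schema schema = registry.lookup(schemaId);
    return RowCoder.of(schema).decode(in);
  }
}
==========

On the original question at the bottom of the thread, the appeal of this shape is that nothing needs the Row schema at pipeline construction time: whatever coder is chosen only has to be able to recover the schema at decode time, and RowCoder.of(schema) can then do the per-row work.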
