We are using an Avro Schema Registry and converting these schemas to Beam Schemas with `AvroUtils.toBeamSchema`: https://github.com/apache/beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/utils/AvroUtils.java#L314
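For context, roughly what that conversion looks like on our side (a simplified sketch; fetching the schema string from the registry is elided, and names like `avroSchemaJson` are placeholders):

```
import org.apache.beam.sdk.schemas.Schema;
import org.apache.beam.sdk.schemas.SchemaCoder;
import org.apache.beam.sdk.schemas.utils.AvroUtils;
import org.apache.beam.sdk.values.Row;

public class RegistrySchemaToCoder {
  // Turn an Avro schema (as fetched from the registry) into a Beam SchemaCoder<Row>.
  public static SchemaCoder<Row> coderFor(String avroSchemaJson) {
    org.apache.avro.Schema avroSchema = new org.apache.avro.Schema.Parser().parse(avroSchemaJson);
    // Convert to a Beam schema; the fields come from the Avro record's fields.
    Schema outputSchema = AvroUtils.toBeamSchema(avroSchema);
    return SchemaCoder.of(outputSchema);
  }
}
```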
It could be possible that the order is not preserved, but it's not immediately obvious to me how. I will try to sort the fields before using the schema in the SchemaCoder (see the sketch after the quoted reply below). These issues are interesting, and probably a good place to get started contributing to Beam anyway. I will follow up.

Thanks,
Cameron

> On Oct 9, 2020, at 1:47 PM, Brian Hulette <[email protected]> wrote:
>
> Hi Cameron,
>
> Thanks for bringing this up on the dev list. I'm quite familiar with Beam schemas, but I should be clear I'm not that familiar with Dataflow's pipeline update. +Reuven Lax <[email protected]> may need to check me there.
>
> > I am curious if it has been determined what makes a Schema the same as another schema. From what I have seen in the codebase, it changes.
>
> You're right, schema equality means different things in different contexts, and we should be clearer about this. As I understand it, for pipeline update the important thing isn't so much whether the schemas are actually equal, but whether data encoded with the old schema can be understood by a SchemaCoder referencing the new schema, because it's probable that the new SchemaCoder will receive data that was encoded with the old SchemaCoder. In order to satisfy that requirement, the old and the new schemas must have the same fields* in the same order.
>
> It might not seem like maintaining the ordering is an issue, but it is for schemas inferred from Java types. That's because there's no guarantee about the order in which we'll discover the fields or methods when using reflection APIs. I believe Reuven did some experiments here and found that the ordering is essentially random, so when we infer a schema from a Java type in two different executions it can result in two completely different field orders.
>
> There are a couple of things we definitely need to do on the Beam side to support pipeline update for SchemaCoder with possibly out-of-order fields:
> - BEAM-10277: Java's RowCoder needs to respect the encoding_position field in the schema proto. This provides a layer of indirection for field ordering that runners can modify to "fix" schemas that have the same fields but out of order.
> - Java's SchemaCoder needs to encode the schema in a portable way, so that runners will be able to inspect and modify the schema proto as described above. Currently SchemaCoder is still represented in the pipeline proto as a serialized Java class, so runners can't easily inspect/modify it.
>
> All that being said, it looks like you may not be using SchemaCoder with a schema inferred from a Java type. Where is `outputSchema` coming from? Is it possible to make sure it maintains a consistent field order? If you can do that, this may be an easier problem. I think then we could make a change on the Dataflow side to ignore the schema's UUID when checking for update compatibility.
>
> On the other hand, if you need to support pipeline update for schemas with out-of-order fields, we'd need to address the above tasks first. If you're willing to work on them I can help direct you; these are things I've been hoping to work on but haven't been able to get to.
>
> Brian
>
> * Looking forward we don't actually want to require the schemas to have the same fields; we could allow adding/removing fields with certain limitations.
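A rough sketch of the field-sorting idea mentioned at the top of this reply (not an existing Beam utility, just one way to pin down a deterministic order; the helper name is ours):

```
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import org.apache.beam.sdk.schemas.Schema;
import org.apache.beam.sdk.schemas.Schema.Field;

public class SchemaOrdering {
  // Rebuild the schema with its fields sorted by name, so two submissions of the
  // pipeline end up with the same field order for structurally identical schemas.
  public static Schema withSortedFields(Schema schema) {
    List<Field> fields = new ArrayList<>(schema.getFields());
    fields.sort(Comparator.comparing(Field::getName));
    Schema.Builder builder = Schema.builder();
    for (Field field : fields) {
      builder.addField(field);
    }
    return builder.build();
  }
}
```

This only helps if every submission applies the same sort, nested row types would need the same treatment recursively, and the Rows we produce would have to be built against the sorted schema.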
> On Thu, Oct 8, 2020 at 12:55 PM Cameron Morgan <[email protected]> wrote:
>
> Hey everyone,
>
> Summary:
>
> There is an issue with the Dataflow runner and the "Update" capability while using the Beam native Row type, which I imagine also blocks the snapshots feature (as the docs say snapshots have the same restrictions as the Update feature), but I have no experience there.
>
> Currently, when reading from KafkaIO with the valueCoder set as a SchemaCoder:
>
> ```
> KafkaIO.Read<ByteArray, Row>()
>     .withTopic(topic)
>     .withKeyDeserializer(ByteArrayDeserializer::class.java)
>     .withValueDeserializerAndCoder([Deserializer<Row>], SchemaCoder.of(outputSchema))
> ```
>
> updates fail consistently with the error:
>
> ```
> The original job has not been aborted., The Coder or type for step ReadInputTopic/Read(KafkaUnboundedSource)/DataflowRunner.StreamingUnboundedRead.ReadWithIds has changed
> ```
>
> There is an open issue about this, https://issues.apache.org/jira/browse/BEAM-9502, but I have not seen it discussed on the mailing list, so I wanted to start that discussion.
>
> Investigation so far:
>
> This failing on Beam 2.20 and below makes sense: previously, the code path that calls equals on this Coder first checked that the schemas were equal (this part has not changed):
> https://github.com/apache/beam/blob/release-2.25.0/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/SchemaCoder.java#L194
> That in turn called equals on the Schema here, which returned false if the UUIDs were different:
> https://github.com/apache/beam/blob/release-2.20.0/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/Schema.java#L272
>
> This means that, as the issue above suggests, the random UUID meant that no two SchemaCoders were ever the same, causing equals to return false.
>
> In [BEAM-4076] this was changed (PR link: https://github.com/apache/beam/pull/11041/files, direct link: https://github.com/reuvenlax/incubator-beam/blob/37b1fbaa7ea1b64303eaacae9e670797a92b1a50/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/Schema.java#L272). Now, it appears that if the UUIDs don't match, equals still checks the fieldIndices, fields and fieldOptions. Reasonably, this should mean that we don't get the incompatibility error, though it is potentially slower than would be ideal.
>
> But we are still getting the same error on Beam 2.24.
>
> I added a way to deterministically generate a UUID, which we apply to the schema we use in `.withValueDeserializerAndCoder([Deserializer<Row>], SchemaCoder.of(outputSchema))`.
>
> When we do not set the Schema UUID, it fails with the same error every time, so it is difficult to log the SchemaCoder.
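> Roughly how we have been comparing the old and new schemas when we can log them (a quick sketch, not production code; I'm assuming Schema#equivalent is the order-insensitive field comparison, which is how I read Schema.java):
>
> ```
> import org.apache.beam.sdk.schemas.Schema;
>
> public class SchemaCompatibilityLog {
>   // Log the signals that seem to matter for update compatibility: the UUIDs,
>   // full equality, and (order-insensitive) field equivalence.
>   public static void log(Schema oldSchema, Schema newSchema) {
>     System.out.println("old UUID:   " + oldSchema.getUUID());
>     System.out.println("new UUID:   " + newSchema.getUUID());
>     System.out.println("equals:     " + oldSchema.equals(newSchema));
>     System.out.println("equivalent: " + oldSchema.equivalent(newSchema));
>     System.out.println("old fields: " + oldSchema.getFields());
>     System.out.println("new fields: " + newSchema.getFields());
>   }
> }
> ```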
> When we do set the schema UUID deterministically, it fails sometimes and other times succeeds. When it does succeed, logging the coder shows the same fields in the same order with the same UUID on the schema. When I launch a new job, the logging prints the same coder as in iteration 1, stumping me on why the update failed at all. I need some advice here.
>
> Follow-up discussion:
>
> I am curious whether it has been determined what makes a Schema the same as another schema. From what I have seen in the codebase, it changes.
>
> In this PR, https://github.com/apache/beam/pull/13049, I have generated a Schema UUID based on the field names, typeNames, and nullability. Let me know if this is the right direction.
>
> I think this UUID is worth making deterministic even if it doesn't solve the Dataflow update issues, so that comparison is faster.
>
> There was a previous draft PR, but I got busy and didn't finish it: https://github.com/apache/beam/pull/11447#issuecomment-615442205, which had the reply:
>
> ```
> I would like to talk about the use case a bit more. I think this is probably to do with update support on the Dataflow runner, in which case this may not be the right solution (and may not be a sufficient change).
> ```
>
> Follow-up questions that may have overlap:
> - This is the only issue related to Dataflow updates with the Row type I could see, but are there more internally, and if so, can they be made public so that we can work on them?
> - I know the Row type is "Experimental", but I have only seen discussions that DATETIME should be removed before removing that tag. What other issues and blockers are required for this to happen, if there are any?
> - Is this the totally wrong approach/wrong place for this discussion?
> - Should SchemaOptions and field descriptions be included to create a UUID? I don't think they should, so that descriptions and options can be updated (just my naive opinion, open to change).
> - Perhaps there are greater plans for schemas. Can this (or something similar) merge as a stopgap?
>
> This is my first message to the mailing list, so I apologize if any info is missing.
>
> Thanks,
> Cameron
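For concreteness, here is a rough sketch of the kind of derivation described in the PR above: a name-based UUID built from the field names, type names and nullability. This is illustrative only, not the PR's actual code, and the class and method names are placeholders.

```
import java.nio.charset.StandardCharsets;
import java.util.UUID;
import org.apache.beam.sdk.schemas.Schema;
import org.apache.beam.sdk.schemas.Schema.Field;

public class DeterministicSchemaUuid {
  // Build a canonical string from each field's name, type name and nullability,
  // then derive a name-based (type 3) UUID from it, so structurally identical
  // schemas get the same UUID across pipeline submissions.
  public static void assignDeterministicUuid(Schema schema) {
    StringBuilder canonical = new StringBuilder();
    for (Field field : schema.getFields()) {
      canonical
          .append(field.getName())
          .append(':')
          .append(field.getType().getTypeName())
          .append(':')
          .append(field.getType().getNullable())
          .append(';');
    }
    schema.setUUID(UUID.nameUUIDFromBytes(canonical.toString().getBytes(StandardCharsets.UTF_8)));
  }
}
```

As written, this ignores nested field types, SchemaOptions and field descriptions, which matches the open question above about whether those should feed into the UUID.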
