We are using an Avro Schema Registry and converting these schemas to Beam 
Schemas with `AvroUtils.toBeamSchema`: 
https://github.com/apache/beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/utils/AvroUtils.java#L314

It's possible that the field order is not preserved, though it's not 
immediately obvious to me how that would happen. I will try sorting the fields 
before using the schema in the SchemaCoder.
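
For concreteness, a rough sketch of the sorting I have in mind (the helper 
names are mine; this only handles top-level fields, and nested row types would 
need the same treatment recursively):

```
import org.apache.beam.sdk.schemas.Schema
import org.apache.beam.sdk.schemas.utils.AvroUtils

// Rebuild a Beam schema with its fields sorted by name, so that two
// launches of the pipeline agree on field order regardless of how the
// Avro conversion happened to order them.
fun withSortedFields(schema: Schema): Schema =
    Schema.builder()
        .addFields(schema.fields.sortedBy { it.name })
        .build()

// Convert the Avro schema, then normalize the field order before
// handing the result to SchemaCoder.of(...).
fun sortedOutputSchema(avroSchema: org.apache.avro.Schema): Schema =
    withSortedFields(AvroUtils.toBeamSchema(avroSchema))
```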

These issues are interesting, and probably a good place to get started 
contributing to Beam anyway. I will follow up.

Thanks,
Cameron

> On Oct 9, 2020, at 1:47 PM, Brian Hulette <[email protected]> wrote:
> 
> Hi Cameron,
> 
> Thanks for bringing this up on the dev list. I'm quite familiar with Beam 
> schemas, but I should be clear I'm not that familiar with Dataflow's pipeline 
> update. +Reuven Lax may need to check me there.
> 
> > I am curious if it has been determined what makes a Schema the same as 
> > another schema. From what I have seen in the codebase, it changes.
> 
> You're right, schema equality means different things in different contexts, 
> and we should be more clear about this. As I understand it, for pipeline 
> update the important thing isn't so much whether the schemas are actually 
> equal, but whether data encoded with the old schema can be understood by a 
> SchemaCoder referencing the new schema, because it's probable that the new 
> SchemaCoder will receive data that was encoded with the old SchemaCoder. In 
> order to satisfy that requirement, the old and the new schemas must have the 
> same fields* in the same order.
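> 
> To see why ordering matters, here's a toy sketch (made-up schemas, not your 
> pipeline): encode a Row under one field order and decode the bytes under 
> another:
> 
> ```
> import java.io.ByteArrayInputStream
> import java.io.ByteArrayOutputStream
> import org.apache.beam.sdk.coders.RowCoder
> import org.apache.beam.sdk.schemas.Schema
> import org.apache.beam.sdk.values.Row
> 
> fun main() {
>     val oldSchema = Schema.builder().addInt64Field("id").addStringField("name").build()
>     val newSchema = Schema.builder().addStringField("name").addInt64Field("id").build()
> 
>     // Bytes written positionally against the old field order...
>     val out = ByteArrayOutputStream()
>     RowCoder.of(oldSchema).encode(Row.withSchema(oldSchema).addValues(1L, "a").build(), out)
> 
>     // ...are silently misread (or fail to decode) under the new field order.
>     RowCoder.of(newSchema).decode(ByteArrayInputStream(out.toByteArray()))
> }
> ```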
> It might not seem like maintaining the ordering is an issue, but it is for 
> schemas inferred from Java types. That's because there's no guarantee about 
> the order in which we'll discover the fields or methods when using reflection 
> APIs. I believe Reuven did some experiments here and found that the ordering 
> is essentially random, so when we infer a schema from a Java type in two 
> different executions it can result in two completely different field orders.
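> 
> A small illustration of the reflection issue (a toy class, not Beam code; the 
> Javadoc for getDeclaredFields explicitly makes no ordering guarantee):
> 
> ```
> // Two executions, or two JVMs, may print these field names in
> // different orders, so a schema inferred from this class can differ
> // between launches.
> data class MyPojo(val id: Long, val name: String, val email: String)
> 
> fun main() {
>     MyPojo::class.java.declaredFields.forEach { println(it.name) }
> }
> ```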
> 
> There are a couple of things we definitely need to do on the Beam side to 
> support pipeline update for SchemaCoder with possibly out-of-order fields:
> - BEAM-10277: Java's RowCoder needs to respect the encoding_position field in 
> the schema proto. This provides a layer of indirection for field ordering 
> that runners can modify to "fix" schemas that have the same fields in a 
> different order (sketched after this list).
> - Java's SchemaCoder needs to encode the schema in a portable way, so that 
> runners will be able to inspect and modify the schema proto as described 
> above. Currently SchemaCoder is still represented in the pipeline proto as a 
> serialized Java class, so runners can't easily inspect/modify it.
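> 
> To make the first point concrete, here is a hypothetical sketch of the 
> indirection (none of these names are real Beam APIs): instead of assuming 
> schema order, the coder would look up each field's wire position:
> 
> ```
> // Hypothetical: map each field name to the wire position recorded for
> // the old schema, so a reordered-but-otherwise-identical schema can
> // still decode the old bytes.
> fun encodingPositions(oldWireOrder: List<String>): Map<String, Int> =
>     oldWireOrder.withIndex().associate { (pos, name) -> name to pos }
> 
> // encodingPositions(listOf("name", "id")) == mapOf("name" to 0, "id" to 1)
> ```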
> 
> All that being said, it looks like you may not be using SchemaCoder with a 
> schema inferred from a Java type. Where is `outputSchema` coming from? Is it 
> possible to make sure it maintains a consistent field order?
> If you can do that, this may be an easier problem. I think then we could make 
> a change on the Dataflow side to ignore the schema's UUID when checking for 
> update compatibility.
> On the other hand, if you need to support pipeline update for schemas with 
> out-of-order fields, we'd need to address the above tasks first. If you're 
> willing to work on them I can help direct you; these are things I've been 
> hoping to work on but haven't been able to get to.
> 
> Brian
> 
> * Looking forward, we don't actually want to require the schemas to have the 
> same fields; we could allow adding/removing fields with certain limitations.
> 
> On Thu, Oct 8, 2020 at 12:55 PM Cameron Morgan <[email protected]> wrote:
> Hey everyone,
> 
> Summary: 
> 
> There is an issue with the Dataflow runner and the “Update” capability when 
> using the Beam-native Row type. I imagine this also blocks the snapshots 
> feature (the docs say snapshots have the same restrictions as Update), but I 
> have no experience there.
> 
> Currently when reading from KafkaIO with the valueCoder set as a SchemaCoder:
> 
> ```
> KafkaIO.read<ByteArray, Row>()
>     .withTopic(topic)
>     .withKeyDeserializer(ByteArrayDeserializer::class.java)
>     .withValueDeserializerAndCoder([Deserializer<Row>], SchemaCoder.of(outputSchema))
> ```
> 
> Updates fail consistently with the error:
> ```
> The original job has not been aborted., The Coder or type for step 
> ReadInputTopic/Read(KafkaUnboundedSource)/DataflowRunner.StreamingUnboundedRead.ReadWithIds has changed
> ```
> 
> There is an open issue about this, 
> https://issues.apache.org/jira/browse/BEAM-9502, but I have not seen it 
> discussed on the mailing list, so I wanted to start that discussion here.
> 
> Investigation so far: 
> 
> This failing on Beam 2.20 and below makes sense. The code path that calls 
> equals on this Coder first checks that the schemas are equal (this part has 
> not changed): 
> https://github.com/apache/beam/blob/release-2.25.0/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/SchemaCoder.java#L194
> That in turn called equals on the Schema, which returned false whenever the 
> UUIDs were different: 
> https://github.com/apache/beam/blob/release-2.20.0/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/Schema.java#L272
> 
> This means that, as the above issue suggests, the UUID being random meant 
> that no two SchemaCoders were ever equal, so equals always returned false.
> 
> In [BEAM-4076] this was changed (PR: 
> https://github.com/apache/beam/pull/11041/files, direct link: 
> https://github.com/reuvenlax/incubator-beam/blob/37b1fbaa7ea1b64303eaacae9e670797a92b1a50/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/Schema.java#L272).
> So now, it appears that if the UUIDs don't match, equals still checks the 
> fieldIndices, fields, and fieldOptions. Reasonably, this should mean we don't 
> get the incompatibility error, though the comparison is potentially slower 
> than would be ideal.
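> 
> A condensed paraphrase of the new check (my reading of the linked diff, not 
> the literal source):
> 
> ```
> // Matching non-null UUIDs short-circuit the comparison; differing or
> // missing UUIDs fall through to the structural comparison instead of
> // returning false outright.
> fun schemasEqual(a: Schema, b: Schema): Boolean {
>     if (a.uuid != null && a.uuid == b.uuid) return true
>     return a.fields == b.fields // plus fieldIndices and fieldOptions in the real code
> }
> ```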
> 
> But we are still getting the same error on Beam 2.24.
> 
> I added a way to deterministically generate a UUID, which we set on the 
> schema we use in `.withValueDeserializerAndCoder([Deserializer<Row>], 
> SchemaCoder.of(outputSchema))`.
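> 
> Roughly along these lines (a sketch of the idea, not the exact code):
> 
> ```
> import java.util.UUID
> import org.apache.beam.sdk.schemas.Schema
> 
> // Derive a name-based (version 3) UUID from the schema's field names,
> // type names, and nullability, so the same logical schema gets the
> // same UUID on every launch.
> fun deterministicUuid(schema: Schema): UUID {
>     val fingerprint = schema.fields.joinToString("|") {
>         "${it.name}:${it.type.typeName}:${it.type.nullable}"
>     }
>     return UUID.nameUUIDFromBytes(fingerprint.toByteArray(Charsets.UTF_8))
> }
> 
> // Applied before building the coder:
> // outputSchema.setUUID(deterministicUuid(outputSchema))
> ```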
> 
> When we do not set the Schema UUID, it fails with the same error every time, 
> so it is difficult to log the SchemaCoder.
> 
> When we do set the schema UUID deterministically, it sometimes fails and 
> other times succeeds. When it does succeed, logging the coder shows the same 
> fields in the same order with the same UUID on the schema. When I launch a 
> new job, the logging prints the same coder as in the first iteration, which 
> stumps me as to why the update failed at all. I need some advice here.
> 
> Follow-up discussion:
> 
> I am curious if it has been determined what makes a Schema the same as 
> another schema. From what I have seen in the codebase, it changes.
> 
> In this PR: https://github.com/apache/beam/pull/13049 I have generated a 
> Schema UUID based on the field names, typeNames, and nullability. Let me know 
> if this is the right direction.
> 
> I think this UUID is worth making deterministic even if it doesn’t solve the 
> Dataflow update issues, so that comparison is faster. 
> 
> There was a previous draft PR, but I got busy and didn't finish it: 
> https://github.com/apache/beam/pull/11447#issuecomment-615442205, which had 
> the reply:
> ```
> I would like to talk about the use case a bit more. I think this is probably 
> to do with update support on the Dataflow runner, in which case this may not 
> be the right solution (and may not be a sufficient change).
> ```
> 
> Follow-up questions that may have overlap:
> - This is the only issue related to Dataflow updates with the Row type that I 
> could find, but are there more internally, and if so, can they be made public 
> so that we can work on them?
> - I know the Row type is “Experimental”, but I have only seen discussions 
> that DATETIME should be removed before removing that tag. What other issues 
> or blockers remain before that can happen, if any?
> - Is this the totally wrong approach/wrong place for this discussion?
> - Should SchemaOptions and field descriptions be included when creating the 
> UUID? I don't think they should be, so that descriptions and options can be 
> updated (just my naive opinion, open to change).
> - Perhaps there are greater plans for schemas. Can this (or something 
> similar) merge as a stopgap?
> 
> This is my first message to the mailing list, so I apologize if any info is 
> missing.
> 
> Thanks,
> Cameron 
