Hey everyone,
Summary:
There is an issue with the Dataflow runner’s “Update” capability when using the
Beam-native Row type. I imagine this also blocks the snapshots feature (the
docs say snapshots have the same restrictions as Update), but I have no
experience there.
Currently, when reading from KafkaIO with the value coder set to a SchemaCoder:
```
KafkaIO.read<ByteArray, Row>()
    .withTopic(topic)
    .withKeyDeserializer(ByteArrayDeserializer::class.java)
    .withValueDeserializerAndCoder(
        [Deserializer<Row>], // our Row deserializer class, elided here
        SchemaCoder.of(outputSchema))
```
Updates fail consistently with the error:
```
The original job has not been aborted., The Coder or type for step
ReadInputTopic/Read(KafkaUnboundedSource)/DataflowRunner.StreamingUnboundedRead.ReadWithIds
has changed
```
There is an open issue about this
(https://issues.apache.org/jira/browse/BEAM-9502), but I have not seen it
discussed on the mailing list, so I wanted to start that discussion here.
Investigation so far:
This failing on Beam 2.20 and below makes sense: the code path calls equals on
this coder, which first checks that the schemas are equal (this part has not
changed):
https://github.com/apache/beam/blob/release-2.25.0/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/SchemaCoder.java#L194
That in turn called equals on the Schema, which returned false whenever the
UUIDs differed:
https://github.com/apache/beam/blob/release-2.20.0/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/Schema.java#L272
This means that, as the above issue suggests, the randomly generated UUID meant
no two SchemaCoders were ever considered the same, so equals always returned
false.
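To illustrate the old behavior (my own snippet, not code from the Beam repo):
```
import java.util.UUID
import org.apache.beam.sdk.schemas.Schema

fun main() {
    // Two structurally identical schemas:
    val a = Schema.builder().addStringField("name").addInt64Field("id").build()
    val b = Schema.builder().addStringField("name").addInt64Field("id").build()
    // Each tagged with its own random UUID, as happens when IDs are assigned
    // to schemas that do not have one yet:
    a.setUUID(UUID.randomUUID())
    b.setUUID(UUID.randomUUID())
    // On Beam <= 2.20 the UUID mismatch alone made these unequal, so two
    // independently constructed SchemaCoders never compared equal.
    println(a == b) // false on <= 2.20
}
```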
In [BEAM-4076] this was changed (PR:
https://github.com/apache/beam/pull/11041/files, direct link to the new equals:
https://github.com/reuvenlax/incubator-beam/blob/37b1fbaa7ea1b64303eaacae9e670797a92b1a50/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/Schema.java#L272).
So now, it appears that if the UUIDs match, equals short-circuits to true, and
if they don’t, it still compares the fieldIndices, fields, and options.
Reasonably, this should mean that we don’t get the incompatibility error,
though the structural comparison is potentially slower than would be ideal.
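If I am reading the new equals correctly, a snippet like this (again my own
illustration, not from the Beam repo) should now print true on 2.21+:
```
import java.util.UUID
import org.apache.beam.sdk.schemas.Schema

fun main() {
    // Structurally identical schemas, each tagged with a different random UUID.
    val a = Schema.builder().addStringField("name").build()
    val b = Schema.builder().addStringField("name").build()
    a.setUUID(UUID.randomUUID())
    b.setUUID(UUID.randomUUID())
    // The UUIDs differ, but the fieldIndices/fields/options all match, so the
    // structural fallback should report the schemas as equal.
    println(a == b)
}
```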
But we are still getting the same error on Beam 2.24.
I added a way to deterministically generate a UUID, which we set on the schema
we use in `.withValueDeserializerAndCoder([Deserializer<Row>],
SchemaCoder.of(outputSchema))`.
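For concreteness, this is the kind of derivation I mean (my own sketch with a
hypothetical helper name, not verbatim what the PR does):
```
import java.util.UUID
import org.apache.beam.sdk.schemas.Schema

// Derive a stable UUID from the schema's structure so the same logical schema
// gets the same UUID on every launch. Ignores nested row types, options, and
// descriptions for brevity; deterministicUuidFor is not a Beam API.
fun deterministicUuidFor(schema: Schema): UUID {
    val signature = schema.fields.joinToString("|") { f ->
        "${f.name}:${f.type.typeName}:${f.type.nullable}"
    }
    return UUID.nameUUIDFromBytes(signature.toByteArray(Charsets.UTF_8))
}

// Applied before the schema is handed to the coder:
// outputSchema.setUUID(deterministicUuidFor(outputSchema))
```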
When we do not set the schema UUID, the update fails with the same error every
time, which makes it difficult to log the SchemaCoder. When we do set the
schema UUID deterministically, the update sometimes fails and sometimes
succeeds. When it succeeds, logging the coder shows the same fields in the same
order, with the same UUID on the schema. When I launch a new job, the logging
prints the same coder as in the first run, which leaves me stumped as to why
the update failed at all. I could use some advice here.
Follow-up discussion:
I am curious whether it has been settled what makes one Schema the same as
another. From what I have seen in the codebase, the answer has changed over
time.
In this PR: https://github.com/apache/beam/pull/13049 I have generated a
Schema UUID based on the field names, typeNames, and nullability. Let me know
if this is the right direction.
I think this UUID is worth making deterministic even if it doesn’t solve the
Dataflow update issues, since matching UUIDs let the equality check
short-circuit and skip the structural comparison.
There was a previous draft PR
(https://github.com/apache/beam/pull/11447#issuecomment-615442205) that I got
busy and didn’t finish; it received this reply:
```
I would like to talk about the use case a bit more. I think this is probably to
do with update support on the Dataflow runner, in which case this may not be
the right solution (and may not be a sufficient change).
```
Follow-up questions (which may overlap):
This is the only issue related to Dataflow updates with the Row type that I
could find. Are there more internally, and if so, can they be made public so
that we can work on them?
I know the Row type is “Experimental”, but the only blocker I have seen
discussed is removing DATETIME before that tag can be dropped. What other
issues or blockers, if any, stand in the way?
Is this the totally wrong approach/wrong place for this discussion?
Should SchemaOptions and field descriptions be included when generating the
UUID? I don’t think they should be, so that descriptions and options can be
updated without changing the UUID (just my naive opinion, happy to be
corrected).
Perhaps there are larger plans for schemas. Could this (or something similar)
be merged as a stopgap?
This is my first message to the mailing list, so I apologize if any info is
missing.
Thanks,
Cameron