carlpayne commented on issue #23291: URL: https://github.com/apache/beam/issues/23291#issuecomment-1323644349
Thanks @reuvenlax, we're using a cached version of our schema registry, so it only calls out externally for schemas it hasn't encountered before. In terms of how we've seen this error happen in the wild, the workflow is roughly:

1. A user creates a new schema in our schema registry. Initially it contains the two fields above (`required_field`, REQUIRED, and `optional_field`, NULLABLE).
2. The user starts sending Kafka events (in Avro) using this new schema.
3. BigQueryIO sees the events and converts them from Avro to `TableRow` via `withFormatFunction`. It pulls the schema from our schema registry via `getSchema` in `DynamicDestinations`, then creates the new table in BigQuery (because it doesn't exist and we are using `BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED`).
4. As new data comes in, BigQueryIO successfully writes the new rows.
5. The user changes the schema and submits a new version to the schema registry. In this case `required_field` was removed, which is valid in Avro terms as long as a default value is set.
6. The user starts sending Kafka events (in Avro) using the updated schema.
7. BigQueryIO sees the events and pulls the updated schema via `getSchema`, but then hits the `InvalidArgumentException` because BigQuery still expects `required_field` to be present, so we end up in a retry loop. (A rough sketch of this write path is at the end of this comment.)

In the above case, the ideal outcome for us would be to set `BigQueryIO.Write.SchemaUpdateOption.ALLOW_FIELD_RELAXATION` and have BigQueryIO automatically relax the column to NULLABLE and retry (as per https://github.com/apache/beam/issues/24063). In the absence of that support, it would be fantastic if we could get the failures back immediately, because that would let us do a full join between the raw Kafka input PCollection and the `PCollection<BigQueryStorageApiInsertError>` from `getFailedStorageApiInserts` (currently this isn't really feasible due to the large time window we'd have to buffer errors). That in turn would let us write the failed inserts back to our Kafka DLQ (as per the discussion on https://github.com/apache/beam/issues/24090); a sketch of that DLQ handling is also at the end of this comment.
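For concreteness, here's roughly how our write path is wired up. This is only a sketch: `avroRecords` (the `PCollection<GenericRecord>` coming off KafkaIO), `registry` (our cached schema-registry client), `toTableRow` and the project/dataset names are placeholders for our own code; the rest is the standard BigQueryIO API.

```java
import com.google.api.services.bigquery.model.TableRow;
import com.google.api.services.bigquery.model.TableSchema;
import org.apache.avro.generic.GenericRecord;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;
import org.apache.beam.sdk.io.gcp.bigquery.DynamicDestinations;
import org.apache.beam.sdk.io.gcp.bigquery.TableDestination;
import org.apache.beam.sdk.io.gcp.bigquery.WriteResult;
import org.apache.beam.sdk.values.ValueInSingleWindow;

// avroRecords, registry and toTableRow are placeholders for our own helpers, not Beam APIs.
WriteResult result = avroRecords.apply("WriteToBigQuery",
    BigQueryIO.<GenericRecord>write()
        .to(new DynamicDestinations<GenericRecord, String>() {
          @Override
          public String getDestination(ValueInSingleWindow<GenericRecord> element) {
            // Route each record to a table keyed by its Avro schema name.
            return element.getValue().getSchema().getFullName();
          }

          @Override
          public TableDestination getTable(String destination) {
            return new TableDestination("our-project:our_dataset." + destination, null);
          }

          @Override
          public TableSchema getSchema(String destination) {
            // Steps 3 and 7 above: pull the (possibly updated) schema from the cached registry.
            return registry.getBigQuerySchema(destination);
          }
        })
        .withFormatFunction(record -> toTableRow(record))  // Avro -> TableRow
        .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED)
        .withMethod(BigQueryIO.Write.Method.STORAGE_WRITE_API));
```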

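And this is roughly what we'd like to do with the failed inserts once they come back promptly: push them straight to our Kafka DLQ. Again a sketch, not a working pipeline; the broker and topic names are placeholders, `result` is the `WriteResult` from the snippet above, and the payload format is just illustrative.

```java
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryStorageApiInsertError;
import org.apache.beam.sdk.io.kafka.KafkaIO;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.TypeDescriptors;
import org.apache.kafka.common.serialization.StringSerializer;

PCollection<BigQueryStorageApiInsertError> failures =
    result.getFailedStorageApiInserts();

failures
    // Serialize the failed row plus the error message into a DLQ payload (format is illustrative).
    .apply("ToDlqPayload",
        MapElements.into(TypeDescriptors.strings())
            .via((BigQueryStorageApiInsertError err) ->
                err.getRow().toString() + " | " + err.getErrorMessage()))
    .apply("WriteToKafkaDlq",
        KafkaIO.<Void, String>write()
            .withBootstrapServers("broker:9092")  // placeholder
            .withTopic("events-dlq")              // placeholder
            .withValueSerializer(StringSerializer.class)
            .values());
```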