carlpayne commented on issue #23291:
URL: https://github.com/apache/beam/issues/23291#issuecomment-1323644349

   Thanks @reuvenlax. We're using a cached client for our schema registry, so 
it only calls out externally for schemas it hasn't encountered previously.
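
   For context, here is a minimal sketch of the kind of caching we mean, 
assuming a hypothetical `SchemaRegistryClient` interface rather than our real 
client:

   ```java
   import java.util.Map;
   import java.util.concurrent.ConcurrentHashMap;
   
   import com.google.api.services.bigquery.model.TableSchema;
   
   /**
    * Hypothetical caching wrapper around a schema registry client: the remote
    * lookup only happens the first time a given subject is requested.
    */
   public class CachingSchemaRegistry {
   
     /** Placeholder for whatever client actually talks to the registry. */
     public interface SchemaRegistryClient {
       TableSchema fetchSchema(String subject);
     }
   
     private final SchemaRegistryClient client;
     private final Map<String, TableSchema> cache = new ConcurrentHashMap<>();
   
     public CachingSchemaRegistry(SchemaRegistryClient client) {
       this.client = client;
     }
   
     public TableSchema getSchema(String subject) {
       // Only call out to the registry for subjects we haven't seen before.
       return cache.computeIfAbsent(subject, client::fetchSchema);
     }
   }
   ```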
   
   In terms of how we've seen this error happen in the wild, the workflow is 
roughly:
   
   1. User creates a new schema in our schema registry. Let's say that 
initially this contains our two fields above (`required_field` - REQUIRED and 
`optional_field` - NULLABLE).
   2. User starts sending Kafka events (in Avro) using this new schema.
   3. BigQueryIO sees the events and converts them from Avro to TableRow using 
`withFormatFunction`. It pulls the schema from our schema registry via 
`getSchema` in `DynamicDestinations`, then creates the new table in BigQuery 
(because it doesn't exist and we are using 
`BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED`); see the sketch after 
this list.
   4. As new data comes in, BigQueryIO successfully writes the new rows.
   5. User makes a change to the schema and submits a new version to the schema 
registry. In this case, `required_field` was removed, which is valid in Avro 
terms as long as a default value is set.
   6. User starts sending Kafka events (in Avro) using the updated schema.
   7. BigQueryIO sees the events and pulls the updated schema via `getSchema`, 
but hits the `InvalidArgumentException` because BigQuery still expects 
`required_field` to be present, so we end up in a retry loop.
   
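   For reference, this is roughly how the write is wired up. It's only a rough 
sketch: `CachingSchemaRegistry` is the caching sketch above, and `toTableRow`, 
the table spec, and the routing logic are placeholders standing in for our 
real code:

   ```java
   import com.google.api.services.bigquery.model.TableRow;
   import com.google.api.services.bigquery.model.TableSchema;
   import org.apache.avro.generic.GenericRecord;
   import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;
   import org.apache.beam.sdk.io.gcp.bigquery.DynamicDestinations;
   import org.apache.beam.sdk.io.gcp.bigquery.TableDestination;
   import org.apache.beam.sdk.io.gcp.bigquery.WriteResult;
   import org.apache.beam.sdk.transforms.SerializableFunction;
   import org.apache.beam.sdk.values.PCollection;
   import org.apache.beam.sdk.values.ValueInSingleWindow;
   
   public class WriteAvroToBigQuery {
   
     // Placeholders standing in for our real code.
     static CachingSchemaRegistry registry; // cached registry, see sketch above
     static SerializableFunction<GenericRecord, TableRow> toTableRow =
         record -> new TableRow(); // real Avro -> TableRow conversion goes here
   
     static WriteResult write(PCollection<GenericRecord> kafkaRecords) {
       return kafkaRecords.apply(
           "WriteToBigQuery",
           BigQueryIO.<GenericRecord>write()
               .to(
                   new DynamicDestinations<GenericRecord, String>() {
                     @Override
                     public String getDestination(ValueInSingleWindow<GenericRecord> element) {
                       // Route by Avro schema name; simplified for the sketch.
                       return element.getValue().getSchema().getName();
                     }
   
                     @Override
                     public TableDestination getTable(String destination) {
                       return new TableDestination("my-project:my_dataset." + destination, null);
                     }
   
                     @Override
                     public TableSchema getSchema(String destination) {
                       // Cached schema registry lookup (steps 3 and 7 above).
                       return registry.getSchema(destination);
                     }
                   })
               .withFormatFunction(toTableRow) // Avro -> TableRow (step 3)
               .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED)
               .withMethod(BigQueryIO.Write.Method.STORAGE_WRITE_API));
     }
   }
   ```

   The `WriteResult` returned here is what we'd want to pull the failed 
inserts from (see below).
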
   In the above case, the ideal outcome for us would be to set 
`BigQueryIO.Write.SchemaUpdateOption.ALLOW_FIELD_RELAXATION` and have 
BigQueryIO automatically relax the column to NULLABLE and retry (as per 
https://github.com/apache/beam/issues/24063). In the absence of that, it would 
be fantastic to get the failures back immediately, because that would let us 
do a full join between the raw Kafka input PCollection and the 
`PCollection<BigQueryStorageApiInsertError>` from 
`getFailedStorageApiInserts` (currently this isn't really feasible due to the 
large time window over which we'd have to buffer errors). We could then write 
the failed inserts back to our Kafka DLQ (as per the discussion on 
https://github.com/apache/beam/issues/24090); a rough sketch of that path is 
below.
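
   This is the shape of the DLQ path we have in mind, assuming the 
`WriteResult` from the write sketch above, that the failed-insert elements 
expose `getRow()`/`getErrorMessage()`, and placeholder broker/topic names:

   ```java
   import org.apache.beam.sdk.io.gcp.bigquery.BigQueryStorageApiInsertError;
   import org.apache.beam.sdk.io.gcp.bigquery.WriteResult;
   import org.apache.beam.sdk.io.kafka.KafkaIO;
   import org.apache.beam.sdk.transforms.MapElements;
   import org.apache.beam.sdk.values.KV;
   import org.apache.beam.sdk.values.TypeDescriptors;
   import org.apache.kafka.common.serialization.StringSerializer;
   
   public class FailedInsertsToDlq {
   
     static void routeToDlq(WriteResult result) {
       result
           .getFailedStorageApiInserts()
           .apply(
               "FailedInsertToKV",
               MapElements.into(
                       TypeDescriptors.kvs(TypeDescriptors.strings(), TypeDescriptors.strings()))
                   .via(
                       (BigQueryStorageApiInsertError err) ->
                           // The payload here is arbitrary; ideally we'd emit the
                           // original Kafka record joined with the error instead.
                           KV.of("failed-insert", err.getRow() + " | " + err.getErrorMessage())))
           .apply(
               "WriteToDlq",
               KafkaIO.<String, String>write()
                   .withBootstrapServers("broker-1:9092") // placeholder
                   .withTopic("bigquery-dlq") // placeholder
                   .withKeySerializer(StringSerializer.class)
                   .withValueSerializer(StringSerializer.class));
     }
   }
   ```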

