[
https://issues.apache.org/jira/browse/FLINK-36650?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Jerome Gagnon updated FLINK-36650:
----------------------------------
Description:
When using the `ConfluentRegistryAvroSerializationSchema{*}`{*} as the
serialization schema for a Kafka Sink it looks like the `writeSchema` function
is always called on `serialize` :
[https://github.com/apache/flink/blob/release-1.19.1/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/RegistryAvroSerializationSchema.java#L120]
which then call `register` on schema registry :
[https://github.com/apache/flink/blob/release-1.19.1/flink-formats/flink-avro-confluent-registry/src/main/java/org/apache/flink/formats/avro/registry/confluent/ConfluentSchemaRegistryCoder.java#L85]
This is a problem for pre-registered type (ie; for generated classes in
library) in which case we do not want the schema to be changed by application
and that the schema validity is guaranteed by the provided classes.
>From a permission standpoint we want to restrict the schema registry API keys
>to "read-only" permissions to avoid schema changes on write. Since the
>`register` method is always called it still fails with the following event if
>the Write operation ends up being a no-op.
{code:java}
Caused by:
io.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientException:
User is denied operation Write on Subject: [REDACTED]; error code: 40301
{code}
Normally we would set `auto.register.schemas` to `false` on the producer
configuration but it seems like this property is not being respected in the
format. The fix to this bug would involve using this configuration and instead
of calling `register` it would read the `schema` to provide the registerId or
use the one in `SpecificRecordBase` for `AvroGenerated`
was:
When using the `ConfluentRegistryAvroSerializationSchema{*}`{*} as the
serialization schema for a Kafka Sink it looks like the `writeSchema` function
is always called on `serialize` :
[https://github.com/apache/flink/blob/release-1.19.1/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/RegistryAvroSerializationSchema.java#L120]
which then call `register` on schema registry :
[https://github.com/apache/flink/blob/release-1.19.1/flink-formats/flink-avro-confluent-registry/src/main/java/org/apache/flink/formats/avro/registry/confluent/ConfluentSchemaRegistryCoder.java#L85]
This is a problem for pre-registered type (ie; for generated classes in
library) in which case we do not want the schema to be changed by application
and that the schema validity is guaranteed by the provided classes.
>From a permission standpoint we want to restrict the schema registry API keys
>to "read-only" permissions to avoid schema changes on write. Since the
>`register` method is always called it still fails with the following event if
>the Write operation ends up being a no-op.
```
Caused by:
io.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientException:
User is denied operation Write on Subject: [REDACTED]; error code: 40301
```
> ConfluentRegistryAvroSerializationSchema always tries to register the schema
> on serialize even if it already exists
> -------------------------------------------------------------------------------------------------------------------
>
> Key: FLINK-36650
> URL: https://issues.apache.org/jira/browse/FLINK-36650
> Project: Flink
> Issue Type: Bug
> Components: Formats (JSON, Avro, Parquet, ORC, SequenceFile)
> Affects Versions: 1.19.1
> Reporter: Jerome Gagnon
> Priority: Minor
>
> When using the `ConfluentRegistryAvroSerializationSchema{*}`{*} as the
> serialization schema for a Kafka Sink it looks like the `writeSchema`
> function is always called on `serialize` :
> [https://github.com/apache/flink/blob/release-1.19.1/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/RegistryAvroSerializationSchema.java#L120]
> which then call `register` on schema registry :
> [https://github.com/apache/flink/blob/release-1.19.1/flink-formats/flink-avro-confluent-registry/src/main/java/org/apache/flink/formats/avro/registry/confluent/ConfluentSchemaRegistryCoder.java#L85]
> This is a problem for pre-registered type (ie; for generated classes in
> library) in which case we do not want the schema to be changed by application
> and that the schema validity is guaranteed by the provided classes.
> From a permission standpoint we want to restrict the schema registry API keys
> to "read-only" permissions to avoid schema changes on write. Since the
> `register` method is always called it still fails with the following event if
> the Write operation ends up being a no-op.
>
>
> {code:java}
> Caused by:
> io.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientException:
> User is denied operation Write on Subject: [REDACTED]; error code: 40301
>
> {code}
> Normally we would set `auto.register.schemas` to `false` on the producer
> configuration but it seems like this property is not being respected in the
> format. The fix to this bug would involve using this configuration and
> instead of calling `register` it would read the `schema` to provide the
> registerId or use the one in `SpecificRecordBase` for `AvroGenerated`
--
This message was sent by Atlassian Jira
(v8.20.10#820010)