[ 
https://issues.apache.org/jira/browse/FLINK-36650?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jerome Gagnon updated FLINK-36650:
----------------------------------
    Description: 
When using `ConfluentRegistryAvroSerializationSchema` as the 
serialization schema for a Kafka sink, the `writeSchema` function appears to 
be called on every `serialize`: 
[https://github.com/apache/flink/blob/release-1.19.1/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/RegistryAvroSerializationSchema.java#L120]

which then calls `register` on the schema registry: 
[https://github.com/apache/flink/blob/release-1.19.1/flink-formats/flink-avro-confluent-registry/src/main/java/org/apache/flink/formats/avro/registry/confluent/ConfluentSchemaRegistryCoder.java#L85]

This is a problem for pre-registered types (i.e., classes generated in a 
library), where we do not want the application to change the schema and where 
schema validity is already guaranteed by the provided classes.

From a permission standpoint, we want to restrict the schema registry API keys 
to "read-only" permissions to avoid schema changes on write. Since the 
`register` method is always called, serialization still fails with the 
following error even if the Write operation would end up being a no-op.

{code:java}
Caused by: 
io.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientException: 
User is denied operation Write on Subject: [REDACTED]; error code: 40301
 
{code}
Normally we would set `auto.register.schemas` to `false` in the producer 
configuration, but this property does not seem to be respected by the format. 
A fix for this bug would honor this configuration: instead of calling 
`register`, the format would look up the existing `schema` to obtain its 
registered ID, or use the one in `SpecificRecordBase` for `AvroGenerated` 
classes.
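
The proposed behavior can be sketched as follows. This is only an illustration under stated assumptions, not the actual Flink or Confluent code: `RegistryClient`, `ReadOnlyAwareCoder`, `ReadOnlyRegistry`, the `autoRegisterSchemas` flag, and the `orders-value` subject used below are hypothetical names introduced for this example. The only part taken from the existing behavior is the header layout (a zero magic byte followed by the 4-byte schema ID).

```java
import java.nio.ByteBuffer;
import java.util.HashMap;
import java.util.Map;

/** Hypothetical stand-in for the two registry operations that matter here. */
interface RegistryClient {
    /** Write operation: registers the schema if needed and returns its ID. */
    int register(String subject, String schema);

    /** Read operation: returns the ID of an already registered schema. */
    int getId(String subject, String schema);
}

/** Sketch of a coder that only registers when explicitly allowed to. */
final class ReadOnlyAwareCoder {
    private static final byte MAGIC_BYTE = 0; // Confluent wire-format marker

    private final RegistryClient client;
    private final String subject;
    private final boolean autoRegisterSchemas; // assumed to mirror auto.register.schemas

    ReadOnlyAwareCoder(RegistryClient client, String subject, boolean autoRegisterSchemas) {
        this.client = client;
        this.subject = subject;
        this.autoRegisterSchemas = autoRegisterSchemas;
    }

    /** Builds the 5-byte header: magic byte followed by the big-endian schema ID. */
    byte[] schemaHeader(String schema) {
        int id = autoRegisterSchemas
                ? client.register(subject, schema) // needs Write permission
                : client.getId(subject, schema);   // needs Read permission only
        return ByteBuffer.allocate(5).put(MAGIC_BYTE).putInt(id).array();
    }
}

/** In-memory registry that rejects writes, mimicking a read-only API key. */
final class ReadOnlyRegistry implements RegistryClient {
    private final Map<String, Integer> ids = new HashMap<>();

    ReadOnlyRegistry preRegistered(String subject, String schema, int id) {
        ids.put(subject + "|" + schema, id);
        return this;
    }

    @Override
    public int register(String subject, String schema) {
        throw new IllegalStateException(
                "User is denied operation Write on Subject: " + subject);
    }

    @Override
    public int getId(String subject, String schema) {
        Integer id = ids.get(subject + "|" + schema);
        if (id == null) {
            throw new IllegalStateException("Schema not found for subject: " + subject);
        }
        return id;
    }
}
```

With the flag set to `false`, `new ReadOnlyAwareCoder(registry, "orders-value", false).schemaHeader(schema)` succeeds against a read-only registry as long as the schema is pre-registered. With the flag set to `true`, the same call fails with the Write-denied error shown above.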

  was:
When using the `ConfluentRegistryAvroSerializationSchema` as the 
serialization schema for a Kafka Sink it looks like the `writeSchema` function 
is always called on `serialize` : 
[https://github.com/apache/flink/blob/release-1.19.1/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/RegistryAvroSerializationSchema.java#L120]

which then call `register` on schema registry : 
[https://github.com/apache/flink/blob/release-1.19.1/flink-formats/flink-avro-confluent-registry/src/main/java/org/apache/flink/formats/avro/registry/confluent/ConfluentSchemaRegistryCoder.java#L85]

This is a problem for pre-registered type (ie; for generated classes in 
library) in which case we do not want the schema to be changed by application 
and that the schema validity is guaranteed by the provided classes.

From a permission standpoint we want to restrict the schema registry API keys 
to "read-only" permissions to avoid schema changes on write. Since the 
`register` method is always called it still fails with the following event if 
the Write operation ends up being a no-op.

```

Caused by: 
io.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientException: 
User is denied operation Write on Subject: [REDACTED]; error code: 40301

```


> ConfluentRegistryAvroSerializationSchema always tries to register the schema 
> on serialize even if it already exists
> -------------------------------------------------------------------------------------------------------------------
>
>                 Key: FLINK-36650
>                 URL: https://issues.apache.org/jira/browse/FLINK-36650
>             Project: Flink
>          Issue Type: Bug
>          Components: Formats (JSON, Avro, Parquet, ORC, SequenceFile)
>    Affects Versions: 1.19.1
>            Reporter: Jerome Gagnon
>            Priority: Minor



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
